From e9251a9325595f40aa8665bf976e8058ee0774af Mon Sep 17 00:00:00 2001 From: keorn Date: Tue, 24 Jan 2017 22:03:03 +0100 Subject: [PATCH] Generic engine utilities (#4258) * move modules up * make structs generic * reound to view and tests * fix --- ethcore/src/engines/mod.rs | 2 + ethcore/src/engines/tendermint/message.rs | 77 ++-- ethcore/src/engines/tendermint/mod.rs | 132 +++---- ethcore/src/engines/tendermint/params.rs | 38 +- ethcore/src/engines/tendermint/transition.rs | 102 ------ .../src/engines/tendermint/vote_collector.rs | 307 ---------------- ethcore/src/engines/transition.rs | 78 ++++ ethcore/src/engines/vote_collector.rs | 345 ++++++++++++++++++ 8 files changed, 580 insertions(+), 501 deletions(-) delete mode 100644 ethcore/src/engines/tendermint/transition.rs delete mode 100644 ethcore/src/engines/tendermint/vote_collector.rs create mode 100644 ethcore/src/engines/transition.rs create mode 100644 ethcore/src/engines/vote_collector.rs diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs index 23d382cac..77a97a4ca 100644 --- a/ethcore/src/engines/mod.rs +++ b/ethcore/src/engines/mod.rs @@ -16,6 +16,8 @@ //! Consensus engine specification and basic implementations. +mod transition; +mod vote_collector; mod null_engine; mod instant_seal; mod basic_authority; diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs index ff258185d..3a752a2d1 100644 --- a/ethcore/src/engines/tendermint/message.rs +++ b/ethcore/src/engines/tendermint/message.rs @@ -17,14 +17,15 @@ //! Tendermint message handling. use util::*; -use super::{Height, Round, BlockHash, Step}; +use super::{Height, View, BlockHash, Step}; use error::Error; use header::Header; -use rlp::*; +use rlp::{Rlp, UntrustedRlp, RlpStream, Stream, Encodable, Decodable, Decoder, DecoderError, View as RlpView}; use ethkey::{recover, public_to_address}; +use super::super::vote_collector::Message; /// Message transmitted between consensus participants. -#[derive(Debug, PartialEq, Eq, Clone, Hash)] +#[derive(Debug, PartialEq, Eq, Clone, Hash, Default)] pub struct ConsensusMessage { pub vote_step: VoteStep, pub block_hash: Option, @@ -35,42 +36,55 @@ pub struct ConsensusMessage { #[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct VoteStep { pub height: Height, - pub round: Round, + pub view: View, pub step: Step, } + impl VoteStep { - pub fn new(height: Height, round: Round, step: Step) -> Self { - VoteStep { height: height, round: round, step: step } + pub fn new(height: Height, view: View, step: Step) -> Self { + VoteStep { height: height, view: view, step: step } } pub fn is_height(&self, height: Height) -> bool { self.height == height } - pub fn is_round(&self, height: Height, round: Round) -> bool { - self.height == height && self.round == round + pub fn is_view(&self, height: Height, view: View) -> bool { + self.height == height && self.view == view } } -/// Header consensus round. -pub fn consensus_round(header: &Header) -> Result { - let round_rlp = header.seal().get(0).expect("seal passed basic verification; seal has 3 fields; qed"); - UntrustedRlp::new(round_rlp.as_slice()).as_val() +/// Header consensus view. +pub fn consensus_view(header: &Header) -> Result { + let view_rlp = header.seal().get(0).expect("seal passed basic verification; seal has 3 fields; qed"); + UntrustedRlp::new(view_rlp.as_slice()).as_val() +} + +impl Message for ConsensusMessage { + type Round = VoteStep; + + fn signature(&self) -> H520 { self.signature } + + fn block_hash(&self) -> Option { self.block_hash } + + fn round(&self) -> &VoteStep { &self.vote_step } + + fn is_broadcastable(&self) -> bool { self.vote_step.step.is_pre() } } impl ConsensusMessage { - pub fn new(signature: H520, height: Height, round: Round, step: Step, block_hash: Option) -> Self { + pub fn new(signature: H520, height: Height, view: View, step: Step, block_hash: Option) -> Self { ConsensusMessage { signature: signature, block_hash: block_hash, - vote_step: VoteStep::new(height, round, step), + vote_step: VoteStep::new(height, view, step), } } pub fn new_proposal(header: &Header) -> Result { Ok(ConsensusMessage { - vote_step: VoteStep::new(header.number() as Height, consensus_round(header)?, Step::Propose), + vote_step: VoteStep::new(header.number() as Height, consensus_view(header)?, Step::Propose), signature: UntrustedRlp::new(header.seal().get(1).expect("seal passed basic verification; seal has 3 fields; qed").as_slice()).as_val()?, block_hash: Some(header.bare_hash()), }) @@ -100,6 +114,12 @@ impl ConsensusMessage { } } +impl Default for VoteStep { + fn default() -> Self { + VoteStep::new(0, 0, Step::Propose) + } +} + impl PartialOrd for VoteStep { fn partial_cmp(&self, m: &VoteStep) -> Option { Some(self.cmp(m)) @@ -110,8 +130,8 @@ impl Ord for VoteStep { fn cmp(&self, m: &VoteStep) -> Ordering { if self.height != m.height { self.height.cmp(&m.height) - } else if self.round != m.round { - self.round.cmp(&m.round) + } else if self.view != m.view { + self.view.cmp(&m.view) } else { self.step.number().cmp(&m.step.number()) } @@ -146,7 +166,7 @@ impl Encodable for Step { } } -/// (signature, (height, round, step, block_hash)) +/// (signature, (height, view, step, block_hash)) impl Decodable for ConsensusMessage { fn decode(decoder: &D) -> Result where D: Decoder { let rlp = decoder.as_rlp(); @@ -175,7 +195,7 @@ impl Encodable for ConsensusMessage { pub fn message_info_rlp(vote_step: &VoteStep, block_hash: Option) -> Bytes { // TODO: figure out whats wrong with nested list encoding let mut s = RlpStream::new_list(5); - s.append(&vote_step.height).append(&vote_step.round).append(&vote_step.step).append(&block_hash.unwrap_or_else(H256::zero)); + s.append(&vote_step.height).append(&vote_step.view).append(&vote_step.step).append(&block_hash.unwrap_or_else(H256::zero)); s.out() } @@ -189,11 +209,11 @@ pub fn message_full_rlp(signature: &H520, vote_info: &Bytes) -> Bytes { mod tests { use util::*; use rlp::*; - use super::super::Step; - use super::*; + use ethkey::Secret; use account_provider::AccountProvider; use header::Header; - use ethkey::Secret; + use super::super::Step; + use super::*; #[test] fn encode_decode() { @@ -201,7 +221,7 @@ mod tests { signature: H520::default(), vote_step: VoteStep { height: 10, - round: 123, + view: 123, step: Step::Precommit, }, block_hash: Some("1".sha3()) @@ -214,7 +234,7 @@ mod tests { signature: H520::default(), vote_step: VoteStep { height: 1314, - round: 0, + view: 0, step: Step::Prevote, }, block_hash: None @@ -255,7 +275,7 @@ mod tests { signature: Default::default(), vote_step: VoteStep { height: 0, - round: 0, + view: 0, step: Step::Propose, }, block_hash: Some(header.bare_hash()) @@ -275,4 +295,11 @@ mod tests { assert_eq!(pro.precommit_hash(), pre.sha3()); } + + #[test] + fn step_ordering() { + assert!(VoteStep::new(10, 123, Step::Precommit) < VoteStep::new(11, 123, Step::Precommit)); + assert!(VoteStep::new(10, 123, Step::Propose) < VoteStep::new(11, 123, Step::Precommit)); + assert!(VoteStep::new(10, 122, Step::Propose) < VoteStep::new(11, 123, Step::Propose)); + } } diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 7dae44aaa..7adcd51e5 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -15,17 +15,15 @@ // along with Parity. If not, see . /// Tendermint BFT consensus engine with round robin proof-of-authority. -/// At each blockchain `Height` there can be multiple `Round`s of voting. -/// Signatures always sign `Height`, `Round`, `Step` and `BlockHash` which is a block hash without seal. +/// At each blockchain `Height` there can be multiple `View`s of voting. +/// Signatures always sign `Height`, `View`, `Step` and `BlockHash` which is a block hash without seal. /// First a block with `Seal::Proposal` is issued by the designated proposer. -/// Next the `Round` proceeds through `Prevote` and `Precommit` `Step`s. -/// Block is issued when there is enough `Precommit` votes collected on a particular block at the end of a `Round`. +/// Next the `View` proceeds through `Prevote` and `Precommit` `Step`s. +/// Block is issued when there is enough `Precommit` votes collected on a particular block at the end of a `View`. /// Once enough votes have been gathered the proposer issues that block in the `Commit` step. mod message; -mod transition; mod params; -mod vote_collector; use std::sync::Weak; use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; @@ -35,7 +33,7 @@ use error::{Error, BlockError}; use header::Header; use builtin::Builtin; use env_info::EnvInfo; -use rlp::{UntrustedRlp, View}; +use rlp::{UntrustedRlp, View as RlpView}; use ethkey::{recover, public_to_address, Signature}; use account_provider::AccountProvider; use block::*; @@ -46,10 +44,10 @@ use state::CleanupMode; use io::IoService; use super::signer::EngineSigner; use super::validator_set::{ValidatorSet, new_validator_set}; +use super::transition::TransitionHandler; +use super::vote_collector::VoteCollector; use self::message::*; -use self::transition::TransitionHandler; use self::params::TendermintParams; -use self::vote_collector::VoteCollector; #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub enum Step { @@ -69,7 +67,7 @@ impl Step { } pub type Height = usize; -pub type Round = usize; +pub type View = usize; pub type BlockHash = H256; /// Engine using `Tendermint` consensus algorithm, suitable for EVM chain. @@ -82,17 +80,17 @@ pub struct Tendermint { block_reward: U256, /// Blockchain height. height: AtomicUsize, - /// Consensus round. - round: AtomicUsize, + /// Consensus view. + view: AtomicUsize, /// Consensus step. step: RwLock, /// Vote accumulator. - votes: VoteCollector, + votes: VoteCollector, /// Used to sign messages and proposals. signer: EngineSigner, /// Message for the last PoLC. lock_change: RwLock>, - /// Last lock round. + /// Last lock view. last_lock: AtomicUsize, /// Bare hash of the proposed block, used for seal submission. proposal: RwLock>, @@ -112,16 +110,16 @@ impl Tendermint { step_service: IoService::::start()?, block_reward: our_params.block_reward, height: AtomicUsize::new(1), - round: AtomicUsize::new(0), + view: AtomicUsize::new(0), step: RwLock::new(Step::Propose), - votes: VoteCollector::new(), + votes: VoteCollector::default(), signer: Default::default(), lock_change: RwLock::new(None), last_lock: AtomicUsize::new(0), proposal: RwLock::new(None), validators: new_validator_set(our_params.validators), }); - let handler = TransitionHandler::new(Arc::downgrade(&engine), our_params.timeouts); + let handler = TransitionHandler::new(Arc::downgrade(&engine) as Weak, Box::new(our_params.timeouts)); engine.step_service.register_handler(Arc::new(handler))?; Ok(engine) } @@ -152,7 +150,7 @@ impl Tendermint { fn generate_message(&self, block_hash: Option) -> Option { let h = self.height.load(AtomicOrdering::SeqCst); - let r = self.round.load(AtomicOrdering::SeqCst); + let r = self.view.load(AtomicOrdering::SeqCst); let s = self.step.read(); let vote_info = message_info_rlp(&VoteStep::new(h, r, *s), block_hash); match self.signer.sign(vote_info.sha3()).map(Into::into) { @@ -181,7 +179,7 @@ impl Tendermint { /// Broadcast all messages since last issued block to get the peers up to speed. fn broadcast_old_messages(&self) { - for m in self.votes.get_up_to(self.height.load(AtomicOrdering::SeqCst)).into_iter() { + for m in self.votes.get_up_to(&VoteStep::new(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst), Step::Precommit)).into_iter() { self.broadcast_message(m); } } @@ -191,7 +189,7 @@ impl Tendermint { debug!(target: "poa", "Received a Commit, transitioning to height {}.", new_height); self.last_lock.store(0, AtomicOrdering::SeqCst); self.height.store(new_height, AtomicOrdering::SeqCst); - self.round.store(0, AtomicOrdering::SeqCst); + self.view.store(0, AtomicOrdering::SeqCst); *self.lock_change.write() = None; } @@ -208,7 +206,7 @@ impl Tendermint { }, Step::Prevote => { let block_hash = match *self.lock_change.read() { - Some(ref m) if !self.should_unlock(m.vote_step.round) => m.block_hash, + Some(ref m) if !self.should_unlock(m.vote_step.view) => m.block_hash, _ => self.proposal.read().clone(), }; self.generate_and_broadcast_message(block_hash); @@ -216,9 +214,9 @@ impl Tendermint { Step::Precommit => { trace!(target: "poa", "to_step: Precommit."); let block_hash = match *self.lock_change.read() { - Some(ref m) if self.is_round(m) && m.block_hash.is_some() => { - trace!(target: "poa", "Setting last lock: {}", m.vote_step.round); - self.last_lock.store(m.vote_step.round, AtomicOrdering::SeqCst); + Some(ref m) if self.is_view(m) && m.block_hash.is_some() => { + trace!(target: "poa", "Setting last lock: {}", m.vote_step.view); + self.last_lock.store(m.vote_step.view, AtomicOrdering::SeqCst); m.block_hash }, _ => None, @@ -228,15 +226,17 @@ impl Tendermint { Step::Commit => { trace!(target: "poa", "to_step: Commit."); // Commit the block using a complete signature set. - let round = self.round.load(AtomicOrdering::SeqCst); + let view = self.view.load(AtomicOrdering::SeqCst); let height = self.height.load(AtomicOrdering::SeqCst); if let Some(block_hash) = *self.proposal.read() { // Generate seal and remove old votes. if self.is_signer_proposer() { - if let Some(seal) = self.votes.seal_signatures(height, round, &block_hash) { + let proposal_step = VoteStep::new(height, view, Step::Propose); + let precommit_step = VoteStep::new(proposal_step.height, proposal_step.view, Step::Precommit); + if let Some(seal) = self.votes.seal_signatures(proposal_step, precommit_step, &block_hash) { trace!(target: "poa", "Collected seal: {:?}", seal); let seal = vec![ - ::rlp::encode(&round).to_vec(), + ::rlp::encode(&view).to_vec(), ::rlp::encode(&seal.proposal).to_vec(), ::rlp::encode(&seal.votes).to_vec() ]; @@ -259,16 +259,16 @@ impl Tendermint { n > self.validators.count() * 2/3 } - /// Find the designated for the given round. - fn round_proposer(&self, height: Height, round: Round) -> Address { - let proposer_nonce = height + round; + /// Find the designated for the given view. + fn view_proposer(&self, height: Height, view: View) -> Address { + let proposer_nonce = height + view; trace!(target: "poa", "Proposer nonce: {}", proposer_nonce); self.validators.get(proposer_nonce) } - /// Check if address is a proposer for given round. - fn is_round_proposer(&self, height: Height, round: Round, address: &Address) -> Result<(), EngineError> { - let proposer = self.round_proposer(height, round); + /// Check if address is a proposer for given view. + fn is_view_proposer(&self, height: Height, view: View, address: &Address) -> Result<(), EngineError> { + let proposer = self.view_proposer(height, view); if proposer == *address { Ok(()) } else { @@ -278,7 +278,7 @@ impl Tendermint { /// Check if current signer is the current proposer. fn is_signer_proposer(&self) -> bool { - let proposer = self.round_proposer(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst)); + let proposer = self.view_proposer(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst)); self.signer.is_address(&proposer) } @@ -286,29 +286,29 @@ impl Tendermint { message.vote_step.is_height(self.height.load(AtomicOrdering::SeqCst)) } - fn is_round(&self, message: &ConsensusMessage) -> bool { - message.vote_step.is_round(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst)) + fn is_view(&self, message: &ConsensusMessage) -> bool { + message.vote_step.is_view(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst)) } - fn increment_round(&self, n: Round) { - trace!(target: "poa", "increment_round: New round."); - self.round.fetch_add(n, AtomicOrdering::SeqCst); + fn increment_view(&self, n: View) { + trace!(target: "poa", "increment_view: New view."); + self.view.fetch_add(n, AtomicOrdering::SeqCst); } - fn should_unlock(&self, lock_change_round: Round) -> bool { - self.last_lock.load(AtomicOrdering::SeqCst) < lock_change_round - && lock_change_round < self.round.load(AtomicOrdering::SeqCst) + fn should_unlock(&self, lock_change_view: View) -> bool { + self.last_lock.load(AtomicOrdering::SeqCst) < lock_change_view + && lock_change_view < self.view.load(AtomicOrdering::SeqCst) } fn has_enough_any_votes(&self) -> bool { - let step_votes = self.votes.count_step_votes(&VoteStep::new(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), *self.step.read())); + let step_votes = self.votes.count_round_votes(&VoteStep::new(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst), *self.step.read())); self.is_above_threshold(step_votes) } fn has_enough_future_step_votes(&self, vote_step: &VoteStep) -> bool { - if vote_step.round > self.round.load(AtomicOrdering::SeqCst) { - let step_votes = self.votes.count_step_votes(vote_step); + if vote_step.view > self.view.load(AtomicOrdering::SeqCst) { + let step_votes = self.votes.count_round_votes(vote_step); self.is_above_threshold(step_votes) } else { false @@ -339,21 +339,21 @@ impl Tendermint { let next_step = match *self.step.read() { Step::Precommit if self.has_enough_aligned_votes(message) => { if message.block_hash.is_none() { - self.increment_round(1); + self.increment_view(1); Some(Step::Propose) } else { Some(Step::Commit) } }, Step::Precommit if self.has_enough_future_step_votes(&vote_step) => { - self.increment_round(vote_step.round - self.round.load(AtomicOrdering::SeqCst)); + self.increment_view(vote_step.view - self.view.load(AtomicOrdering::SeqCst)); Some(Step::Precommit) }, // Avoid counting twice. Step::Prevote if lock_change => Some(Step::Precommit), Step::Prevote if self.has_enough_aligned_votes(message) => Some(Step::Precommit), Step::Prevote if self.has_enough_future_step_votes(&vote_step) => { - self.increment_round(vote_step.round - self.round.load(AtomicOrdering::SeqCst)); + self.increment_view(vote_step.view - self.view.load(AtomicOrdering::SeqCst)); Some(Step::Prevote) }, _ => None, @@ -370,7 +370,7 @@ impl Tendermint { impl Engine for Tendermint { fn name(&self) -> &str { "Tendermint" } fn version(&self) -> SemanticVersion { SemanticVersion::new(1, 0, 0) } - /// (consensus round, proposal signature, authority signatures) + /// (consensus view, proposal signature, authority signatures) fn seal_fields(&self) -> usize { 3 } fn params(&self) -> &CommonParams { &self.params } @@ -385,7 +385,7 @@ impl Engine for Tendermint { map![ "signature".into() => message.signature.to_string(), "height".into() => message.vote_step.height.to_string(), - "round".into() => message.vote_step.round.to_string(), + "view".into() => message.vote_step.view.to_string(), "block_hash".into() => message.block_hash.as_ref().map(ToString::to_string).unwrap_or("".into()) ] } @@ -395,8 +395,8 @@ impl Engine for Tendermint { } fn populate_from_parent(&self, header: &mut Header, parent: &Header, gas_floor_target: U256, _gas_ceil_target: U256) { - // Chain scoring: total weight is sqrt(U256::max_value())*height - round - let new_difficulty = U256::from(U128::max_value()) + consensus_round(parent).expect("Header has been verified; qed").into() - self.round.load(AtomicOrdering::SeqCst).into(); + // Chain scoring: total weight is sqrt(U256::max_value())*height - view + let new_difficulty = U256::from(U128::max_value()) + consensus_view(parent).expect("Header has been verified; qed").into() - self.view.load(AtomicOrdering::SeqCst).into(); header.set_difficulty(new_difficulty); header.set_gas_limit({ let gas_limit = parent.gas_limit().clone(); @@ -424,17 +424,17 @@ impl Engine for Tendermint { } let height = header.number() as Height; - let round = self.round.load(AtomicOrdering::SeqCst); + let view = self.view.load(AtomicOrdering::SeqCst); let bh = Some(header.bare_hash()); - let vote_info = message_info_rlp(&VoteStep::new(height, round, Step::Propose), bh.clone()); + let vote_info = message_info_rlp(&VoteStep::new(height, view, Step::Propose), bh.clone()); if let Ok(signature) = self.signer.sign(vote_info.sha3()).map(Into::into) { // Insert Propose vote. - debug!(target: "poa", "Submitting proposal {} at height {} round {}.", header.bare_hash(), height, round); - self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), author); + debug!(target: "poa", "Submitting proposal {} at height {} view {}.", header.bare_hash(), height, view); + self.votes.vote(ConsensusMessage::new(signature, height, view, Step::Propose, bh), author); // Remember proposal for later seal submission. *self.proposal.write() = bh; Seal::Proposal(vec![ - ::rlp::encode(&round).to_vec(), + ::rlp::encode(&view).to_vec(), ::rlp::encode(&signature).to_vec(), ::rlp::EMPTY_LIST_RLP.to_vec() ]) @@ -535,7 +535,7 @@ impl Engine for Tendermint { found: signatures_len }))?; } - self.is_round_proposer(proposal.vote_step.height, proposal.vote_step.round, &proposer)?; + self.is_view_proposer(proposal.vote_step.height, proposal.vote_step.view, &proposer)?; } Ok(()) } @@ -583,7 +583,7 @@ impl Engine for Tendermint { } let proposer = proposal.verify().expect("block went through full verification; this Engine tries verify; qed"); debug!(target: "poa", "Received a new proposal {:?} from {}.", proposal.vote_step, proposer); - if self.is_round(&proposal) { + if self.is_view(&proposal) { *self.proposal.write() = proposal.block_hash.clone(); } self.votes.vote(proposal, &proposer); @@ -597,7 +597,7 @@ impl Engine for Tendermint { trace!(target: "poa", "Propose timeout."); if self.proposal.read().is_none() { // Report the proposer if no proposal was received. - let current_proposer = self.round_proposer(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst)); + let current_proposer = self.view_proposer(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst)); self.validators.report_benign(¤t_proposer); } Step::Prevote @@ -613,7 +613,7 @@ impl Engine for Tendermint { }, Step::Precommit if self.has_enough_any_votes() => { trace!(target: "poa", "Precommit timeout."); - self.increment_round(1); + self.increment_view(1); Step::Propose }, Step::Precommit => { @@ -673,19 +673,19 @@ mod tests { } } - fn vote(engine: &Engine, signer: F, height: usize, round: usize, step: Step, block_hash: Option) -> Bytes where F: FnOnce(H256) -> Result { - let mi = message_info_rlp(&VoteStep::new(height, round, step), block_hash); + fn vote(engine: &Engine, signer: F, height: usize, view: usize, step: Step, block_hash: Option) -> Bytes where F: FnOnce(H256) -> Result { + let mi = message_info_rlp(&VoteStep::new(height, view, step), block_hash); let m = message_full_rlp(&signer(mi.sha3()).unwrap().into(), &mi); engine.handle_message(&m).unwrap(); m } - fn proposal_seal(tap: &Arc, header: &Header, round: Round) -> Vec { + fn proposal_seal(tap: &Arc, header: &Header, view: View) -> Vec { let author = header.author(); - let vote_info = message_info_rlp(&VoteStep::new(header.number() as Height, round, Step::Propose), Some(header.bare_hash())); + let vote_info = message_info_rlp(&VoteStep::new(header.number() as Height, view, Step::Propose), Some(header.bare_hash())); let signature = tap.sign(*author, None, vote_info.sha3()).unwrap(); vec![ - ::rlp::encode(&round).to_vec(), + ::rlp::encode(&view).to_vec(), ::rlp::encode(&H520::from(signature)).to_vec(), ::rlp::EMPTY_LIST_RLP.to_vec() ] diff --git a/ethcore/src/engines/tendermint/params.rs b/ethcore/src/engines/tendermint/params.rs index aad3a4b7a..d03a77046 100644 --- a/ethcore/src/engines/tendermint/params.rs +++ b/ethcore/src/engines/tendermint/params.rs @@ -17,9 +17,10 @@ //! Tendermint specific parameters. use ethjson; -use super::transition::TendermintTimeouts; use util::{U256, Uint}; use time::Duration; +use super::super::transition::Timeouts; +use super::Step; /// `Tendermint` params. #[derive(Debug)] @@ -34,6 +35,41 @@ pub struct TendermintParams { pub block_reward: U256, } +/// Base timeout of each step in ms. +#[derive(Debug, Clone)] +pub struct TendermintTimeouts { + pub propose: Duration, + pub prevote: Duration, + pub precommit: Duration, + pub commit: Duration, +} + +impl Default for TendermintTimeouts { + fn default() -> Self { + TendermintTimeouts { + propose: Duration::milliseconds(1000), + prevote: Duration::milliseconds(1000), + precommit: Duration::milliseconds(1000), + commit: Duration::milliseconds(1000), + } + } +} + +impl Timeouts for TendermintTimeouts { + fn initial(&self) -> Duration { + self.propose + } + + fn timeout(&self, step: &Step) -> Duration { + match *step { + Step::Propose => self.propose, + Step::Prevote => self.prevote, + Step::Precommit => self.precommit, + Step::Commit => self.commit, + } + } +} + fn to_duration(ms: ethjson::uint::Uint) -> Duration { let ms: usize = ms.into(); Duration::milliseconds(ms as i64) diff --git a/ethcore/src/engines/tendermint/transition.rs b/ethcore/src/engines/tendermint/transition.rs deleted file mode 100644 index 36fc98174..000000000 --- a/ethcore/src/engines/tendermint/transition.rs +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Tendermint timeout handling. - -use std::sync::Weak; -use time::Duration; -use io::{IoContext, IoHandler, TimerToken}; -use super::{Tendermint, Step}; -use engines::Engine; - -pub struct TransitionHandler { - engine: Weak, - timeouts: TendermintTimeouts, -} - -impl TransitionHandler { - pub fn new(engine: Weak, timeouts: TendermintTimeouts) -> Self { - TransitionHandler { - engine: engine, - timeouts: timeouts, - } - } -} - -/// Base timeout of each step in ms. -#[derive(Debug, Clone)] -pub struct TendermintTimeouts { - pub propose: Duration, - pub prevote: Duration, - pub precommit: Duration, - pub commit: Duration, -} - -impl TendermintTimeouts { - pub fn for_step(&self, step: Step) -> Duration { - match step { - Step::Propose => self.propose, - Step::Prevote => self.prevote, - Step::Precommit => self.precommit, - Step::Commit => self.commit, - } - } -} - -impl Default for TendermintTimeouts { - fn default() -> Self { - TendermintTimeouts { - propose: Duration::milliseconds(1000), - prevote: Duration::milliseconds(1000), - precommit: Duration::milliseconds(1000), - commit: Duration::milliseconds(1000), - } - } -} - -/// Timer token representing the consensus step timeouts. -pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; - -fn set_timeout(io: &IoContext, timeout: Duration) { - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout.num_milliseconds() as u64) - .unwrap_or_else(|e| warn!(target: "poa", "Failed to set consensus step timeout: {}.", e)) -} - -impl IoHandler for TransitionHandler { - fn initialize(&self, io: &IoContext) { - set_timeout(io, self.timeouts.propose) - } - - fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if timer == ENGINE_TIMEOUT_TOKEN { - if let Some(engine) = self.engine.upgrade() { - engine.step(); - } - } - } - - fn message(&self, io: &IoContext, next_step: &Step) { - if let Err(io_err) = io.clear_timer(ENGINE_TIMEOUT_TOKEN) { - warn!(target: "poa", "Could not remove consensus timer {}.", io_err) - } - match *next_step { - Step::Propose => set_timeout(io, self.timeouts.propose), - Step::Prevote => set_timeout(io, self.timeouts.prevote), - Step::Precommit => set_timeout(io, self.timeouts.precommit), - Step::Commit => set_timeout(io, self.timeouts.commit), - }; - } -} diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs deleted file mode 100644 index 758dfb5a3..000000000 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Collects votes on hashes at each height and round. - -use util::*; -use super::message::*; -use super::{Height, Round, Step, BlockHash}; - -#[derive(Debug)] -pub struct VoteCollector { - /// Storing all Proposals, Prevotes and Precommits. - votes: RwLock>, -} - -#[derive(Debug, Default)] -struct StepCollector { - voted: HashSet
, - pub block_votes: HashMap, HashMap>, - messages: HashSet, -} - -impl StepCollector { - /// Returns Some(&Address) when validator is double voting. - fn insert<'a>(&mut self, message: ConsensusMessage, address: &'a Address) -> Option<&'a Address> { - // Do nothing when message was seen. - if self.messages.insert(message.clone()) { - if self.voted.insert(address.clone()) { - self - .block_votes - .entry(message.block_hash) - .or_insert_with(HashMap::new) - .insert(message.signature, address.clone()); - } else { - // Bad validator sent a different message. - return Some(address); - } - } - None - } - - /// Count all votes for the given block hash at this step. - fn count_block(&self, block_hash: &Option) -> usize { - self.block_votes.get(block_hash).map_or(0, HashMap::len) - } - - /// Count all votes collected for the given step. - fn count(&self) -> usize { - self.block_votes.values().map(HashMap::len).sum() - } -} - -#[derive(Debug)] -pub struct SealSignatures { - pub proposal: H520, - pub votes: Vec, -} - -impl PartialEq for SealSignatures { - fn eq(&self, other: &SealSignatures) -> bool { - self.proposal == other.proposal - && self.votes.iter().collect::>() == other.votes.iter().collect::>() - } -} - -impl Eq for SealSignatures {} - -impl VoteCollector { - pub fn new() -> Self { - let mut collector = BTreeMap::new(); - // Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted". - collector.insert(VoteStep::new(0, 0, Step::Propose), Default::default()); - VoteCollector { votes: RwLock::new(collector) } - } - - /// Insert vote if it is newer than the oldest one. - pub fn vote<'a>(&self, message: ConsensusMessage, voter: &'a Address) -> Option<&'a Address> { - self - .votes - .write() - .entry(message.vote_step.clone()) - .or_insert_with(Default::default) - .insert(message, voter) - } - - /// Checks if the message should be ignored. - pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool { - self - .votes - .read() - .get(&message.vote_step) - .map_or(false, |c| { - let is_known = c.messages.contains(message); - if is_known { trace!(target: "poa", "Known message: {:?}.", message); } - is_known - }) - || { - let guard = self.votes.read(); - let is_old = guard.keys().next().map_or(true, |oldest| message.vote_step <= *oldest); - if is_old { trace!(target: "poa", "Old message {:?}.", message); } - is_old - } - } - - /// Throws out messages older than message, leaves message as marker for the oldest. - pub fn throw_out_old(&self, vote_step: &VoteStep) { - let mut guard = self.votes.write(); - let new_collector = guard.split_off(vote_step); - *guard = new_collector; - } - - /// Collects the signatures used to seal a block. - pub fn seal_signatures(&self, height: Height, round: Round, block_hash: &H256) -> Option { - let ref bh = Some(*block_hash); - let precommit_step = VoteStep::new(height, round, Step::Precommit); - let maybe_seal = { - let guard = self.votes.read(); - guard - .get(&VoteStep::new(height, round, Step::Propose)) - .and_then(|c| c.block_votes.get(bh)) - .and_then(|proposals| proposals.keys().next()) - .map(|proposal| SealSignatures { - proposal: proposal.clone(), - votes: guard - .get(&precommit_step) - .and_then(|c| c.block_votes.get(bh)) - .map(|precommits| precommits.keys().cloned().collect()) - .unwrap_or_else(Vec::new), - }) - .and_then(|seal| if seal.votes.is_empty() { None } else { Some(seal) }) - }; - if maybe_seal.is_some() { - // Remove messages that are no longer relevant. - self.throw_out_old(&precommit_step); - } - maybe_seal - } - - /// Count votes which agree with the given message. - pub fn count_aligned_votes(&self, message: &ConsensusMessage) -> usize { - self - .votes - .read() - .get(&message.vote_step) - .map_or(0, |m| m.count_block(&message.block_hash)) - } - - /// Count all votes collected for a given step. - pub fn count_step_votes(&self, vote_step: &VoteStep) -> usize { - self.votes.read().get(vote_step).map_or(0, StepCollector::count) - } - - /// Get all messages older than the height. - pub fn get_up_to(&self, height: Height) -> Vec { - let guard = self.votes.read(); - guard - .iter() - .filter(|&(s, _)| s.step.is_pre()) - .take_while(|&(s, _)| s.height <= height) - .map(|(_, c)| c.messages.iter().map(|m| ::rlp::encode(m).to_vec()).collect::>()) - .fold(Vec::new(), |mut acc, mut messages| { acc.append(&mut messages); acc }) - } - - /// Retrieve address from which the message was sent from cache. - pub fn get(&self, message: &ConsensusMessage) -> Option
{ - let guard = self.votes.read(); - guard.get(&message.vote_step).and_then(|c| c.block_votes.get(&message.block_hash)).and_then(|origins| origins.get(&message.signature).cloned()) - } -} - -#[cfg(test)] -mod tests { - use util::*; - use super::*; - use super::super::{BlockHash, Step}; - use super::super::message::*; - - fn random_vote(collector: &VoteCollector, signature: H520, vote_step: VoteStep, block_hash: Option) -> bool { - full_vote(collector, signature, vote_step, block_hash, &H160::random()).is_none() - } - - fn full_vote<'a>(collector: &VoteCollector, signature: H520, vote_step: VoteStep, block_hash: Option, address: &'a Address) -> Option<&'a Address> { - collector.vote(ConsensusMessage { signature: signature, vote_step: vote_step, block_hash: block_hash }, address) - } - - #[test] - fn seal_retrieval() { - let collector = VoteCollector::new(); - let bh = Some("1".sha3()); - let h = 1; - let r = 2; - let mut signatures = Vec::new(); - for _ in 0..5 { - signatures.push(H520::random()); - } - let propose_step = VoteStep::new(h, r, Step::Propose); - let prevote_step = VoteStep::new(h, r, Step::Prevote); - let precommit_step = VoteStep::new(h, r, Step::Precommit); - // Wrong height proposal. - random_vote(&collector, signatures[4].clone(), VoteStep::new(h - 1, r, Step::Propose), bh.clone()); - // Good proposal - random_vote(&collector, signatures[0].clone(), propose_step.clone(), bh.clone()); - // Wrong block proposal. - random_vote(&collector, signatures[0].clone(), propose_step.clone(), Some("0".sha3())); - // Wrong block precommit. - random_vote(&collector, signatures[3].clone(), precommit_step.clone(), Some("0".sha3())); - // Wrong round proposal. - random_vote(&collector, signatures[0].clone(), VoteStep::new(h, r - 1, Step::Propose), bh.clone()); - // Prevote. - random_vote(&collector, signatures[0].clone(), prevote_step.clone(), bh.clone()); - // Relevant precommit. - random_vote(&collector, signatures[2].clone(), precommit_step.clone(), bh.clone()); - // Replcated vote. - random_vote(&collector, signatures[2].clone(), precommit_step.clone(), bh.clone()); - // Wrong round precommit. - random_vote(&collector, signatures[4].clone(), VoteStep::new(h, r + 1, Step::Precommit), bh.clone()); - // Wrong height precommit. - random_vote(&collector, signatures[3].clone(), VoteStep::new(h + 1, r, Step::Precommit), bh.clone()); - // Relevant precommit. - random_vote(&collector, signatures[1].clone(), precommit_step.clone(), bh.clone()); - // Wrong round precommit, same signature. - random_vote(&collector, signatures[1].clone(), VoteStep::new(h, r + 1, Step::Precommit), bh.clone()); - // Wrong round precommit. - random_vote(&collector, signatures[4].clone(), VoteStep::new(h, r - 1, Step::Precommit), bh.clone()); - let seal = SealSignatures { - proposal: signatures[0], - votes: signatures[1..3].to_vec() - }; - assert_eq!(seal, collector.seal_signatures(h, r, &bh.unwrap()).unwrap()); - } - - #[test] - fn count_votes() { - let collector = VoteCollector::new(); - let prevote_step = VoteStep::new(3, 2, Step::Prevote); - let precommit_step = VoteStep::new(3, 2, Step::Precommit); - // good prevote - random_vote(&collector, H520::random(), prevote_step.clone(), Some("0".sha3())); - random_vote(&collector, H520::random(), VoteStep::new(3, 1, Step::Prevote), Some("0".sha3())); - // good precommit - random_vote(&collector, H520::random(), precommit_step.clone(), Some("0".sha3())); - random_vote(&collector, H520::random(), VoteStep::new(3, 3, Step::Precommit), Some("0".sha3())); - // good prevote - random_vote(&collector, H520::random(), prevote_step.clone(), Some("1".sha3())); - // good prevote - let same_sig = H520::random(); - random_vote(&collector, same_sig.clone(), prevote_step.clone(), Some("1".sha3())); - random_vote(&collector, same_sig, prevote_step.clone(), Some("1".sha3())); - // good precommit - random_vote(&collector, H520::random(), precommit_step.clone(), Some("1".sha3())); - // good prevote - random_vote(&collector, H520::random(), prevote_step.clone(), Some("0".sha3())); - random_vote(&collector, H520::random(), VoteStep::new(2, 2, Step::Precommit), Some("2".sha3())); - - assert_eq!(collector.count_step_votes(&prevote_step), 4); - assert_eq!(collector.count_step_votes(&precommit_step), 2); - - let message = ConsensusMessage { - signature: H520::default(), - vote_step: prevote_step, - block_hash: Some("1".sha3()) - }; - assert_eq!(collector.count_aligned_votes(&message), 2); - } - - #[test] - fn remove_old() { - let collector = VoteCollector::new(); - let vote = |height, round, step, hash| { - random_vote(&collector, H520::random(), VoteStep::new(height, round, step), hash); - }; - vote(3, 2, Step::Prevote, Some("0".sha3())); - vote(3, 1, Step::Prevote, Some("0".sha3())); - vote(3, 3, Step::Precommit, Some("0".sha3())); - vote(3, 2, Step::Prevote, Some("1".sha3())); - vote(3, 2, Step::Prevote, Some("1".sha3())); - vote(3, 2, Step::Prevote, Some("0".sha3())); - vote(2, 2, Step::Precommit, Some("2".sha3())); - - collector.throw_out_old(&VoteStep::new(3, 2, Step::Precommit)); - assert_eq!(collector.votes.read().len(), 1); - } - - #[test] - fn malicious_authority() { - let collector = VoteCollector::new(); - let vote_step = VoteStep::new(3, 2, Step::Prevote); - // Vote is inserted fine. - assert!(full_vote(&collector, H520::random(), vote_step.clone(), Some("0".sha3()), &Address::default()).is_none()); - // Returns the double voting address. - full_vote(&collector, H520::random(), vote_step.clone(), Some("1".sha3()), &Address::default()).unwrap(); - assert_eq!(collector.count_step_votes(&vote_step), 1); - } -} diff --git a/ethcore/src/engines/transition.rs b/ethcore/src/engines/transition.rs new file mode 100644 index 000000000..dd1df74fa --- /dev/null +++ b/ethcore/src/engines/transition.rs @@ -0,0 +1,78 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Engine timeout transitioning calls `Engine.step()` on timeout. + +use std::sync::Weak; +use time::Duration; +use io::{IoContext, IoHandler, TimerToken}; +use engines::Engine; + +/// Timeouts lookup +pub trait Timeouts: Send + Sync { + /// Return the first timeout. + fn initial(&self) -> Duration; + + /// Get a timeout based on step. + fn timeout(&self, step: &S) -> Duration; +} + +/// Timeout transition handling. +pub struct TransitionHandler { + engine: Weak, + timeouts: Box>, +} + +impl TransitionHandler where S: Sync + Send + Clone { + /// New step caller by timeouts. + pub fn new(engine: Weak, timeouts: Box>) -> Self { + TransitionHandler { + engine: engine, + timeouts: timeouts, + } + } +} + +/// Timer token representing the consensus step timeouts. +pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; + +fn set_timeout(io: &IoContext, timeout: Duration) { + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout.num_milliseconds() as u64) + .unwrap_or_else(|e| warn!(target: "engine", "Failed to set consensus step timeout: {}.", e)) +} + +impl IoHandler for TransitionHandler where S: Sync + Send + Clone + 'static { + fn initialize(&self, io: &IoContext) { + set_timeout(io, self.timeouts.initial()); + } + + /// Call step after timeout. + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer == ENGINE_TIMEOUT_TOKEN { + if let Some(engine) = self.engine.upgrade() { + engine.step(); + } + } + } + + /// Set a new timer on message. + fn message(&self, io: &IoContext, next: &S) { + if let Err(io_err) = io.clear_timer(ENGINE_TIMEOUT_TOKEN) { + warn!(target: "engine", "Could not remove consensus timer {}.", io_err) + } + set_timeout(io, self.timeouts.timeout(next)); + } +} diff --git a/ethcore/src/engines/vote_collector.rs b/ethcore/src/engines/vote_collector.rs new file mode 100644 index 000000000..89e7aa345 --- /dev/null +++ b/ethcore/src/engines/vote_collector.rs @@ -0,0 +1,345 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Collects votes on hashes at each Message::Round. + +use std::fmt::Debug; +use util::*; +use rlp::Encodable; + +pub trait Message: Clone + PartialEq + Eq + Hash + Encodable + Debug { + type Round: Clone + PartialEq + Eq + Hash + Default + Debug + Ord; + + fn signature(&self) -> H520; + + fn block_hash(&self) -> Option; + + fn round(&self) -> &Self::Round; + + fn is_broadcastable(&self) -> bool; +} + +/// Storing all Proposals, Prevotes and Precommits. +#[derive(Debug)] +pub struct VoteCollector { + votes: RwLock>>, +} + +#[derive(Debug, Default)] +struct StepCollector { + voted: HashSet
, + pub block_votes: HashMap, HashMap>, + messages: HashSet, +} + +impl StepCollector { + /// Returns Some(&Address) when validator is double voting. + fn insert<'a>(&mut self, message: M, address: &'a Address) -> Option<&'a Address> { + // Do nothing when message was seen. + if self.messages.insert(message.clone()) { + if self.voted.insert(address.clone()) { + self + .block_votes + .entry(message.block_hash()) + .or_insert_with(HashMap::new) + .insert(message.signature(), address.clone()); + } else { + // Bad validator sent a different message. + return Some(address); + } + } + None + } + + /// Count all votes for the given block hash at this round. + fn count_block(&self, block_hash: &Option) -> usize { + self.block_votes.get(block_hash).map_or(0, HashMap::len) + } + + /// Count all votes collected for the given round. + fn count(&self) -> usize { + self.block_votes.values().map(HashMap::len).sum() + } +} + +#[derive(Debug)] +pub struct SealSignatures { + pub proposal: H520, + pub votes: Vec, +} + +impl PartialEq for SealSignatures { + fn eq(&self, other: &SealSignatures) -> bool { + self.proposal == other.proposal + && self.votes.iter().collect::>() == other.votes.iter().collect::>() + } +} + +impl Eq for SealSignatures {} + +impl Default for VoteCollector { + fn default() -> Self { + let mut collector = BTreeMap::new(); + // Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted". + collector.insert(Default::default(), Default::default()); + VoteCollector { votes: RwLock::new(collector) } + } +} + +impl VoteCollector { + /// Insert vote if it is newer than the oldest one. + pub fn vote<'a>(&self, message: M, voter: &'a Address) -> Option<&'a Address> { + self + .votes + .write() + .entry(message.round().clone()) + .or_insert_with(Default::default) + .insert(message, voter) + } + + /// Checks if the message should be ignored. + pub fn is_old_or_known(&self, message: &M) -> bool { + self + .votes + .read() + .get(&message.round()) + .map_or(false, |c| { + let is_known = c.messages.contains(message); + if is_known { trace!(target: "poa", "Known message: {:?}.", message); } + is_known + }) + || { + let guard = self.votes.read(); + let is_old = guard.keys().next().map_or(true, |oldest| message.round() <= oldest); + if is_old { trace!(target: "poa", "Old message {:?}.", message); } + is_old + } + } + + /// Throws out messages older than message, leaves message as marker for the oldest. + pub fn throw_out_old(&self, vote_round: &M::Round) { + let mut guard = self.votes.write(); + let new_collector = guard.split_off(vote_round); + *guard = new_collector; + } + + /// Collects the signatures used to seal a block. + pub fn seal_signatures(&self, proposal_round: M::Round, commit_round: M::Round, block_hash: &H256) -> Option { + let ref bh = Some(*block_hash); + let maybe_seal = { + let guard = self.votes.read(); + guard + .get(&proposal_round) + .and_then(|c| c.block_votes.get(bh)) + .and_then(|proposals| proposals.keys().next()) + .map(|proposal| SealSignatures { + proposal: proposal.clone(), + votes: guard + .get(&commit_round) + .and_then(|c| c.block_votes.get(bh)) + .map(|precommits| precommits.keys().cloned().collect()) + .unwrap_or_else(Vec::new), + }) + .and_then(|seal| if seal.votes.is_empty() { None } else { Some(seal) }) + }; + if maybe_seal.is_some() { + // Remove messages that are no longer relevant. + self.throw_out_old(&commit_round); + } + maybe_seal + } + + /// Count votes which agree with the given message. + pub fn count_aligned_votes(&self, message: &M) -> usize { + self + .votes + .read() + .get(&message.round()) + .map_or(0, |m| m.count_block(&message.block_hash())) + } + + /// Count all votes collected for a given round. + pub fn count_round_votes(&self, vote_round: &M::Round) -> usize { + self.votes.read().get(vote_round).map_or(0, StepCollector::count) + } + + /// Get all messages older than the round. + pub fn get_up_to(&self, round: &M::Round) -> Vec { + let guard = self.votes.read(); + guard + .iter() + .take_while(|&(r, _)| r <= round) + .map(|(_, c)| c.messages.iter().filter(|m| m.is_broadcastable()).map(|m| ::rlp::encode(m).to_vec()).collect::>()) + .fold(Vec::new(), |mut acc, mut messages| { acc.append(&mut messages); acc }) + } + + /// Retrieve address from which the message was sent from cache. + pub fn get(&self, message: &M) -> Option
{ + let guard = self.votes.read(); + guard.get(&message.round()).and_then(|c| c.block_votes.get(&message.block_hash())).and_then(|origins| origins.get(&message.signature()).cloned()) + } + + /// Count the number of total rounds kept track of. + #[cfg(test)] + pub fn len(&self) -> usize { + self.votes.read().len() + } +} + +#[cfg(test)] +mod tests { + use util::*; + use rlp::*; + use super::*; + + #[derive(Debug, PartialEq, Eq, Clone, Hash, Default)] + struct TestMessage { + step: TestStep, + block_hash: Option, + signature: H520, + } + + type TestStep = u64; + + impl Message for TestMessage { + type Round = TestStep; + + fn signature(&self) -> H520 { self.signature } + + fn block_hash(&self) -> Option { self.block_hash } + + fn round(&self) -> &TestStep { &self.step } + + fn is_broadcastable(&self) -> bool { true } + } + + impl Encodable for TestMessage { + fn rlp_append(&self, s: &mut RlpStream) { + s.begin_list(3) + .append(&self.signature) + .append(&self.step) + .append(&self.block_hash.unwrap_or_else(H256::zero)); + } + } + + fn random_vote(collector: &VoteCollector, signature: H520, step: TestStep, block_hash: Option) -> bool { + full_vote(collector, signature, step, block_hash, &H160::random()).is_none() + } + + fn full_vote<'a>(collector: &VoteCollector, signature: H520, step: TestStep, block_hash: Option, address: &'a Address) -> Option<&'a Address> { + collector.vote(TestMessage { signature: signature, step: step, block_hash: block_hash }, address) + } + + #[test] + fn seal_retrieval() { + let collector = VoteCollector::default(); + let bh = Some("1".sha3()); + let mut signatures = Vec::new(); + for _ in 0..5 { + signatures.push(H520::random()); + } + let propose_round = 3; + let commit_round = 5; + // Wrong round. + random_vote(&collector, signatures[4].clone(), 1, bh.clone()); + // Good proposal + random_vote(&collector, signatures[0].clone(), propose_round.clone(), bh.clone()); + // Wrong block proposal. + random_vote(&collector, signatures[0].clone(), propose_round.clone(), Some("0".sha3())); + // Wrong block commit. + random_vote(&collector, signatures[3].clone(), commit_round.clone(), Some("0".sha3())); + // Wrong round. + random_vote(&collector, signatures[0].clone(), 6, bh.clone()); + // Wrong round. + random_vote(&collector, signatures[0].clone(), 4, bh.clone()); + // Relevant commit. + random_vote(&collector, signatures[2].clone(), commit_round.clone(), bh.clone()); + // Replicated vote. + random_vote(&collector, signatures[2].clone(), commit_round.clone(), bh.clone()); + // Wrong round. + random_vote(&collector, signatures[4].clone(), 6, bh.clone()); + // Relevant precommit. + random_vote(&collector, signatures[1].clone(), commit_round.clone(), bh.clone()); + // Wrong round, same signature. + random_vote(&collector, signatures[1].clone(), 7, bh.clone()); + let seal = SealSignatures { + proposal: signatures[0], + votes: signatures[1..3].to_vec() + }; + assert_eq!(seal, collector.seal_signatures(propose_round, commit_round, &bh.unwrap()).unwrap()); + } + + #[test] + fn count_votes() { + let collector = VoteCollector::default(); + let round1 = 1; + let round3 = 3; + // good 1 + random_vote(&collector, H520::random(), round1, Some("0".sha3())); + random_vote(&collector, H520::random(), 0, Some("0".sha3())); + // good 3 + random_vote(&collector, H520::random(), round3, Some("0".sha3())); + random_vote(&collector, H520::random(), 2, Some("0".sha3())); + // good prevote + random_vote(&collector, H520::random(), round1, Some("1".sha3())); + // good prevote + let same_sig = H520::random(); + random_vote(&collector, same_sig.clone(), round1, Some("1".sha3())); + random_vote(&collector, same_sig, round1, Some("1".sha3())); + // good precommit + random_vote(&collector, H520::random(), round3, Some("1".sha3())); + // good prevote + random_vote(&collector, H520::random(), round1, Some("0".sha3())); + random_vote(&collector, H520::random(), 4, Some("2".sha3())); + + assert_eq!(collector.count_round_votes(&round1), 4); + assert_eq!(collector.count_round_votes(&round3), 2); + + let message = TestMessage { + signature: H520::default(), + step: round1, + block_hash: Some("1".sha3()) + }; + assert_eq!(collector.count_aligned_votes(&message), 2); + } + + #[test] + fn remove_old() { + let collector = VoteCollector::default(); + let vote = |round, hash| { + random_vote(&collector, H520::random(), round, hash); + }; + vote(6, Some("0".sha3())); + vote(3, Some("0".sha3())); + vote(7, Some("0".sha3())); + vote(8, Some("1".sha3())); + vote(1, Some("1".sha3())); + + collector.throw_out_old(&7); + assert_eq!(collector.len(), 2); + } + + #[test] + fn malicious_authority() { + let collector = VoteCollector::default(); + let round = 3; + // Vote is inserted fine. + assert!(full_vote(&collector, H520::random(), round, Some("0".sha3()), &Address::default()).is_none()); + // Returns the double voting address. + full_vote(&collector, H520::random(), round, Some("1".sha3()), &Address::default()).unwrap(); + assert_eq!(collector.count_round_votes(&round), 1); + } +}