diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs index f69d662cd..1f17ed902 100644 --- a/ethcore/src/engines/tendermint/message.rs +++ b/ethcore/src/engines/tendermint/message.rs @@ -23,15 +23,35 @@ use header::Header; use rlp::*; use ethkey::{recover, public_to_address}; -#[derive(Debug, PartialEq, Eq, Clone)] +/// Message transmitted between consensus participants. +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct ConsensusMessage { + pub vote_step: VoteStep, + pub block_hash: Option, pub signature: H520, +} + +/// Complete step of the consensus process. +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +pub struct VoteStep { pub height: Height, pub round: Round, pub step: Step, - pub block_hash: Option, } +impl VoteStep { + pub fn new(height: Height, round: Round, step: Step) -> Self { + VoteStep { height: height, round: round, step: step } + } + + pub fn is_height(&self, height: Height) -> bool { + self.height == height + } + + pub fn is_round(&self, height: Height, round: Round) -> bool { + self.height == height && self.round == round + } +} fn consensus_round(header: &Header) -> Result { let round_rlp = header.seal().get(0).expect("seal passed basic verification; seal has 3 fields; qed"); @@ -42,53 +62,29 @@ impl ConsensusMessage { pub fn new(signature: H520, height: Height, round: Round, step: Step, block_hash: Option) -> Self { ConsensusMessage { signature: signature, - height: height, - round: round, - step: step, block_hash: block_hash, + vote_step: VoteStep::new(height, round, step), } } pub fn new_proposal(header: &Header) -> Result { Ok(ConsensusMessage { + vote_step: VoteStep::new(header.number() as Height, consensus_round(header)?, Step::Propose), signature: UntrustedRlp::new(header.seal().get(1).expect("seal passed basic verification; seal has 3 fields; qed").as_slice()).as_val()?, - height: header.number() as Height, - round: consensus_round(header)?, - step: Step::Propose, block_hash: Some(header.bare_hash()), }) } pub fn new_commit(proposal: &ConsensusMessage, signature: H520) -> Self { + let mut vote_step = proposal.vote_step.clone(); + vote_step.step = Step::Precommit; ConsensusMessage { - signature: signature, - height: proposal.height, - round: proposal.round, - step: Step::Precommit, + vote_step: vote_step, block_hash: proposal.block_hash, + signature: signature, } } - pub fn is_height(&self, height: Height) -> bool { - self.height == height - } - - pub fn is_round(&self, height: Height, round: Round) -> bool { - self.height == height && self.round == round - } - - pub fn is_step(&self, height: Height, round: Round, step: Step) -> bool { - self.height == height && self.round == round && self.step == step - } - - pub fn is_block_hash(&self, h: Height, r: Round, s: Step, block_hash: Option) -> bool { - self.height == h && self.round == r && self.step == s && self.block_hash == block_hash - } - - pub fn is_aligned(&self, m: &ConsensusMessage) -> bool { - self.is_block_hash(m.height, m.round, m.step, m.block_hash) - } - pub fn verify(&self) -> Result { let full_rlp = ::rlp::encode(self); let block_info = Rlp::new(&full_rlp).at(1); @@ -97,16 +93,30 @@ impl ConsensusMessage { } pub fn precommit_hash(&self) -> H256 { - message_info_rlp(self.height, self.round, Step::Precommit, self.block_hash).sha3() + let mut vote_step = self.vote_step.clone(); + vote_step.step = Step::Precommit; + message_info_rlp(&vote_step, self.block_hash).sha3() } } -impl PartialOrd for ConsensusMessage { - fn partial_cmp(&self, m: &ConsensusMessage) -> Option { +impl PartialOrd for VoteStep { + fn partial_cmp(&self, m: &VoteStep) -> Option { Some(self.cmp(m)) } } +impl Ord for VoteStep { + fn cmp(&self, m: &VoteStep) -> Ordering { + if self.height != m.height { + self.height.cmp(&m.height) + } else if self.round != m.round { + self.round.cmp(&m.round) + } else { + self.step.number().cmp(&m.step.number()) + } + } +} + impl Step { fn number(&self) -> u8 { match *self { @@ -118,20 +128,6 @@ impl Step { } } -impl Ord for ConsensusMessage { - fn cmp(&self, m: &ConsensusMessage) -> Ordering { - if self.height != m.height { - self.height.cmp(&m.height) - } else if self.round != m.round { - self.round.cmp(&m.round) - } else if self.step != m.step { - self.step.number().cmp(&m.step.number()) - } else { - self.signature.cmp(&m.signature) - } - } -} - impl Decodable for Step { fn decode(decoder: &D) -> Result where D: Decoder { match decoder.as_rlp().as_val()? { @@ -149,42 +145,39 @@ impl Encodable for Step { } } -/// (signature, height, round, step, block_hash) +/// (signature, (height, round, step, block_hash)) impl Decodable for ConsensusMessage { fn decode(decoder: &D) -> Result where D: Decoder { let rlp = decoder.as_rlp(); let m = rlp.at(1)?; let block_message: H256 = m.val_at(3)?; Ok(ConsensusMessage { - signature: rlp.val_at(0)?, - height: m.val_at(0)?, - round: m.val_at(1)?, - step: m.val_at(2)?, + vote_step: VoteStep::new(m.val_at(0)?, m.val_at(1)?, m.val_at(2)?), block_hash: match block_message.is_zero() { true => None, false => Some(block_message), - } + }, + signature: rlp.val_at(0)?, }) } } impl Encodable for ConsensusMessage { fn rlp_append(&self, s: &mut RlpStream) { - let info = message_info_rlp(self.height, self.round, self.step, self.block_hash); + let info = message_info_rlp(&self.vote_step, self.block_hash); s.begin_list(2) .append(&self.signature) .append_raw(&info, 1); } } -pub fn message_info_rlp(height: Height, round: Round, step: Step, block_hash: Option) -> Bytes { +pub fn message_info_rlp(vote_step: &VoteStep, block_hash: Option) -> Bytes { // TODO: figure out whats wrong with nested list encoding let mut s = RlpStream::new_list(5); - s.append(&height).append(&round).append(&step).append(&block_hash.unwrap_or_else(H256::zero)); + s.append(&vote_step.height).append(&vote_step.round).append(&vote_step.step).append(&block_hash.unwrap_or_else(H256::zero)); s.out() } - pub fn message_full_rlp(signature: &H520, vote_info: &Bytes) -> Bytes { let mut s = RlpStream::new_list(2); s.append(signature).append_raw(vote_info, 1); @@ -204,10 +197,12 @@ mod tests { #[test] fn encode_decode() { let message = ConsensusMessage { - signature: H520::default(), - height: 10, - round: 123, - step: Step::Precommit, + signature: H520::default(), + vote_step: VoteStep { + height: 10, + round: 123, + step: Step::Precommit, + }, block_hash: Some("1".sha3()) }; let raw_rlp = ::rlp::encode(&message).to_vec(); @@ -215,10 +210,12 @@ mod tests { assert_eq!(message, rlp.as_val()); let message = ConsensusMessage { - signature: H520::default(), - height: 1314, - round: 0, - step: Step::Prevote, + signature: H520::default(), + vote_step: VoteStep { + height: 1314, + round: 0, + step: Step::Prevote, + }, block_hash: None }; let raw_rlp = ::rlp::encode(&message); @@ -232,7 +229,7 @@ mod tests { let addr = tap.insert_account(Secret::from_slice(&"0".sha3()).unwrap(), "0").unwrap(); tap.unlock_account_permanently(addr, "0".into()).unwrap(); - let mi = message_info_rlp(123, 2, Step::Precommit, Some(H256::default())); + let mi = message_info_rlp(&VoteStep::new(123, 2, Step::Precommit), Some(H256::default())); let raw_rlp = message_full_rlp(&tap.sign(addr, None, mi.sha3()).unwrap().into(), &mi); @@ -255,9 +252,11 @@ mod tests { message, ConsensusMessage { signature: Default::default(), - height: 0, - round: 0, - step: Step::Propose, + vote_step: VoteStep { + height: 0, + round: 0, + step: Step::Propose, + }, block_hash: Some(header.bare_hash()) } ); @@ -268,12 +267,10 @@ mod tests { let header = Header::default(); let pro = ConsensusMessage { signature: Default::default(), - height: 0, - round: 0, - step: Step::Propose, + vote_step: VoteStep::new(0, 0, Step::Propose), block_hash: Some(header.bare_hash()) }; - let pre = message_info_rlp(0, 0, Step::Precommit, Some(header.bare_hash())); + let pre = message_info_rlp(&VoteStep::new(0, 0, Step::Precommit), Some(header.bare_hash())); assert_eq!(pro.precommit_hash(), pre.sha3()); } diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 84b768e47..9b7fa8cf6 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -53,7 +53,7 @@ use self::transition::TransitionHandler; use self::params::TendermintParams; use self::vote_collector::VoteCollector; -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub enum Step { Propose, Prevote, @@ -163,13 +163,13 @@ impl Tendermint { let h = self.height.load(AtomicOrdering::SeqCst); let r = self.round.load(AtomicOrdering::SeqCst); let s = self.step.read(); - let vote_info = message_info_rlp(h, r, *s, block_hash); + let vote_info = message_info_rlp(&VoteStep::new(h, r, *s), block_hash); let authority = self.authority.read(); match ap.sign(*authority, self.password.read().clone(), vote_info.sha3()).map(Into::into) { Ok(signature) => { let message_rlp = message_full_rlp(&signature, &vote_info); let message = ConsensusMessage::new(signature, h, r, *s, block_hash); - self.votes.vote(message.clone(), *authority); + self.votes.vote(message.clone(), &*authority); debug!(target: "poa", "Generated {:?} as {}.", message, *authority); self.handle_valid_message(&message); @@ -221,7 +221,7 @@ impl Tendermint { }, Step::Prevote => { let block_hash = match *self.lock_change.read() { - Some(ref m) if !self.should_unlock(m.round) => m.block_hash, + Some(ref m) if !self.should_unlock(m.vote_step.round) => m.block_hash, _ => self.proposal.read().clone(), }; self.generate_and_broadcast_message(block_hash); @@ -230,8 +230,8 @@ impl Tendermint { trace!(target: "poa", "to_step: Precommit."); let block_hash = match *self.lock_change.read() { Some(ref m) if self.is_round(m) && m.block_hash.is_some() => { - trace!(target: "poa", "Setting last lock: {}", m.round); - self.last_lock.store(m.round, AtomicOrdering::SeqCst); + trace!(target: "poa", "Setting last lock: {}", m.vote_step.round); + self.last_lock.store(m.vote_step.round, AtomicOrdering::SeqCst); m.block_hash }, _ => None, @@ -246,7 +246,7 @@ impl Tendermint { if let Some(block_hash) = *self.proposal.read() { // Generate seal and remove old votes. if self.is_proposer(&*self.authority.read()).is_ok() { - if let Some(seal) = self.votes.seal_signatures(height, round, block_hash) { + if let Some(seal) = self.votes.seal_signatures(height, round, &block_hash) { trace!(target: "poa", "Collected seal: {:?}", seal); let seal = vec![ ::rlp::encode(&round).to_vec(), @@ -290,11 +290,11 @@ impl Tendermint { } fn is_height(&self, message: &ConsensusMessage) -> bool { - message.is_height(self.height.load(AtomicOrdering::SeqCst)) + message.vote_step.is_height(self.height.load(AtomicOrdering::SeqCst)) } fn is_round(&self, message: &ConsensusMessage) -> bool { - message.is_round(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst)) + message.vote_step.is_round(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst)) } fn increment_round(&self, n: Round) { @@ -309,14 +309,14 @@ impl Tendermint { fn has_enough_any_votes(&self) -> bool { - let step_votes = self.votes.count_step_votes(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), *self.step.read()); + let step_votes = self.votes.count_step_votes(&VoteStep::new(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), *self.step.read())); self.is_above_threshold(step_votes) } - fn has_enough_future_step_votes(&self, message: &ConsensusMessage) -> bool { - if message.round > self.round.load(AtomicOrdering::SeqCst) { - let step_votes = self.votes.count_step_votes(message.height, message.round, message.step); - self.is_above_threshold(step_votes) + fn has_enough_future_step_votes(&self, vote_step: &VoteStep) -> bool { + if vote_step.round > self.round.load(AtomicOrdering::SeqCst) { + let step_votes = self.votes.count_step_votes(vote_step); + self.is_above_threshold(step_votes) } else { false } @@ -328,12 +328,13 @@ impl Tendermint { } fn handle_valid_message(&self, message: &ConsensusMessage) { + let ref vote_step = message.vote_step; let is_newer_than_lock = match *self.lock_change.read() { - Some(ref lock) => message > lock, + Some(ref lock) => vote_step > &lock.vote_step, None => true, }; let lock_change = is_newer_than_lock - && message.step == Step::Prevote + && vote_step.step == Step::Prevote && message.block_hash.is_some() && self.has_enough_aligned_votes(message); if lock_change { @@ -351,15 +352,15 @@ impl Tendermint { Some(Step::Commit) } }, - Step::Precommit if self.has_enough_future_step_votes(message) => { - self.increment_round(message.round - self.round.load(AtomicOrdering::SeqCst)); + Step::Precommit if self.has_enough_future_step_votes(&vote_step) => { + self.increment_round(vote_step.round - self.round.load(AtomicOrdering::SeqCst)); Some(Step::Precommit) }, // Avoid counting twice. Step::Prevote if lock_change => Some(Step::Precommit), Step::Prevote if self.has_enough_aligned_votes(message) => Some(Step::Precommit), - Step::Prevote if self.has_enough_future_step_votes(message) => { - self.increment_round(message.round - self.round.load(AtomicOrdering::SeqCst)); + Step::Prevote if self.has_enough_future_step_votes(&vote_step) => { + self.increment_round(vote_step.round - self.round.load(AtomicOrdering::SeqCst)); Some(Step::Prevote) }, _ => None, @@ -390,8 +391,8 @@ impl Engine for Tendermint { let message = ConsensusMessage::new_proposal(header).expect("Invalid header."); map![ "signature".into() => message.signature.to_string(), - "height".into() => message.height.to_string(), - "round".into() => message.round.to_string(), + "height".into() => message.vote_step.height.to_string(), + "round".into() => message.vote_step.round.to_string(), "block_hash".into() => message.block_hash.as_ref().map(ToString::to_string).unwrap_or("".into()) ] } @@ -431,11 +432,11 @@ impl Engine for Tendermint { let height = header.number() as Height; let round = self.round.load(AtomicOrdering::SeqCst); let bh = Some(header.bare_hash()); - let vote_info = message_info_rlp(height, round, Step::Propose, bh.clone()); + let vote_info = message_info_rlp(&VoteStep::new(height, round, Step::Propose), bh.clone()); if let Ok(signature) = ap.sign(*author, self.password.read().clone(), vote_info.sha3()).map(H520::from) { // Insert Propose vote. debug!(target: "poa", "Submitting proposal {} at height {} round {}.", header.bare_hash(), height, round); - self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), *author); + self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), author); // Remember proposal for later seal submission. *self.proposal.write() = bh; Seal::Proposal(vec![ @@ -461,9 +462,11 @@ impl Engine for Tendermint { if !self.is_authority(&sender) { Err(EngineError::NotAuthorized(sender))?; } - self.broadcast_message(rlp.as_raw().to_vec()); + if self.votes.vote(message.clone(), &sender).is_some() { + Err(EngineError::DoubleVote(sender))? + } trace!(target: "poa", "Handling a valid {:?} from {}.", message, sender); - self.votes.vote(message.clone(), sender); + self.broadcast_message(rlp.as_raw().to_vec()); self.handle_valid_message(&message); } Ok(()) @@ -541,7 +544,7 @@ impl Engine for Tendermint { found: signatures_len }))?; } - self.is_round_proposer(proposal.height, proposal.round, &proposer)?; + self.is_round_proposer(proposal.vote_step.height, proposal.vote_step.round, &proposer)?; } Ok(()) } @@ -607,16 +610,16 @@ impl Engine for Tendermint { let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed"); if signatures_len != 1 { // New Commit received, skip to next height. - trace!(target: "poa", "Received a commit for height {}, round {}.", proposal.height, proposal.round); - self.to_next_height(proposal.height); + trace!(target: "poa", "Received a commit: {:?}.", proposal.vote_step); + self.to_next_height(proposal.vote_step.height); return false; } let proposer = proposal.verify().expect("block went through full verification; this Engine tries verify; qed"); - debug!(target: "poa", "Received a new proposal for height {}, round {} from {}.", proposal.height, proposal.round, proposer); + debug!(target: "poa", "Received a new proposal {:?} from {}.", proposal.vote_step, proposer); if self.is_round(&proposal) { *self.proposal.write() = proposal.block_hash.clone(); } - self.votes.vote(proposal, proposer); + self.votes.vote(proposal, &proposer); true } @@ -704,7 +707,7 @@ mod tests { } fn vote(engine: &Engine, signer: F, height: usize, round: usize, step: Step, block_hash: Option) -> Bytes where F: FnOnce(H256) -> Result { - let mi = message_info_rlp(height, round, step, block_hash); + let mi = message_info_rlp(&VoteStep::new(height, round, step), block_hash); let m = message_full_rlp(&signer(mi.sha3()).unwrap().into(), &mi); engine.handle_message(&m).unwrap(); m @@ -712,7 +715,7 @@ mod tests { fn proposal_seal(tap: &Arc, header: &Header, round: Round) -> Vec { let author = header.author(); - let vote_info = message_info_rlp(header.number() as Height, round, Step::Propose, Some(header.bare_hash())); + let vote_info = message_info_rlp(&VoteStep::new(header.number() as Height, round, Step::Propose), Some(header.bare_hash())); let signature = tap.sign(*author, None, vote_info.sha3()).unwrap(); vec![ ::rlp::encode(&round).to_vec(), @@ -826,7 +829,7 @@ mod tests { header.set_author(proposer); let mut seal = proposal_seal(&tap, &header, 0); - let vote_info = message_info_rlp(0, 0, Step::Precommit, Some(header.bare_hash())); + let vote_info = message_info_rlp(&VoteStep::new(0, 0, Step::Precommit), Some(header.bare_hash())); let signature1 = tap.sign(proposer, None, vote_info.sha3()).unwrap(); seal[2] = ::rlp::encode(&vec![H520::from(signature1.clone())]).to_vec(); diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index be592bc8f..758dfb5a3 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -17,13 +17,50 @@ //! Collects votes on hashes at each height and round. use util::*; -use super::message::ConsensusMessage; -use super::{Height, Round, Step}; +use super::message::*; +use super::{Height, Round, Step, BlockHash}; #[derive(Debug)] pub struct VoteCollector { /// Storing all Proposals, Prevotes and Precommits. - votes: RwLock>, + votes: RwLock>, +} + +#[derive(Debug, Default)] +struct StepCollector { + voted: HashSet
, + pub block_votes: HashMap, HashMap>, + messages: HashSet, +} + +impl StepCollector { + /// Returns Some(&Address) when validator is double voting. + fn insert<'a>(&mut self, message: ConsensusMessage, address: &'a Address) -> Option<&'a Address> { + // Do nothing when message was seen. + if self.messages.insert(message.clone()) { + if self.voted.insert(address.clone()) { + self + .block_votes + .entry(message.block_hash) + .or_insert_with(HashMap::new) + .insert(message.signature, address.clone()); + } else { + // Bad validator sent a different message. + return Some(address); + } + } + None + } + + /// Count all votes for the given block hash at this step. + fn count_block(&self, block_hash: &Option) -> usize { + self.block_votes.get(block_hash).map_or(0, HashMap::len) + } + + /// Count all votes collected for the given step. + fn count(&self) -> usize { + self.block_votes.values().map(HashMap::len).sum() + } } #[derive(Debug)] @@ -42,109 +79,105 @@ impl PartialEq for SealSignatures { impl Eq for SealSignatures {} impl VoteCollector { - pub fn new() -> VoteCollector { + pub fn new() -> Self { let mut collector = BTreeMap::new(); - // Insert dummy message to fulfill invariant: "only messages newer than the oldest are inserted". - collector.insert(ConsensusMessage { - signature: H520::default(), - height: 0, - round: 0, - step: Step::Propose, - block_hash: None - }, - Address::default()); + // Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted". + collector.insert(VoteStep::new(0, 0, Step::Propose), Default::default()); VoteCollector { votes: RwLock::new(collector) } } /// Insert vote if it is newer than the oldest one. - pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option
{ - self.votes.write().insert(message, voter) + pub fn vote<'a>(&self, message: ConsensusMessage, voter: &'a Address) -> Option<&'a Address> { + self + .votes + .write() + .entry(message.vote_step.clone()) + .or_insert_with(Default::default) + .insert(message, voter) } + /// Checks if the message should be ignored. pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool { - self.votes.read().get(message).map_or(false, |a| { - trace!(target: "poa", "Known message from {}: {:?}.", a, message); - true - }) || { + self + .votes + .read() + .get(&message.vote_step) + .map_or(false, |c| { + let is_known = c.messages.contains(message); + if is_known { trace!(target: "poa", "Known message: {:?}.", message); } + is_known + }) + || { let guard = self.votes.read(); - let is_old = guard.keys().next().map_or(true, |oldest| message <= oldest); + let is_old = guard.keys().next().map_or(true, |oldest| message.vote_step <= *oldest); if is_old { trace!(target: "poa", "Old message {:?}.", message); } is_old } } /// Throws out messages older than message, leaves message as marker for the oldest. - pub fn throw_out_old(&self, message: &ConsensusMessage) { + pub fn throw_out_old(&self, vote_step: &VoteStep) { let mut guard = self.votes.write(); - let new_collector = guard.split_off(message); + let new_collector = guard.split_off(vote_step); *guard = new_collector; } - pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option { - let bh = Some(block_hash); - let (proposal, votes) = { + /// Collects the signatures used to seal a block. + pub fn seal_signatures(&self, height: Height, round: Round, block_hash: &H256) -> Option { + let ref bh = Some(*block_hash); + let precommit_step = VoteStep::new(height, round, Step::Precommit); + let maybe_seal = { let guard = self.votes.read(); - let mut current_signatures = guard.keys().skip_while(|m| !m.is_block_hash(height, round, Step::Propose, bh)); - let proposal = current_signatures.next().cloned(); - let votes = current_signatures - .skip_while(|m| !m.is_block_hash(height, round, Step::Precommit, bh)) - .filter(|m| m.is_block_hash(height, round, Step::Precommit, bh)) - .cloned() - .collect::>(); - (proposal, votes) + guard + .get(&VoteStep::new(height, round, Step::Propose)) + .and_then(|c| c.block_votes.get(bh)) + .and_then(|proposals| proposals.keys().next()) + .map(|proposal| SealSignatures { + proposal: proposal.clone(), + votes: guard + .get(&precommit_step) + .and_then(|c| c.block_votes.get(bh)) + .map(|precommits| precommits.keys().cloned().collect()) + .unwrap_or_else(Vec::new), + }) + .and_then(|seal| if seal.votes.is_empty() { None } else { Some(seal) }) }; - if votes.is_empty() { - return None; + if maybe_seal.is_some() { + // Remove messages that are no longer relevant. + self.throw_out_old(&precommit_step); } - // Remove messages that are no longer relevant. - votes.last().map(|m| self.throw_out_old(m)); - let mut votes_vec: Vec<_> = votes.into_iter().map(|m| m.signature).collect(); - votes_vec.sort(); - proposal.map(|p| SealSignatures { - proposal: p.signature, - votes: votes_vec, - }) + maybe_seal } + /// Count votes which agree with the given message. pub fn count_aligned_votes(&self, message: &ConsensusMessage) -> usize { - let guard = self.votes.read(); - guard.keys() - .skip_while(|m| !m.is_aligned(message)) - // sorted by signature so might not be continuous - .filter(|m| m.is_aligned(message)) - .count() + self + .votes + .read() + .get(&message.vote_step) + .map_or(0, |m| m.count_block(&message.block_hash)) } - pub fn count_step_votes(&self, height: Height, round: Round, step: Step) -> usize { - let guard = self.votes.read(); - let current = guard.iter().skip_while(|&(m, _)| !m.is_step(height, round, step)); - let mut origins = HashSet::new(); - let mut n = 0; - for (message, origin) in current { - if message.is_step(height, round, step) { - if origins.insert(origin) { - n += 1; - } else { - warn!("count_step_votes: Authority {} has cast multiple step votes, this indicates malicious behaviour.", origin) - } - } - } - n + /// Count all votes collected for a given step. + pub fn count_step_votes(&self, vote_step: &VoteStep) -> usize { + self.votes.read().get(vote_step).map_or(0, StepCollector::count) } + /// Get all messages older than the height. pub fn get_up_to(&self, height: Height) -> Vec { let guard = self.votes.read(); guard - .keys() - .filter(|m| m.step.is_pre()) - .take_while(|m| m.height <= height) - .map(|m| ::rlp::encode(m).to_vec()) - .collect() + .iter() + .filter(|&(s, _)| s.step.is_pre()) + .take_while(|&(s, _)| s.height <= height) + .map(|(_, c)| c.messages.iter().map(|m| ::rlp::encode(m).to_vec()).collect::>()) + .fold(Vec::new(), |mut acc, mut messages| { acc.append(&mut messages); acc }) } + /// Retrieve address from which the message was sent from cache. pub fn get(&self, message: &ConsensusMessage) -> Option
{ let guard = self.votes.read(); - guard.get(message).cloned() + guard.get(&message.vote_step).and_then(|c| c.block_votes.get(&message.block_hash)).and_then(|origins| origins.get(&message.signature).cloned()) } } @@ -152,15 +185,15 @@ impl VoteCollector { mod tests { use util::*; use super::*; - use super::super::{Height, Round, BlockHash, Step}; - use super::super::message::ConsensusMessage; + use super::super::{BlockHash, Step}; + use super::super::message::*; - fn random_vote(collector: &VoteCollector, signature: H520, h: Height, r: Round, step: Step, block_hash: Option) -> Option { - full_vote(collector, signature, h, r, step, block_hash, H160::random()) + fn random_vote(collector: &VoteCollector, signature: H520, vote_step: VoteStep, block_hash: Option) -> bool { + full_vote(collector, signature, vote_step, block_hash, &H160::random()).is_none() } - fn full_vote(collector: &VoteCollector, signature: H520, h: Height, r: Round, step: Step, block_hash: Option, address: Address) -> Option { - collector.vote(ConsensusMessage { signature: signature, height: h, round: r, step: step, block_hash: block_hash }, address) + fn full_vote<'a>(collector: &VoteCollector, signature: H520, vote_step: VoteStep, block_hash: Option, address: &'a Address) -> Option<&'a Address> { + collector.vote(ConsensusMessage { signature: signature, vote_step: vote_step, block_hash: block_hash }, address) } #[test] @@ -173,68 +206,71 @@ mod tests { for _ in 0..5 { signatures.push(H520::random()); } + let propose_step = VoteStep::new(h, r, Step::Propose); + let prevote_step = VoteStep::new(h, r, Step::Prevote); + let precommit_step = VoteStep::new(h, r, Step::Precommit); // Wrong height proposal. - random_vote(&collector, signatures[4].clone(), h - 1, r, Step::Propose, bh.clone()); + random_vote(&collector, signatures[4].clone(), VoteStep::new(h - 1, r, Step::Propose), bh.clone()); // Good proposal - random_vote(&collector, signatures[0].clone(), h, r, Step::Propose, bh.clone()); + random_vote(&collector, signatures[0].clone(), propose_step.clone(), bh.clone()); // Wrong block proposal. - random_vote(&collector, signatures[0].clone(), h, r, Step::Propose, Some("0".sha3())); + random_vote(&collector, signatures[0].clone(), propose_step.clone(), Some("0".sha3())); // Wrong block precommit. - random_vote(&collector, signatures[3].clone(), h, r, Step::Precommit, Some("0".sha3())); + random_vote(&collector, signatures[3].clone(), precommit_step.clone(), Some("0".sha3())); // Wrong round proposal. - random_vote(&collector, signatures[0].clone(), h, r - 1, Step::Propose, bh.clone()); + random_vote(&collector, signatures[0].clone(), VoteStep::new(h, r - 1, Step::Propose), bh.clone()); // Prevote. - random_vote(&collector, signatures[0].clone(), h, r, Step::Prevote, bh.clone()); + random_vote(&collector, signatures[0].clone(), prevote_step.clone(), bh.clone()); // Relevant precommit. - random_vote(&collector, signatures[2].clone(), h, r, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[2].clone(), precommit_step.clone(), bh.clone()); // Replcated vote. - random_vote(&collector, signatures[2].clone(), h, r, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[2].clone(), precommit_step.clone(), bh.clone()); // Wrong round precommit. - random_vote(&collector, signatures[4].clone(), h, r + 1, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[4].clone(), VoteStep::new(h, r + 1, Step::Precommit), bh.clone()); // Wrong height precommit. - random_vote(&collector, signatures[3].clone(), h + 1, r, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[3].clone(), VoteStep::new(h + 1, r, Step::Precommit), bh.clone()); // Relevant precommit. - random_vote(&collector, signatures[1].clone(), h, r, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[1].clone(), precommit_step.clone(), bh.clone()); // Wrong round precommit, same signature. - random_vote(&collector, signatures[1].clone(), h, r + 1, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[1].clone(), VoteStep::new(h, r + 1, Step::Precommit), bh.clone()); // Wrong round precommit. - random_vote(&collector, signatures[4].clone(), h, r - 1, Step::Precommit, bh.clone()); + random_vote(&collector, signatures[4].clone(), VoteStep::new(h, r - 1, Step::Precommit), bh.clone()); let seal = SealSignatures { proposal: signatures[0], votes: signatures[1..3].to_vec() }; - assert_eq!(seal, collector.seal_signatures(h, r, bh.unwrap()).unwrap()); + assert_eq!(seal, collector.seal_signatures(h, r, &bh.unwrap()).unwrap()); } #[test] fn count_votes() { let collector = VoteCollector::new(); + let prevote_step = VoteStep::new(3, 2, Step::Prevote); + let precommit_step = VoteStep::new(3, 2, Step::Precommit); // good prevote - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3())); - random_vote(&collector, H520::random(), 3, 1, Step::Prevote, Some("0".sha3())); + random_vote(&collector, H520::random(), prevote_step.clone(), Some("0".sha3())); + random_vote(&collector, H520::random(), VoteStep::new(3, 1, Step::Prevote), Some("0".sha3())); // good precommit - random_vote(&collector, H520::random(), 3, 2, Step::Precommit, Some("0".sha3())); - random_vote(&collector, H520::random(), 3, 3, Step::Precommit, Some("0".sha3())); + random_vote(&collector, H520::random(), precommit_step.clone(), Some("0".sha3())); + random_vote(&collector, H520::random(), VoteStep::new(3, 3, Step::Precommit), Some("0".sha3())); // good prevote - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3())); + random_vote(&collector, H520::random(), prevote_step.clone(), Some("1".sha3())); // good prevote let same_sig = H520::random(); - random_vote(&collector, same_sig.clone(), 3, 2, Step::Prevote, Some("1".sha3())); - random_vote(&collector, same_sig, 3, 2, Step::Prevote, Some("1".sha3())); + random_vote(&collector, same_sig.clone(), prevote_step.clone(), Some("1".sha3())); + random_vote(&collector, same_sig, prevote_step.clone(), Some("1".sha3())); // good precommit - random_vote(&collector, H520::random(), 3, 2, Step::Precommit, Some("1".sha3())); + random_vote(&collector, H520::random(), precommit_step.clone(), Some("1".sha3())); // good prevote - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3())); - random_vote(&collector, H520::random(), 2, 2, Step::Precommit, Some("2".sha3())); + random_vote(&collector, H520::random(), prevote_step.clone(), Some("0".sha3())); + random_vote(&collector, H520::random(), VoteStep::new(2, 2, Step::Precommit), Some("2".sha3())); - assert_eq!(collector.count_step_votes(3, 2, Step::Prevote), 4); - assert_eq!(collector.count_step_votes(3, 2, Step::Precommit), 2); + assert_eq!(collector.count_step_votes(&prevote_step), 4); + assert_eq!(collector.count_step_votes(&precommit_step), 2); let message = ConsensusMessage { signature: H520::default(), - height: 3, - round: 2, - step: Step::Prevote, + vote_step: prevote_step, block_hash: Some("1".sha3()) }; assert_eq!(collector.count_aligned_votes(&message), 2); @@ -243,30 +279,29 @@ mod tests { #[test] fn remove_old() { let collector = VoteCollector::new(); - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3())); - random_vote(&collector, H520::random(), 3, 1, Step::Prevote, Some("0".sha3())); - random_vote(&collector, H520::random(), 3, 3, Step::Precommit, Some("0".sha3())); - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3())); - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3())); - random_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3())); - random_vote(&collector, H520::random(), 2, 2, Step::Precommit, Some("2".sha3())); - - let message = ConsensusMessage { - signature: H520::default(), - height: 3, - round: 2, - step: Step::Precommit, - block_hash: Some("1".sha3()) + let vote = |height, round, step, hash| { + random_vote(&collector, H520::random(), VoteStep::new(height, round, step), hash); }; - collector.throw_out_old(&message); + vote(3, 2, Step::Prevote, Some("0".sha3())); + vote(3, 1, Step::Prevote, Some("0".sha3())); + vote(3, 3, Step::Precommit, Some("0".sha3())); + vote(3, 2, Step::Prevote, Some("1".sha3())); + vote(3, 2, Step::Prevote, Some("1".sha3())); + vote(3, 2, Step::Prevote, Some("0".sha3())); + vote(2, 2, Step::Precommit, Some("2".sha3())); + + collector.throw_out_old(&VoteStep::new(3, 2, Step::Precommit)); assert_eq!(collector.votes.read().len(), 1); } #[test] fn malicious_authority() { let collector = VoteCollector::new(); - full_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3()), Address::default()); - full_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3()), Address::default()); - assert_eq!(collector.count_step_votes(3, 2, Step::Prevote), 1); + let vote_step = VoteStep::new(3, 2, Step::Prevote); + // Vote is inserted fine. + assert!(full_vote(&collector, H520::random(), vote_step.clone(), Some("0".sha3()), &Address::default()).is_none()); + // Returns the double voting address. + full_vote(&collector, H520::random(), vote_step.clone(), Some("1".sha3()), &Address::default()).unwrap(); + assert_eq!(collector.count_step_votes(&vote_step), 1); } }