diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 1168872c1..28aae2b10 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -354,14 +354,15 @@ impl Engine for Tendermint { fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> { let message: ConsensusMessage = try!(rlp.as_val()); // Check if the message is known. - if self.votes.is_known(&message) { + if !self.votes.is_known(&message) { let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3()))); if !self.is_authority(&sender) { try!(Err(EngineError::NotAuthorized(sender))); } - self.votes.vote(message.clone(), sender); trace!(target: "poa", "handle_message: Processing new authorized message: {:?}", &message); + self.votes.vote(message.clone(), sender); + self.broadcast_message(rlp.as_raw().to_vec()); let is_newer_than_lock = match *self.lock_change.read() { Some(ref lock) => &message > lock, @@ -381,8 +382,6 @@ impl Engine for Tendermint { self.increment_round(1); Some(Step::Propose) } else { - // Remove old messages. - self.votes.throw_out_old(&message); Some(Step::Commit) } }, @@ -504,7 +503,6 @@ mod tests { use spec::Spec; use engines::{Engine, EngineError}; use super::*; - use super::params::TendermintParams; use super::message::*; /// Accounts inserted with "1" and "2" are authorities. First proposer is "0". @@ -712,5 +710,6 @@ mod tests { ::std::thread::sleep(::std::time::Duration::from_millis(500)); assert_eq!(test_io.received.read()[5], ClientIoMessage::SubmitSeal(proposal.unwrap(), seal)); + println!("{:?}", *test_io.received.read()); } } diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index 923b3c9a7..095b0fa37 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -58,12 +58,14 @@ impl VoteCollector { /// Insert vote if it is newer than the oldest one. pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option
{ - if { + let is_new = { let guard = self.votes.read(); guard.keys().next().map_or(true, |oldest| &message > oldest) - } { + }; + if is_new { self.votes.write().insert(message, voter) } else { + trace!(target: "poa", "vote: Old message ignored {:?}.", message); None } } @@ -80,17 +82,22 @@ impl VoteCollector { } pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option { - let guard = self.votes.read(); let bh = Some(block_hash); - let mut current_signatures = guard.keys() - .skip_while(|m| !m.is_block_hash(height, round, Step::Propose, bh)); - current_signatures.next().map(|proposal| SealSignatures { - proposal: proposal.signature, - votes: current_signatures - .skip_while(|m| !m.is_block_hash(height, round, Step::Precommit, bh)) - .filter(|m| m.is_block_hash(height, round, Step::Precommit, bh)) - .map(|m| m.signature.clone()) - .collect() + let (proposal, votes) = { + let guard = self.votes.read(); + let mut current_signatures = guard.keys().skip_while(|m| !m.is_block_hash(height, round, Step::Propose, bh)); + let proposal = current_signatures.next().cloned(); + let votes = current_signatures + .skip_while(|m| !m.is_block_hash(height, round, Step::Precommit, bh)) + .filter(|m| m.is_block_hash(height, round, Step::Precommit, bh)) + .cloned() + .collect::>(); + (proposal, votes) + }; + votes.last().map(|m| self.throw_out_old(m)); + proposal.map(|p| SealSignatures { + proposal: p.signature, + votes: votes.into_iter().map(|m| m.signature).collect() }) }