diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs index 5e9cbb1a3..f957c7785 100644 --- a/ethcore/src/engines/tendermint/message.rs +++ b/ethcore/src/engines/tendermint/message.rs @@ -114,7 +114,7 @@ impl Decodable for ConsensusMessage { false => Some(block_message), } }) - } + } } impl Encodable for ConsensusMessage { diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 28b67fa8c..494e83873 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -34,7 +34,7 @@ use ethkey::{recover, public_to_address}; use account_provider::AccountProvider; use block::*; use spec::CommonParams; -use engines::{Engine, EngineError, ProposeCollect}; +use engines::{Engine, EngineError}; use blockchain::extras::BlockDetails; use views::HeaderView; use evm::Schedule; @@ -82,9 +82,9 @@ pub struct Tendermint { proposed_block: Mutex>, /// Channel for updating the sealing. message_channel: Mutex>>, - /// Last round when PoLC was seen. - last_lock_round: RwLock, - /// Proposed block. + /// Message for the last PoLC. + last_lock: RwLock>, + /// Bare hash of the proposed block, used for seal submission. proposal: RwLock> } @@ -105,7 +105,7 @@ impl Tendermint { votes: VoteCollector::new(), proposed_block: Mutex::new(None), message_channel: Mutex::new(None), - last_lock_round: AtomicUsize::new(0), + last_lock: RwLock::new(None), proposal: RwLock::new(None) }); let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; @@ -140,18 +140,28 @@ impl Tendermint { } } + fn generate_message(&self, block_hash: Option) -> ConsensusMessage { + Ok(signature) = ap.sign(*author, None, block_hash(header)) + ConsensusMessage { signatue + + } + fn to_step(&self, step: Step) { *self.step.write() = step; match step { Step::Propose => { - self.proposal.write() = None; + *self.proposal.write() = None; self.update_sealing() }, Step::Prevote => { self.broadcast_message() }, Step::Precommit => { - self.broadcast_message() + let message = match self.last_lock.read() { + Some(m) => + None => ConsensusMessage { signature: signature, height + } + self.broadcast_message(::rlp::encode(message)) }, Step::Commit => { // Commit the block using a complete signature set. @@ -162,8 +172,11 @@ impl Tendermint { ::rlp::encode(proposer).to_vec(), ::rlp::encode(&votes).to_vec() ]; - self.submit_seal(self.proposal.read(), seal) + if let Some(block_hash) = *self.proposal.read() { + self.submit_seal(block_hash, seal); + } } + *self.last_lock.write() = None; }, } } @@ -203,7 +216,7 @@ impl Tendermint { } fn has_enough_aligned_votes(&self, message: &ConsensusMessage) -> bool { - self.votes.aligned_signatures(&message).len() > self.threshold() + self.votes.aligned_votes(&message).len() > self.threshold() } } @@ -258,9 +271,7 @@ impl Engine for Tendermint { /// Get the address to be used as authority. fn on_new_block(&self, block: &mut ExecutedBlock) { - if let Some(mut authority) = self.authority.try_write() { - *authority = *block.header().author() - } + *self.authority.write() = *block.header().author() } /// Set the correct round in the seal. @@ -280,7 +291,7 @@ impl Engine for Tendermint { let header = block.header(); let author = header.author(); if let Ok(signature) = ap.sign(*author, None, block_hash(header)) { - self.proposal.write() = Some(block.hash()); + *self.proposal.write() = Some(header.bare_hash()); Some(vec![ ::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), ::rlp::encode(&H520::from(signature)).to_vec(), @@ -304,32 +315,41 @@ impl Engine for Tendermint { try!(Err(BlockError::InvalidSeal)); } - // Check if the message is known and should be handled right now. - if self.votes.vote(message.clone(), sender).is_none() && self.is_current(&message) { - let next_step = match *self.step.read() { - Step::Precommit if self.has_enough_aligned_votes(&message) => { - if message.block_hash.is_none() { - self.round.fetch_add(1, AtomicOrdering::SeqCst); - Some(Step::Propose) - } else { - Some(Step::Commit) - } - }, - Step::Precommit if self.has_enough_step_votes(&message) => { - self.round.store(message.round, AtomicOrdering::SeqCst); - Some(Step::Precommit) - }, - Step::Prevote if self.has_enough_aligned_votes(&message) => Some(Step::Precommit), - Step::Prevote if self.has_enough_step_votes(&message) => { - self.round.store(message.round, AtomicOrdering::SeqCst); - Some(Step::Prevote) - }, - _ => None, - }; + // Check if the message is known. + if self.votes.vote(message.clone(), sender).is_none() { + let is_newer_than_lock = self.last_lock.read().map_or(true, |lock| message > lock); + if is_newer_than_lock + && message.step == Step::Prevote + && self.has_enough_aligned_votes(&message) { + *self.last_lock.write() = Some(message); + } + // Check if it can affect step transition. + if self.is_current(&message) { + let next_step = match *self.step.read() { + Step::Precommit if self.has_enough_aligned_votes(&message) => { + if message.block_hash.is_none() { + self.round.fetch_add(1, AtomicOrdering::SeqCst); + Some(Step::Propose) + } else { + Some(Step::Commit) + } + }, + Step::Precommit if self.has_enough_step_votes(&message) => { + self.round.store(message.round, AtomicOrdering::SeqCst); + Some(Step::Precommit) + }, + Step::Prevote if self.has_enough_aligned_votes(&message) => Some(Step::Precommit), + Step::Prevote if self.has_enough_step_votes(&message) => { + self.round.store(message.round, AtomicOrdering::SeqCst); + Some(Step::Prevote) + }, + _ => None, + }; - if let Some(step) = next_step { - if let Err(io_err) = self.step_service.send_message(step) { - warn!(target: "poa", "Could not proceed to next step {}.", io_err) + if let Some(step) = next_step { + if let Err(io_err) = self.step_service.send_message(step) { + warn!(target: "poa", "Could not proceed to next step {}.", io_err) + } } } } diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index 075fda641..e7ea0a5d5 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -36,19 +36,29 @@ impl VoteCollector { self.votes.write().insert(message, voter) } - pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option) -> (H520, Vec) { + pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option) -> Option<(&H520, &[H520])> { + self.votes + .read() + .keys() + .cloned() + // Get only Propose and Precommits. + .filter(|m| m.is_aligned(height, round, block_hash) && m.step != Step::Prevote) + .map(|m| m.signature) + .collect::>() + .split_first() + } + + pub fn aligned_votes(&self, message: &ConsensusMessage) -> Vec<&ConsensusMessage> { self.votes .read() .keys() // Get only Propose and Precommits. - .filter(|m| m.is_aligned(height, round, block_hash) && m.step != Step::Prevote) - .map(|m| m.signature) + .filter(|m| m.is_aligned(message.height, message.round, message.block_hash) && m.step == message.step) .collect() - .split_first() } - pub fn aligned_signatures(&self, message: &ConsensusMessage) -> Vec { - self.seal_signatures(message.height, message.round, message.block_hash) + pub fn aligned_signatures(&self, message: &ConsensusMessage) -> &[H520] { + self.seal_signatures(message.height, message.round, message.block_hash).map_or(&[], |s| s.1) } pub fn count_step_votes(&self, height: Height, round: Round, step: Step) -> usize {