From 9563ccfbd299ce6eded385c31f94e96bf96cdfce Mon Sep 17 00:00:00 2001 From: keorn Date: Thu, 17 Nov 2016 12:18:20 +0000 Subject: [PATCH] message broadcasting methods --- ethcore/src/engines/mod.rs | 2 +- ethcore/src/engines/tendermint/mod.rs | 83 ++++++++++--------- .../src/engines/tendermint/vote_collector.rs | 7 +- 3 files changed, 48 insertions(+), 44 deletions(-) diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs index 64e5cb16d..283014a50 100644 --- a/ethcore/src/engines/mod.rs +++ b/ethcore/src/engines/mod.rs @@ -159,7 +159,7 @@ pub trait Engine : Sync + Send { /// Handle any potential consensus messages; /// updating consensus state and potentially issuing a new one. - fn handle_message(&self, _message: UntrustedRlp) -> Result { Err(EngineError::UnexpectedMessage.into()) } + fn handle_message(&self, _message: UntrustedRlp) -> Result<(), Error> { Err(EngineError::UnexpectedMessage.into()) } // TODO: builtin contract routing - to do this properly, it will require removing the built-in configuration-reading logic // from Spec into here and removing the Spec::builtins field. diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index a42615693..78e08db1b 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -125,11 +125,37 @@ impl Tendermint { } } + fn broadcast_message(&self, message: Bytes) { + if let Some(ref channel) = *self.message_channel.lock() { + match channel.send(ClientIoMessage::BroadcastMessage(message)) { + Ok(_) => trace!(target: "poa", "timeout: BroadcastMessage message sent."), + Err(err) => warn!(target: "poa", "timeout: Could not send a sealing message {}.", err), + } + } + } + fn to_step(&self, step: Step) { *self.step.write() = step; match step { Step::Propose => self.update_sealing(), - _ => {}, + Step::Prevote => { + self.broadcast_message() + }, + Step::Precommit => { + self.broadcast_message() + }, + Step::Commit => { + // Commit the block using a complete signature set. + let round = self.round.load(AtomicOrdering::SeqCst); + if let Some((proposer, votes)) = self.votes.seal_signatures(header.number() as Height, round, Some(block_hash(header))) { + let seal = vec![ + ::rlp::encode(&round).to_vec(), + ::rlp::encode(proposer).to_vec(), + ::rlp::encode(&votes).to_vec() + ]; + self.submit_seal(seal) + } + }, } } @@ -240,47 +266,27 @@ impl Engine for Tendermint { } /// Attempt to seal the block internally using all available signatures. - /// - /// None is returned if not enough signatures can be collected. fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option> { - if let (Some(ap), Some(step)) = (accounts, self.step.try_read()) { + if let Some(ap) = accounts { let header = block.header(); let author = header.author(); - match *step { - Step::Commit => { - // Commit the block using a complete signature set. - let round = self.round.load(AtomicOrdering::SeqCst); - if let Some((proposer, votes)) = self.votes.seal_signatures(header.number() as Height, round, Some(block_hash(header))).split_first() { - if votes.len() + 1 > self.threshold() { - return Some(vec![ - ::rlp::encode(&round).to_vec(), - ::rlp::encode(proposer).to_vec(), - ::rlp::encode(&votes).to_vec() - ]); - } - } - }, - Step::Propose if self.is_proposer(author) => - // Seal block with propose signature. - - if let Ok(signature) = ap.sign(*author, None, block_hash(header)) { - return Some(vec![ - ::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), - ::rlp::encode(&H520::from(signature)).to_vec(), - Vec::new() - ]) - } else { - trace!(target: "poa", "generate_seal: FAIL: accounts secret key unavailable"); - }, - _ => {}, + if let Ok(signature) = ap.sign(*author, None, block_hash(header)) { + Some(vec![ + ::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), + ::rlp::encode(&H520::from(signature)).to_vec(), + Vec::new() + ]) + } else { + warn!(target: "poa", "generate_seal: FAIL: accounts secret key unavailable"); + None } } else { - trace!(target: "poa", "generate_seal: FAIL: accounts not provided"); + warn!(target: "poa", "generate_seal: FAIL: accounts not provided"); + None } - None } - fn handle_message(&self, rlp: UntrustedRlp) -> Result { + fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> { let message: ConsensusMessage = try!(rlp.as_val()); let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3()))); // TODO: Do not admit old messages. @@ -288,10 +294,8 @@ impl Engine for Tendermint { try!(Err(BlockError::InvalidSeal)); } - self.votes.vote(message.clone(), sender); - - // Check if the message should be handled right now. - if self.is_current(&message) { + // Check if the message is known and should be handled right now. + if self.votes.vote(message.clone(), sender).is_none() && self.is_current(&message) { let next_step = match *self.step.read() { Step::Precommit if self.has_enough_aligned_votes(&message) => { if message.block_hash.is_none() { @@ -319,8 +323,7 @@ impl Engine for Tendermint { } } } - - Err(BlockError::InvalidSeal.into()) + Ok(()) } fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> Result<(), Error> { diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index 29906f999..075fda641 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -32,11 +32,11 @@ impl VoteCollector { VoteCollector { votes: RwLock::new(BTreeMap::new()) } } - pub fn vote(&self, message: ConsensusMessage, voter: Address) { - self.votes.write().insert(message, voter); + pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option
{ + self.votes.write().insert(message, voter) } - pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option) -> Vec { + pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option) -> (H520, Vec) { self.votes .read() .keys() @@ -44,6 +44,7 @@ impl VoteCollector { .filter(|m| m.is_aligned(height, round, block_hash) && m.step != Step::Prevote) .map(|m| m.signature) .collect() + .split_first() } pub fn aligned_signatures(&self, message: &ConsensusMessage) -> Vec {