diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs index 0a7bcb1df..5e9cbb1a3 100644 --- a/ethcore/src/engines/tendermint/message.rs +++ b/ethcore/src/engines/tendermint/message.rs @@ -20,7 +20,7 @@ use util::*; use super::{Height, Round, BlockHash, Step}; use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct ConsensusMessage { pub signature: H520, pub height: Height, @@ -30,12 +30,16 @@ pub struct ConsensusMessage { } impl ConsensusMessage { + pub fn is_height(&self, height: Height) -> bool { + self.height == height + } + pub fn is_round(&self, height: Height, round: Round) -> bool { self.height == height && self.round == round } - pub fn is_step(&self, height: Height, round: Round, step: &Step) -> bool { - self.height == height && self.round == round && &self.step == step + pub fn is_step(&self, height: Height, round: Round, step: Step) -> bool { + self.height == height && self.round == round && self.step == step } pub fn is_aligned(&self, height: Height, round: Round, block_hash: Option) -> bool { @@ -79,7 +83,7 @@ impl Decodable for Step { match try!(decoder.as_rlp().as_val()) { 0u8 => Ok(Step::Prevote), 1 => Ok(Step::Precommit), - _ => Err(DecoderError::Custom("Unknown step.")), + _ => Err(DecoderError::Custom("Invalid step.")), } } } diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index bb87b8ab2..a42615693 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -41,11 +41,11 @@ use evm::Schedule; use io::{IoService, IoChannel}; use service::ClientIoMessage; use self::message::ConsensusMessage; -use self::timeout::{TransitionHandler, NextStep}; +use self::timeout::TransitionHandler; use self::params::TendermintParams; use self::vote_collector::VoteCollector; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Step { Propose, Prevote, @@ -65,7 +65,7 @@ pub struct Tendermint { params: CommonParams, our_params: TendermintParams, builtins: BTreeMap, - step_service: IoService, + step_service: IoService, /// Address to be used as authority. authority: RwLock
, /// Blockchain height. @@ -92,7 +92,7 @@ impl Tendermint { params: params, our_params: our_params, builtins: builtins, - step_service: try!(IoService::::start()), + step_service: try!(IoService::::start()), authority: RwLock::new(Address::default()), height: AtomicUsize::new(0), round: AtomicUsize::new(0), @@ -125,6 +125,14 @@ impl Tendermint { } } + fn to_step(&self, step: Step) { + *self.step.write() = step; + match step { + Step::Propose => self.update_sealing(), + _ => {}, + } + } + fn nonce_proposer(&self, proposer_nonce: usize) -> &Address { let ref p = self.our_params; p.authorities.get(proposer_nonce % p.authority_n).unwrap() @@ -148,11 +156,19 @@ impl Tendermint { } fn is_current(&self, message: &ConsensusMessage) -> bool { - message.is_step(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), &self.step.read()) + message.is_height(self.height.load(AtomicOrdering::SeqCst)) } fn has_enough_any_votes(&self) -> bool { - self.votes.count_step_votes(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), &self.step.read()) > self.threshold() + self.votes.count_step_votes(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), *self.step.read()) > self.threshold() + } + + fn has_enough_step_votes(&self, message: &ConsensusMessage) -> bool { + self.votes.count_step_votes(message.height, message.round, message.step) > self.threshold() + } + + fn has_enough_aligned_votes(&self, message: &ConsensusMessage) -> bool { + self.votes.aligned_signatures(&message).len() > self.threshold() } } @@ -272,19 +288,38 @@ impl Engine for Tendermint { try!(Err(BlockError::InvalidSeal)); } - // Check if the message affects the current step. + self.votes.vote(message.clone(), sender); + + // Check if the message should be handled right now. if self.is_current(&message) { - match *self.step.read() { - Step::Prevote => { - let votes = self.votes.aligned_signatures(&message); - if votes.len() > self.threshold() { - } - }, - Step::Precommit => {}, - _ => {}, + let next_step = match *self.step.read() { + Step::Precommit if self.has_enough_aligned_votes(&message) => { + if message.block_hash.is_none() { + self.round.fetch_add(1, AtomicOrdering::SeqCst); + Some(Step::Propose) + } else { + Some(Step::Commit) + } + }, + Step::Precommit if self.has_enough_step_votes(&message) => { + self.round.store(message.round, AtomicOrdering::SeqCst); + Some(Step::Precommit) + }, + Step::Prevote if self.has_enough_aligned_votes(&message) => Some(Step::Precommit), + Step::Prevote if self.has_enough_step_votes(&message) => { + self.round.store(message.round, AtomicOrdering::SeqCst); + Some(Step::Prevote) + }, + _ => None, + }; + + if let Some(step) = next_step { + if let Err(io_err) = self.step_service.send_message(step) { + warn!(target: "poa", "Could not proceed to next step {}.", io_err) } + } } - self.votes.vote(message, sender); + Err(BlockError::InvalidSeal.into()) } diff --git a/ethcore/src/engines/tendermint/timeout.rs b/ethcore/src/engines/tendermint/timeout.rs index 789755077..329145984 100644 --- a/ethcore/src/engines/tendermint/timeout.rs +++ b/ethcore/src/engines/tendermint/timeout.rs @@ -57,25 +57,22 @@ impl Default for TendermintTimeouts { } } -#[derive(Clone)] -pub struct NextStep(Step); - /// Timer token representing the consensus step timeouts. pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; -fn set_timeout(io: &IoContext, timeout: Duration) { +fn set_timeout(io: &IoContext, timeout: Duration) { io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout.num_milliseconds() as u64) .unwrap_or_else(|e| warn!(target: "poa", "Failed to set consensus step timeout: {}.", e)) } -impl IoHandler for TransitionHandler { - fn initialize(&self, io: &IoContext) { +impl IoHandler for TransitionHandler { + fn initialize(&self, io: &IoContext) { if let Some(engine) = self.engine.upgrade() { set_timeout(io, engine.our_params.timeouts.propose) } } - fn timeout(&self, io: &IoContext, timer: TimerToken) { + fn timeout(&self, io: &IoContext, timer: TimerToken) { if timer == ENGINE_TIMEOUT_TOKEN { if let Some(engine) = self.engine.upgrade() { let next_step = match *engine.step.read() { @@ -102,32 +99,24 @@ impl IoHandler for TransitionHandler { }; if let Some(step) = next_step { - *engine.step.write() = step.clone(); - if step == Step::Propose { - engine.update_sealing(); - } + engine.to_step(step) } } } } - fn message(&self, io: &IoContext, message: &NextStep) { + fn message(&self, io: &IoContext, next_step: &Step) { if let Some(engine) = self.engine.upgrade() { - match io.clear_timer(ENGINE_TIMEOUT_TOKEN) { - Ok(_) => {}, - Err(io_err) => warn!(target: "poa", "Could not remove consensus timer {}.", io_err), - }; - let NextStep(next_step) = message.clone(); - *engine.step.write() = next_step.clone(); - match next_step { - Step::Propose => { - engine.update_sealing(); - set_timeout(io, engine.our_params.timeouts.propose) - }, + if let Err(io_err) = io.clear_timer(ENGINE_TIMEOUT_TOKEN) { + warn!(target: "poa", "Could not remove consensus timer {}.", io_err) + } + match *next_step { + Step::Propose => set_timeout(io, engine.our_params.timeouts.propose), Step::Prevote => set_timeout(io, engine.our_params.timeouts.prevote), Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit), Step::Commit => set_timeout(io, engine.our_params.timeouts.commit), }; + engine.to_step(*next_step); } } } diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index c5db224b0..29906f999 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -50,7 +50,7 @@ impl VoteCollector { self.seal_signatures(message.height, message.round, message.block_hash) } - pub fn count_step_votes(&self, height: Height, round: Round, step: &Step) -> usize { + pub fn count_step_votes(&self, height: Height, round: Round, step: Step) -> usize { self.votes .read() .keys()