From e475d0bf4ca97e72deed10781fb2352b0f10ae18 Mon Sep 17 00:00:00 2001 From: keorn Date: Wed, 31 Aug 2016 18:18:02 +0200 Subject: [PATCH] initial timeouts --- ethcore/res/tendermint.json | 1 - ethcore/src/engines/propose_collect.rs | 11 +- ethcore/src/engines/tendermint.rs | 162 ++++++++++++++++++++----- ethcore/src/service.rs | 4 + json/src/spec/tendermint.rs | 4 - 5 files changed, 146 insertions(+), 36 deletions(-) diff --git a/ethcore/res/tendermint.json b/ethcore/res/tendermint.json index 126761233..8aa8f24f7 100644 --- a/ethcore/res/tendermint.json +++ b/ethcore/res/tendermint.json @@ -4,7 +4,6 @@ "Tendermint": { "params": { "gasLimitBoundDivisor": "0x0400", - "durationLimit": "0x0d", "validators" : [ "0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e", "0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1" diff --git a/ethcore/src/engines/propose_collect.rs b/ethcore/src/engines/propose_collect.rs index ee4aa3810..46defd557 100644 --- a/ethcore/src/engines/propose_collect.rs +++ b/ethcore/src/engines/propose_collect.rs @@ -49,10 +49,13 @@ impl ProposeCollect { /// Vote on hash using the signed hash, true if vote counted. pub fn vote(&self, voter: Address) -> bool { - if self.votes.try_read().unwrap().contains(&voter) { return false; } - if !self.voters.contains(&voter) { return false; } - self.votes.try_write().unwrap().insert(voter); - true + match self.votes.try_read().unwrap().contains(&voter) || !self.voters.contains(&voter) { + true => false, + false => { + self.votes.try_write().unwrap().insert(voter); + true + }, + } } /// Some winner if voting threshold was reached. diff --git a/ethcore/src/engines/tendermint.rs b/ethcore/src/engines/tendermint.rs index 116a0ce16..8ccbbf95a 100644 --- a/ethcore/src/engines/tendermint.rs +++ b/ethcore/src/engines/tendermint.rs @@ -17,7 +17,6 @@ //! Tendermint BFT consensus engine with round robin proof-of-authority. use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; -use std::time::Duration; use common::*; use account_provider::AccountProvider; use block::*; @@ -25,28 +24,54 @@ use spec::CommonParams; use engines::{Engine, EngineError, ProposeCollect}; use evm::Schedule; use ethjson; +use io::{IoContext, IoHandler, TimerToken}; +use service::{ClientIoMessage, ENGINE_TIMEOUT_TOKEN}; +use time::get_time; /// `Tendermint` params. #[derive(Debug)] pub struct TendermintParams { /// Gas limit divisor. pub gas_limit_bound_divisor: U256, - /// Block duration. - pub duration_limit: u64, /// List of validators. pub validators: Vec
, /// Number of validators. pub validator_n: usize, /// Timeout durations for different steps. - timeouts: Timeouts, + timeouts: DefaultTimeouts, /// Consensus round. r: u64, /// Consensus step. s: RwLock, + /// Current step timeout in ms. + timeout: AtomicTimerToken, /// Used to swith proposer. proposer_nonce: AtomicUsize, } +impl Default for TendermintParams { + fn default() -> Self { + let validators = vec!["0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e".into(), "0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1".into()]; + let val_n = validators.len(); + let propose_timeout = 3000; + TendermintParams { + gas_limit_bound_divisor: 0x0400.into(), + validators: validators, + validator_n: val_n, + timeouts: DefaultTimeouts { + propose: propose_timeout, + prevote: 3000, + precommit: 3000, + commit: 3000 + }, + r: 0, + s: RwLock::new(Step::Propose), + timeout: AtomicUsize::new(propose_timeout), + proposer_nonce: AtomicUsize::new(0) + } + } +} + #[derive(Debug)] enum Step { Propose, @@ -58,27 +83,62 @@ enum Step { } #[derive(Debug)] -struct Timeouts { - propose: Duration, - prevote: Duration, - precommit: Duration, - commit: Duration +struct DefaultTimeouts { + propose: TimerToken, + prevote: TimerToken, + precommit: TimerToken, + commit: TimerToken } type Seal = Vec; +type AtomicTimerToken = AtomicUsize; + +impl IoHandler for Tendermint { + fn initialize(&self, io: &IoContext) { + io.register_timer(ENGINE_TIMEOUT_TOKEN, self.our_params.timeout.load(AtomicOrdering::Relaxed) as u64).expect("Error registering engine timeout"); + } + + fn timeout(&self, io: &IoContext, timer: TimerToken) { + if timer == ENGINE_TIMEOUT_TOKEN { + println!("Timeout: {:?}", get_time().sec); + io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); + match *self.our_params.s.try_read().unwrap() { + Step::Propose => self.to_propose(), + Step::Prevote(ref proposal) => self.to_precommit(proposal.hash.clone()), + Step::Precommit(_, _) => self.to_propose(), + Step::Commit(_) => self.to_propose(), + }; + io.register_timer(ENGINE_TIMEOUT_TOKEN, 3000).expect("Failed to start new consensus timer.") + } + } + + fn message(&self, io: &IoContext, net_message: &ClientIoMessage) { + if let &ClientIoMessage::ConsensusStep(next_timeout) = net_message { + println!("Message: {:?}", get_time().sec); + io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); + io.register_timer(ENGINE_TIMEOUT_TOKEN, next_timeout).expect("Failed to start new consensus timer.") + } + } +} impl From for TendermintParams { fn from(p: ethjson::spec::TendermintParams) -> Self { let val: Vec<_> = p.validators.into_iter().map(Into::into).collect(); let val_n = val.len(); + let propose_timeout = 3; TendermintParams { gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(), - duration_limit: p.duration_limit.into(), validators: val, validator_n: val_n, - timeouts: Timeouts { propose: Duration::from_secs(3), prevote: Duration::from_secs(3), precommit: Duration::from_secs(3), commit: Duration::from_secs(3) }, + timeouts: DefaultTimeouts { + propose: propose_timeout, + prevote: 3, + precommit: 3, + commit: 3 + }, r: 0, s: RwLock::new(Step::Propose), + timeout: AtomicUsize::new(propose_timeout), proposer_nonce: AtomicUsize::new(0) } } @@ -120,6 +180,12 @@ impl Tendermint { self.threshold()) } + fn to_propose(&self) { + self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); + let mut guard = self.our_params.s.write(); + *guard = Step::Propose; + } + fn propose_message(&self, message: UntrustedRlp) -> Result { // Check if message is for correct step. match *self.our_params.s.try_read().unwrap() { @@ -127,11 +193,14 @@ impl Tendermint { _ => try!(Err(EngineError::WrongStep)), } let proposal = try!(message.as_val()); - self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); + self.to_prevote(proposal); + Ok(message.as_raw().to_vec()) + } + + fn to_prevote(&self, proposal: H256) { let mut guard = self.our_params.s.write(); // Proceed to the prevote step. *guard = Step::Prevote(self.new_vote(proposal)); - Ok(message.as_raw().to_vec()) } fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result { @@ -142,13 +211,8 @@ impl Tendermint { if vote.hash == try!(message.as_val()) { vote.vote(sender); // Move to next step is prevote is won. - if vote.is_won() { - let mut guard = self.our_params.s.write(); - *guard = Step::Precommit(self.new_vote(vote.hash), Vec::new()); - Ok(message.as_raw().to_vec()) - } else { - Ok(message.as_raw().to_vec()) - } + if vote.is_won() { self.to_precommit(vote.hash); } + Ok(message.as_raw().to_vec()) } else { try!(Err(EngineError::WrongVote)) } @@ -157,6 +221,11 @@ impl Tendermint { } } + fn to_precommit(&self, proposal: H256) { + let mut guard = self.our_params.s.write(); + *guard = Step::Precommit(self.new_vote(proposal), Vec::new()); + } + fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result { // Check if message is for correct step. match *self.our_params.s.try_write().unwrap() { @@ -165,13 +234,8 @@ impl Tendermint { if vote.hash == try!(message.as_val()) { if vote.vote(sender) { seal.push(encode(&signature).to_vec()); } // Commit if precommit is won. - if vote.is_won() { - let mut guard = self.our_params.s.write(); - *guard = Step::Commit(seal.clone()); - Ok(message.as_raw().to_vec()) - } else { - Ok(message.as_raw().to_vec()) - } + if vote.is_won() { self.to_commit(seal.clone()); } + Ok(message.as_raw().to_vec()) } else { try!(Err(EngineError::WrongVote)) } @@ -180,6 +244,11 @@ impl Tendermint { } } + fn to_commit(&self, seal: Seal) { + let mut guard = self.our_params.s.write(); + *guard = Step::Commit(seal); + } + fn threshold(&self) -> usize { self.our_params.validator_n*2/3 } @@ -294,16 +363,40 @@ impl Engine for Tendermint { #[cfg(test)] mod tests { use common::*; + use std::thread::sleep; + use std::time::{Duration, Instant}; use block::*; use tests::helpers::*; + use service::{ClientService, ClientIoMessage}; + use devtools::RandomTempPath; + use client::ClientConfig; + use miner::Miner; use account_provider::AccountProvider; use spec::Spec; use engines::Engine; + use super::{Tendermint, TendermintParams}; /// Create a new test chain spec with `Tendermint` consensus engine. /// Account "0".sha3() and "1".sha3() are a validators. fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) } + fn new_test_client_service() -> ClientService { + let temp_path = RandomTempPath::new(); + let mut path = temp_path.as_path().to_owned(); + path.push("pruning"); + path.push("db"); + + let spec = get_test_spec(); + let service = ClientService::start( + ClientConfig::default(), + &spec, + &path, + &path, + Arc::new(Miner::with_spec(&spec)), + ); + service.unwrap() + } + fn propose_default(engine: &Arc, proposer: Address) -> Result { let mut s = RlpStream::new_list(3); let header = Header::default(); @@ -399,4 +492,19 @@ mod tests { fn handle_message() { false; } + + #[test] + fn timeout_switching() { + let service = new_test_client_service(); + let engine = new_test_tendermint().engine; + let tender = Tendermint::new(engine.params().clone(), TendermintParams::default(), BTreeMap::new()); + service.register_io_handler(Arc::new(tender)); + + println!("Waiting for timeout"); + sleep(Duration::from_secs(10)); + + let message_channel = service.io().channel(); + message_channel.send(ClientIoMessage::ConsensusStep(1000)); + sleep(Duration::from_secs(5)); + } } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 355c7d580..7a3da2691 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -43,6 +43,8 @@ pub enum ClientIoMessage { FeedStateChunk(H256, Bytes), /// Feed a block chunk to the snapshot service FeedBlockChunk(H256, Bytes), + /// Signal consensus step timeout. + ConsensusStep(u64), } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -143,6 +145,8 @@ struct ClientIoHandler { const CLIENT_TICK_TIMER: TimerToken = 0; const CLIENT_TICK_MS: u64 = 5000; +/// Timer token representing the consensus step timeouts. +pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 1; impl IoHandler for ClientIoHandler { fn initialize(&self, io: &IoContext) { diff --git a/json/src/spec/tendermint.rs b/json/src/spec/tendermint.rs index c3294810c..97c30fbb2 100644 --- a/json/src/spec/tendermint.rs +++ b/json/src/spec/tendermint.rs @@ -25,9 +25,6 @@ pub struct TendermintParams { /// Gas limit divisor. #[serde(rename="gasLimitBoundDivisor")] pub gas_limit_bound_divisor: Uint, - /// Block duration. - #[serde(rename="durationLimit")] - pub duration_limit: Uint, /// Valid authorities pub validators: Vec
, } @@ -49,7 +46,6 @@ mod tests { let s = r#"{ "params": { "gasLimitBoundDivisor": "0x0400", - "durationLimit": "0x0d", "validators" : ["0xc6d9d2cd449a754c494264e1809c50e34d64562b"] } }"#;