diff --git a/ethcore/src/engines/tendermint.rs b/ethcore/src/engines/tendermint.rs index 7333cf70d..66b469923 100644 --- a/ethcore/src/engines/tendermint.rs +++ b/ethcore/src/engines/tendermint.rs @@ -17,6 +17,7 @@ //! Tendermint BFT consensus engine with round robin proof-of-authority. use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::Weak; use common::*; use account_provider::AccountProvider; use block::*; @@ -24,8 +25,7 @@ use spec::CommonParams; use engines::{Engine, EngineError, ProposeCollect}; use evm::Schedule; use ethjson; -use io::{IoContext, IoHandler, TimerToken}; -use service::{ClientIoMessage, ENGINE_TIMEOUT_TOKEN}; +use io::{IoContext, IoHandler, TimerToken, IoService}; use time::get_time; /// `Tendermint` params. @@ -39,14 +39,6 @@ pub struct TendermintParams { pub validator_n: usize, /// Timeout durations for different steps. timeouts: DefaultTimeouts, - /// Consensus round. - r: u64, - /// Consensus step. - s: RwLock, - /// Current step timeout in ms. - timeout: AtomicTimerToken, - /// Used to swith proposer. - proposer_nonce: AtomicUsize, } impl Default for TendermintParams { @@ -64,10 +56,6 @@ impl Default for TendermintParams { precommit: 3000, commit: 3000 }, - r: 0, - s: RwLock::new(Step::Propose), - timeout: AtomicUsize::new(propose_timeout), - proposer_nonce: AtomicUsize::new(0) } } } @@ -82,88 +70,70 @@ enum Step { Commit(Seal) } -#[derive(Debug)] -struct DefaultTimeouts { - propose: TimerToken, - prevote: TimerToken, - precommit: TimerToken, - commit: TimerToken -} - -type Seal = Vec; -type AtomicTimerToken = AtomicUsize; - -impl IoHandler for Tendermint { - fn initialize(&self, io: &IoContext) { - io.register_timer(ENGINE_TIMEOUT_TOKEN, self.our_params.timeout.load(AtomicOrdering::Relaxed) as u64).expect("Error registering engine timeout"); - } - - fn timeout(&self, io: &IoContext, timer: TimerToken) { - if timer == ENGINE_TIMEOUT_TOKEN { - println!("Timeout: {:?}", get_time().sec); - io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); - match *self.our_params.s.try_read().unwrap() { - Step::Propose => self.to_propose(), - Step::Prevote(ref proposal) => self.to_precommit(proposal.hash.clone()), - Step::Precommit(_, _) => self.to_propose(), - Step::Commit(_) => self.to_propose(), - }; - //io.register_timer(ENGINE_TIMEOUT_TOKEN, 3000).expect("Failed to start new consensus timer.") - } - } - - fn message(&self, io: &IoContext, net_message: &ClientIoMessage) { - if let &ClientIoMessage::ConsensusStep(next_timeout) = net_message { - println!("Message: {:?}", get_time().sec); - io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); - io.register_timer(ENGINE_TIMEOUT_TOKEN, next_timeout).expect("Failed to start new consensus timer.") - } - } -} - impl From for TendermintParams { fn from(p: ethjson::spec::TendermintParams) -> Self { let val: Vec<_> = p.validators.into_iter().map(Into::into).collect(); let val_n = val.len(); - let propose_timeout = 3; + let propose_timeout = 3000; TendermintParams { gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(), validators: val, validator_n: val_n, timeouts: DefaultTimeouts { propose: propose_timeout, - prevote: 3, - precommit: 3, - commit: 3 + prevote: 3000, + precommit: 3000, + commit: 3000 }, - r: 0, - s: RwLock::new(Step::Propose), - timeout: AtomicUsize::new(propose_timeout), - proposer_nonce: AtomicUsize::new(0) } } } +#[derive(Clone)] +struct StepMessage; + /// Engine using `Tendermint` consensus algorithm, suitable for EVM chain. pub struct Tendermint { params: CommonParams, our_params: TendermintParams, builtins: BTreeMap, + timeout_service: IoService, + /// Consensus round. + r: u64, + /// Consensus step. + s: RwLock, + /// Current step timeout in ms. + timeout: AtomicMs, + /// Used to swith proposer. + proposer_nonce: AtomicUsize, +} + +struct TimerHandler { + engine: Weak, } impl Tendermint { /// Create a new instance of Tendermint engine - pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap) -> Self { - Tendermint { - params: params, - our_params: our_params, - builtins: builtins, - } + pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap) -> Arc { + let engine = Arc::new( + Tendermint { + params: params, + timeout: AtomicUsize::new(our_params.timeouts.propose), + our_params: our_params, + builtins: builtins, + timeout_service: IoService::::start().expect("Error creating engine timeout service"), + r: 0, + s: RwLock::new(Step::Propose), + proposer_nonce: AtomicUsize::new(0) + }); + let handler = TimerHandler { engine: Arc::downgrade(&engine) }; + engine.timeout_service.register_handler(Arc::new(handler)).expect("Error creating engine timeout service"); + engine } fn proposer(&self) -> Address { let ref p = self.our_params; - p.validators.get(p.proposer_nonce.load(AtomicOrdering::Relaxed)%p.validator_n).unwrap().clone() + p.validators.get(self.proposer_nonce.load(AtomicOrdering::Relaxed)%p.validator_n).unwrap().clone() } fn is_proposer(&self, address: &Address) -> bool { @@ -180,15 +150,21 @@ impl Tendermint { self.threshold()) } + fn to_step(&self, step: Step) { + let mut guard = self.s.try_write().unwrap(); + *guard = step; + } + fn to_propose(&self) { - self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); - let mut guard = self.our_params.s.write(); - *guard = Step::Propose; + trace!(target: "tendermint", "step: entering propose"); + println!("step: entering propose"); + self.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); + self.to_step(Step::Propose); } fn propose_message(&self, message: UntrustedRlp) -> Result { // Check if message is for correct step. - match *self.our_params.s.try_read().unwrap() { + match *self.s.try_read().unwrap() { Step::Propose => (), _ => try!(Err(EngineError::WrongStep)), } @@ -198,20 +174,24 @@ impl Tendermint { } fn to_prevote(&self, proposal: H256) { - let mut guard = self.our_params.s.write(); + trace!(target: "tendermint", "step: entering prevote"); + println!("step: entering prevote"); // Proceed to the prevote step. - *guard = Step::Prevote(self.new_vote(proposal)); + self.to_step(Step::Prevote(self.new_vote(proposal))); } fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result { // Check if message is for correct step. - match *self.our_params.s.try_write().unwrap() { + match *self.s.try_write().unwrap() { Step::Prevote(ref mut vote) => { // Vote if message is about the right block. if vote.hash == try!(message.as_val()) { vote.vote(sender); // Move to next step is prevote is won. - if vote.is_won() { self.to_precommit(vote.hash); } + if vote.is_won() { + //self.our_params.timeouts.precommit + self.to_precommit(vote.hash); + } Ok(message.as_raw().to_vec()) } else { try!(Err(EngineError::WrongVote)) @@ -222,13 +202,14 @@ impl Tendermint { } fn to_precommit(&self, proposal: H256) { - let mut guard = self.our_params.s.write(); - *guard = Step::Precommit(self.new_vote(proposal), Vec::new()); + trace!(target: "tendermint", "step: entering precommit"); + println!("step: entering precommit"); + self.to_step(Step::Precommit(self.new_vote(proposal), Vec::new())); } fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result { // Check if message is for correct step. - match *self.our_params.s.try_write().unwrap() { + match *self.s.try_write().unwrap() { Step::Precommit(ref mut vote, ref mut seal) => { // Vote and accumulate seal if message is about the right block. if vote.hash == try!(message.as_val()) { @@ -245,13 +226,18 @@ impl Tendermint { } fn to_commit(&self, seal: Seal) { - let mut guard = self.our_params.s.write(); - *guard = Step::Commit(seal); + trace!(target: "tendermint", "step: entering commit"); + println!("step: entering commit"); + self.to_step(Step::Commit(seal)); } fn threshold(&self) -> usize { self.our_params.validator_n*2/3 } + + fn next_timeout(&self) -> u64 { + self.timeout.load(AtomicOrdering::Relaxed) as u64 + } } impl Engine for Tendermint { @@ -305,7 +291,7 @@ impl Engine for Tendermint { fn handle_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result { // Check if correct round. - if self.our_params.r != try!(message.val_at(0)) { try!(Err(EngineError::WrongRound)) } + if self.r != try!(message.val_at(0)) { try!(Err(EngineError::WrongRound)) } // Handle according to step. match try!(message.val_at(1)) { 0u8 if self.is_proposer(&sender) => self.propose_message(try!(message.at(2))), @@ -359,17 +345,55 @@ impl Engine for Tendermint { } } +/// Base timeout of each step in ms. +#[derive(Debug)] +struct DefaultTimeouts { + propose: Ms, + prevote: Ms, + precommit: Ms, + commit: Ms +} + +type Ms = usize; +type Seal = Vec; +type AtomicMs = AtomicUsize; + +/// Timer token representing the consensus step timeouts. +pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0; + +impl IoHandler for TimerHandler { + fn initialize(&self, io: &IoContext) { + if let Some(engine) = self.engine.upgrade() { + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout"); + } + } + + fn timeout(&self, io: &IoContext, timer: TimerToken) { + if timer == ENGINE_TIMEOUT_TOKEN { + if let Some(engine) = self.engine.upgrade() { + println!("Timeout: {:?}", get_time()); + engine.to_propose(); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") + } + } + } + + fn message(&self, io: &IoContext, _net_message: &StepMessage) { + if let Some(engine) = self.engine.upgrade() { + println!("Message: {:?}", get_time().sec); + io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") + } + } +} + #[cfg(test)] mod tests { use common::*; use std::thread::sleep; - use std::time::{Duration, Instant}; + use std::time::{Duration}; use block::*; use tests::helpers::*; - use service::{ClientService, ClientIoMessage}; - use devtools::RandomTempPath; - use client::ClientConfig; - use miner::Miner; use account_provider::AccountProvider; use spec::Spec; use engines::Engine; @@ -379,23 +403,6 @@ mod tests { /// Account "0".sha3() and "1".sha3() are a validators. fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) } - fn new_test_client_service() -> ClientService { - let temp_path = RandomTempPath::new(); - let mut path = temp_path.as_path().to_owned(); - path.push("pruning"); - path.push("db"); - - let spec = get_test_spec(); - let service = ClientService::start( - ClientConfig::default(), - &spec, - &path, - &path, - Arc::new(Miner::with_spec(&spec)), - ); - service.unwrap() - } - fn propose_default(engine: &Arc, proposer: Address) -> Result { let mut s = RlpStream::new_list(3); let header = Header::default(); @@ -406,6 +413,10 @@ mod tests { engine.handle_message(proposer, H520::default(), propose_rlp) } + fn default_block() -> Vec { + vec![160, 39, 191, 179, 126, 80, 124, 233, 13, 161, 65, 48, 114, 4, 177, 198, 186, 36, 25, 67, 128, 97, 53, 144, 172, 80, 202, 75, 29, 113, 152, 255, 101] + } + #[test] fn has_valid_metadata() { let engine = new_test_tendermint().engine; @@ -474,36 +485,38 @@ mod tests { assert!(propose_default(&engine, not_proposer_addr).is_err()); let proposer_addr = tap.insert_account("1".sha3(), "1").unwrap(); - assert_eq!(vec![160, 39, 191, 179, 126, 80, 124, 233, 13, 161, 65, 48, 114, 4, 177, 198, 186, 36, 25, 67, 128, 97, 53, 144, 172, 80, 202, 75, 29, 113, 152, 255, 101], - propose_default(&engine, proposer_addr).unwrap()); + assert_eq!(default_block(), propose_default(&engine, proposer_addr).unwrap()); assert!(propose_default(&engine, proposer_addr).is_err()); assert!(propose_default(&engine, not_proposer_addr).is_err()); } + #[test] + fn proposer_switching() { + let engine = new_test_tendermint().engine; + let tap = AccountProvider::transient_provider(); + + let not_proposer_addr = tap.insert_account("0".sha3(), "0").unwrap(); + assert!(propose_default(&engine, not_proposer_addr).is_err()); + + sleep(Duration::from_secs(3)); + + assert_eq!(default_block(), propose_default(&engine, not_proposer_addr).unwrap()); + } + #[test] fn prevote_step() { let engine = new_test_tendermint().engine; propose_default(&engine, Address::default()); } - #[test] - fn handle_message() { - false; - } - #[test] fn timeout_switching() { - let service = new_test_client_service(); let engine = new_test_tendermint().engine; let tender = Tendermint::new(engine.params().clone(), TendermintParams::default(), BTreeMap::new()); - service.register_io_handler(Arc::new(tender)); println!("Waiting for timeout"); - sleep(Duration::from_secs(10)); + sleep(Duration::from_secs(60)); - let message_channel = service.io().channel(); - message_channel.send(ClientIoMessage::ConsensusStep(1000)); - sleep(Duration::from_secs(5)); } } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index e378e63de..e2e4772a4 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -46,8 +46,6 @@ pub enum ClientIoMessage { FeedStateChunk(H256, Bytes), /// Feed a block chunk to the snapshot service FeedBlockChunk(H256, Bytes), - /// Signal consensus step timeout. - ConsensusStep(u64), } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -148,8 +146,6 @@ struct ClientIoHandler { const CLIENT_TICK_TIMER: TimerToken = 0; const CLIENT_TICK_MS: u64 = 5000; -/// Timer token representing the consensus step timeouts. -pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 1; impl IoHandler for ClientIoHandler { fn initialize(&self, io: &IoContext) { diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index d5f357097..96bb5354b 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -137,7 +137,7 @@ impl Spec { ethjson::spec::Engine::InstantSeal => Arc::new(InstantSeal::new(params, builtins)), ethjson::spec::Engine::Ethash(ethash) => Arc::new(ethereum::Ethash::new(params, From::from(ethash.params), builtins)), ethjson::spec::Engine::BasicAuthority(basic_authority) => Arc::new(BasicAuthority::new(params, From::from(basic_authority.params), builtins)), - ethjson::spec::Engine::Tendermint(tendermint) => Arc::new(Tendermint::new(params, From::from(tendermint.params), builtins)), + ethjson::spec::Engine::Tendermint(tendermint) => Tendermint::new(params, From::from(tendermint.params), builtins), } }