From 6cbb859bd2fb5dd4cb15efbb2036c5c1dce8e388 Mon Sep 17 00:00:00 2001 From: keorn Date: Thu, 29 Sep 2016 14:44:42 +0100 Subject: [PATCH] add tendermint message types and deserialization --- ethcore/src/engines/tendermint/message.rs | 81 ++++++++++++++ .../{tendermint.rs => tendermint/mod.rs} | 105 ++++-------------- ethcore/src/engines/tendermint/timeout.rs | 104 +++++++++++++++++ 3 files changed, 206 insertions(+), 84 deletions(-) create mode 100644 ethcore/src/engines/tendermint/message.rs rename ethcore/src/engines/{tendermint.rs => tendermint/mod.rs} (88%) create mode 100644 ethcore/src/engines/tendermint/timeout.rs diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs new file mode 100644 index 000000000..86691b476 --- /dev/null +++ b/ethcore/src/engines/tendermint/message.rs @@ -0,0 +1,81 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Tendermint message handling. + +use super::{Height, Round, BlockHash}; +use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream}; + +pub enum ConsensusMessage { + Prevote(Height, Round, BlockHash), + Precommit(Height, Round, BlockHash), + Commit(Height, BlockHash), +} + +/// (height, step, ...) +impl Decodable for ConsensusMessage { + fn decode(decoder: &D) -> Result where D: Decoder { + // Handle according to step. + let rlp = decoder.as_rlp(); + if decoder.as_raw().len() != try!(rlp.payload_info()).total() { + return Err(DecoderError::RlpIsTooBig); + } + let height = try!(rlp.val_at(0)); + Ok(match try!(rlp.val_at(1)) { + 0u8 => ConsensusMessage::Prevote( + height, + try!(rlp.val_at(2)), + try!(rlp.val_at(3)) + ), + 1 => ConsensusMessage::Precommit( + height, + try!(rlp.val_at(2)), + try!(rlp.val_at(3)) + ), + 2 => ConsensusMessage::Commit( + height, + try!(rlp.val_at(2))), + _ => return Err(DecoderError::Custom("Unknown step.")), + }) + } +} + +impl Encodable for ConsensusMessage { + fn rlp_append(&self, s: &mut RlpStream) { + match *self { + ConsensusMessage::Prevote(h, r, hash) => { + s.begin_list(4); + s.append(&h); + s.append(&0u8); + s.append(&r); + s.append(&hash); + }, + ConsensusMessage::Precommit(h, r, hash) => { + s.begin_list(4); + s.append(&h); + s.append(&1u8); + s.append(&r); + s.append(&hash); + }, + ConsensusMessage::Commit(h, hash) => { + s.begin_list(3); + s.append(&h); + s.append(&2u8); + s.append(&hash); + }, + } + } +} diff --git a/ethcore/src/engines/tendermint.rs b/ethcore/src/engines/tendermint/mod.rs similarity index 88% rename from ethcore/src/engines/tendermint.rs rename to ethcore/src/engines/tendermint/mod.rs index b88874b84..4ebaca347 100644 --- a/ethcore/src/engines/tendermint.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -16,8 +16,10 @@ //! Tendermint BFT consensus engine with round robin proof-of-authority. +mod message; +mod timeout; + use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; -use std::sync::Weak; use common::*; use rlp::{UntrustedRlp, View, encode}; use ethkey::{recover, public_to_address}; @@ -27,8 +29,9 @@ use spec::CommonParams; use engines::{Engine, EngineError, ProposeCollect}; use evm::Schedule; use ethjson; -use io::{IoContext, IoHandler, TimerToken, IoService}; -use time::get_time; +use io::IoService; +use self::message::ConsensusMessage; +use self::timeout::{TimerHandler, NextStep, DefaultTimeouts}; /// `Tendermint` params. #[derive(Debug, Clone)] @@ -63,9 +66,16 @@ enum Step { /// Precommit step storing the precommit vote and accumulating seal. Precommit(ProposeCollect, Seal), /// Commit step storing a complete valid seal. - Commit(H256, Seal) + Commit(BlockHash, Seal) } +pub type Height = usize; +pub type Round = usize; +pub type BlockHash = H256; + +pub type AtomicMs = AtomicUsize; +type Seal = Vec; + impl From for TendermintParams { fn from(p: ethjson::spec::TendermintParams) -> Self { let val: Vec<_> = p.validators.into_iter().map(Into::into).collect(); @@ -79,15 +89,12 @@ impl From for TendermintParams { } } -#[derive(Clone)] -struct StepMessage; - /// Engine using `Tendermint` consensus algorithm, suitable for EVM chain. pub struct Tendermint { params: CommonParams, our_params: TendermintParams, builtins: BTreeMap, - timeout_service: IoService, + timeout_service: IoService, /// Consensus round. r: AtomicUsize, /// Consensus step. @@ -98,25 +105,21 @@ pub struct Tendermint { proposer_nonce: AtomicUsize, } -struct TimerHandler { - engine: Weak, -} - impl Tendermint { /// Create a new instance of Tendermint engine pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap) -> Arc { let engine = Arc::new( Tendermint { params: params, - timeout: AtomicUsize::new(our_params.timeouts.propose), + timeout: AtomicUsize::new(our_params.timeouts.propose()), our_params: our_params, builtins: builtins, - timeout_service: IoService::::start().expect("Error creating engine timeout service"), + timeout_service: IoService::::start().expect("Error creating engine timeout service"), r: AtomicUsize::new(0), s: RwLock::new(Step::Propose), proposer_nonce: AtomicUsize::new(0) }); - let handler = TimerHandler { engine: Arc::downgrade(&engine) }; + let handler = TimerHandler::new(Arc::downgrade(&engine)); engine.timeout_service.register_handler(Arc::new(handler)).expect("Error creating engine timeout service"); engine } @@ -134,7 +137,7 @@ impl Tendermint { self.our_params.validators.contains(address) } - fn new_vote(&self, proposal: H256) -> ProposeCollect { + fn new_vote(&self, proposal: BlockHash) -> ProposeCollect { ProposeCollect::new(proposal, self.our_params.validators.iter().cloned().collect(), self.threshold()) @@ -163,7 +166,7 @@ impl Tendermint { Ok(message.as_raw().to_vec()) } - fn to_prevote(&self, proposal: H256) { + fn to_prevote(&self, proposal: BlockHash) { trace!(target: "tendermint", "step: entering prevote"); println!("step: entering prevote"); // Proceed to the prevote step. @@ -195,7 +198,7 @@ impl Tendermint { Ok(message.as_raw().to_vec()) } - fn to_precommit(&self, proposal: H256) { + fn to_precommit(&self, proposal: BlockHash) { trace!(target: "tendermint", "step: entering precommit"); println!("step: entering precommit"); self.to_step(Step::Precommit(self.new_vote(proposal), Vec::new())); @@ -349,72 +352,6 @@ impl Engine for Tendermint { } } -/// Base timeout of each step in ms. -#[derive(Debug, Clone)] -struct DefaultTimeouts { - propose: Ms, - prevote: Ms, - precommit: Ms, - commit: Ms -} - -impl Default for DefaultTimeouts { - fn default() -> Self { - DefaultTimeouts { - propose: 1000, - prevote: 1000, - precommit: 1000, - commit: 1000 - } - } -} - -type Ms = usize; -type Seal = Vec; -type AtomicMs = AtomicUsize; - -/// Timer token representing the consensus step timeouts. -pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0; - -impl IoHandler for TimerHandler { - fn initialize(&self, io: &IoContext) { - if let Some(engine) = self.engine.upgrade() { - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout"); - } - } - - fn timeout(&self, io: &IoContext, timer: TimerToken) { - if timer == ENGINE_TIMEOUT_TOKEN { - if let Some(engine) = self.engine.upgrade() { - println!("Timeout: {:?}", get_time()); - // Can you release entering a clause? - let next_step = match *engine.s.try_read().unwrap() { - Step::Propose => Step::Propose, - Step::Prevote(_) => Step::Propose, - Step::Precommit(_, _) => Step::Propose, - Step::Commit(_, _) => { - engine.r.fetch_add(1, AtomicOrdering::Relaxed); - Step::Propose - }, - }; - match next_step { - Step::Propose => engine.to_propose(), - _ => (), - } - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") - } - } - } - - fn message(&self, io: &IoContext, _net_message: &StepMessage) { - if let Some(engine) = self.engine.upgrade() { - println!("Message: {:?}", get_time().sec); - io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") - } - } -} - #[cfg(test)] mod tests { use common::*; diff --git a/ethcore/src/engines/tendermint/timeout.rs b/ethcore/src/engines/tendermint/timeout.rs new file mode 100644 index 000000000..979c08a39 --- /dev/null +++ b/ethcore/src/engines/tendermint/timeout.rs @@ -0,0 +1,104 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Tendermint BFT consensus engine with round robin proof-of-authority. + +use std::sync::atomic::{Ordering as AtomicOrdering}; +use std::sync::Weak; +use io::{IoContext, IoHandler, TimerToken}; +use super::{Tendermint, Step}; +use time::get_time; + +pub struct TimerHandler { + engine: Weak, +} + +impl TimerHandler { + pub fn new(engine: Weak) -> Self { + TimerHandler { engine: engine } + } +} + +/// Base timeout of each step in ms. +#[derive(Debug, Clone)] +pub struct DefaultTimeouts { + propose: Ms, + prevote: Ms, + precommit: Ms, + commit: Ms +} + +impl DefaultTimeouts { + pub fn propose(&self) -> usize { self.propose } +} + +impl Default for DefaultTimeouts { + fn default() -> Self { + DefaultTimeouts { + propose: 1000, + prevote: 1000, + precommit: 1000, + commit: 1000 + } + } +} + +type Ms = usize; + +#[derive(Clone)] +pub struct NextStep; + +/// Timer token representing the consensus step timeouts. +pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0; + +impl IoHandler for TimerHandler { + fn initialize(&self, io: &IoContext) { + if let Some(engine) = self.engine.upgrade() { + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout"); + } + } + + fn timeout(&self, io: &IoContext, timer: TimerToken) { + if timer == ENGINE_TIMEOUT_TOKEN { + if let Some(engine) = self.engine.upgrade() { + println!("Timeout: {:?}", get_time()); + // Can you release entering a clause? + let next_step = match *engine.s.try_read().unwrap() { + Step::Propose => Step::Propose, + Step::Prevote(_) => Step::Propose, + Step::Precommit(_, _) => Step::Propose, + Step::Commit(_, _) => { + engine.r.fetch_add(1, AtomicOrdering::Relaxed); + Step::Propose + }, + }; + match next_step { + Step::Propose => engine.to_propose(), + _ => (), + } + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") + } + } + } + + fn message(&self, io: &IoContext, _net_message: &NextStep) { + if let Some(engine) = self.engine.upgrade() { + println!("Message: {:?}", get_time().sec); + io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") + } + } +}