diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs
new file mode 100644
index 000000000..86691b476
--- /dev/null
+++ b/ethcore/src/engines/tendermint/message.rs
@@ -0,0 +1,81 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! Tendermint message handling.
+
+use super::{Height, Round, BlockHash};
+use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream};
+
+pub enum ConsensusMessage {
+ Prevote(Height, Round, BlockHash),
+ Precommit(Height, Round, BlockHash),
+ Commit(Height, BlockHash),
+}
+
+/// (height, step, ...)
+impl Decodable for ConsensusMessage {
+ fn decode(decoder: &D) -> Result where D: Decoder {
+ // Handle according to step.
+ let rlp = decoder.as_rlp();
+ if decoder.as_raw().len() != try!(rlp.payload_info()).total() {
+ return Err(DecoderError::RlpIsTooBig);
+ }
+ let height = try!(rlp.val_at(0));
+ Ok(match try!(rlp.val_at(1)) {
+ 0u8 => ConsensusMessage::Prevote(
+ height,
+ try!(rlp.val_at(2)),
+ try!(rlp.val_at(3))
+ ),
+ 1 => ConsensusMessage::Precommit(
+ height,
+ try!(rlp.val_at(2)),
+ try!(rlp.val_at(3))
+ ),
+ 2 => ConsensusMessage::Commit(
+ height,
+ try!(rlp.val_at(2))),
+ _ => return Err(DecoderError::Custom("Unknown step.")),
+ })
+ }
+}
+
+impl Encodable for ConsensusMessage {
+ fn rlp_append(&self, s: &mut RlpStream) {
+ match *self {
+ ConsensusMessage::Prevote(h, r, hash) => {
+ s.begin_list(4);
+ s.append(&h);
+ s.append(&0u8);
+ s.append(&r);
+ s.append(&hash);
+ },
+ ConsensusMessage::Precommit(h, r, hash) => {
+ s.begin_list(4);
+ s.append(&h);
+ s.append(&1u8);
+ s.append(&r);
+ s.append(&hash);
+ },
+ ConsensusMessage::Commit(h, hash) => {
+ s.begin_list(3);
+ s.append(&h);
+ s.append(&2u8);
+ s.append(&hash);
+ },
+ }
+ }
+}
diff --git a/ethcore/src/engines/tendermint.rs b/ethcore/src/engines/tendermint/mod.rs
similarity index 88%
rename from ethcore/src/engines/tendermint.rs
rename to ethcore/src/engines/tendermint/mod.rs
index b88874b84..4ebaca347 100644
--- a/ethcore/src/engines/tendermint.rs
+++ b/ethcore/src/engines/tendermint/mod.rs
@@ -16,8 +16,10 @@
//! Tendermint BFT consensus engine with round robin proof-of-authority.
+mod message;
+mod timeout;
+
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
-use std::sync::Weak;
use common::*;
use rlp::{UntrustedRlp, View, encode};
use ethkey::{recover, public_to_address};
@@ -27,8 +29,9 @@ use spec::CommonParams;
use engines::{Engine, EngineError, ProposeCollect};
use evm::Schedule;
use ethjson;
-use io::{IoContext, IoHandler, TimerToken, IoService};
-use time::get_time;
+use io::IoService;
+use self::message::ConsensusMessage;
+use self::timeout::{TimerHandler, NextStep, DefaultTimeouts};
/// `Tendermint` params.
#[derive(Debug, Clone)]
@@ -63,9 +66,16 @@ enum Step {
/// Precommit step storing the precommit vote and accumulating seal.
Precommit(ProposeCollect, Seal),
/// Commit step storing a complete valid seal.
- Commit(H256, Seal)
+ Commit(BlockHash, Seal)
}
+pub type Height = usize;
+pub type Round = usize;
+pub type BlockHash = H256;
+
+pub type AtomicMs = AtomicUsize;
+type Seal = Vec;
+
impl From for TendermintParams {
fn from(p: ethjson::spec::TendermintParams) -> Self {
let val: Vec<_> = p.validators.into_iter().map(Into::into).collect();
@@ -79,15 +89,12 @@ impl From for TendermintParams {
}
}
-#[derive(Clone)]
-struct StepMessage;
-
/// Engine using `Tendermint` consensus algorithm, suitable for EVM chain.
pub struct Tendermint {
params: CommonParams,
our_params: TendermintParams,
builtins: BTreeMap,
- timeout_service: IoService,
+ timeout_service: IoService,
/// Consensus round.
r: AtomicUsize,
/// Consensus step.
@@ -98,25 +105,21 @@ pub struct Tendermint {
proposer_nonce: AtomicUsize,
}
-struct TimerHandler {
- engine: Weak,
-}
-
impl Tendermint {
/// Create a new instance of Tendermint engine
pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap) -> Arc {
let engine = Arc::new(
Tendermint {
params: params,
- timeout: AtomicUsize::new(our_params.timeouts.propose),
+ timeout: AtomicUsize::new(our_params.timeouts.propose()),
our_params: our_params,
builtins: builtins,
- timeout_service: IoService::::start().expect("Error creating engine timeout service"),
+ timeout_service: IoService::::start().expect("Error creating engine timeout service"),
r: AtomicUsize::new(0),
s: RwLock::new(Step::Propose),
proposer_nonce: AtomicUsize::new(0)
});
- let handler = TimerHandler { engine: Arc::downgrade(&engine) };
+ let handler = TimerHandler::new(Arc::downgrade(&engine));
engine.timeout_service.register_handler(Arc::new(handler)).expect("Error creating engine timeout service");
engine
}
@@ -134,7 +137,7 @@ impl Tendermint {
self.our_params.validators.contains(address)
}
- fn new_vote(&self, proposal: H256) -> ProposeCollect {
+ fn new_vote(&self, proposal: BlockHash) -> ProposeCollect {
ProposeCollect::new(proposal,
self.our_params.validators.iter().cloned().collect(),
self.threshold())
@@ -163,7 +166,7 @@ impl Tendermint {
Ok(message.as_raw().to_vec())
}
- fn to_prevote(&self, proposal: H256) {
+ fn to_prevote(&self, proposal: BlockHash) {
trace!(target: "tendermint", "step: entering prevote");
println!("step: entering prevote");
// Proceed to the prevote step.
@@ -195,7 +198,7 @@ impl Tendermint {
Ok(message.as_raw().to_vec())
}
- fn to_precommit(&self, proposal: H256) {
+ fn to_precommit(&self, proposal: BlockHash) {
trace!(target: "tendermint", "step: entering precommit");
println!("step: entering precommit");
self.to_step(Step::Precommit(self.new_vote(proposal), Vec::new()));
@@ -349,72 +352,6 @@ impl Engine for Tendermint {
}
}
-/// Base timeout of each step in ms.
-#[derive(Debug, Clone)]
-struct DefaultTimeouts {
- propose: Ms,
- prevote: Ms,
- precommit: Ms,
- commit: Ms
-}
-
-impl Default for DefaultTimeouts {
- fn default() -> Self {
- DefaultTimeouts {
- propose: 1000,
- prevote: 1000,
- precommit: 1000,
- commit: 1000
- }
- }
-}
-
-type Ms = usize;
-type Seal = Vec;
-type AtomicMs = AtomicUsize;
-
-/// Timer token representing the consensus step timeouts.
-pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0;
-
-impl IoHandler for TimerHandler {
- fn initialize(&self, io: &IoContext) {
- if let Some(engine) = self.engine.upgrade() {
- io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout");
- }
- }
-
- fn timeout(&self, io: &IoContext, timer: TimerToken) {
- if timer == ENGINE_TIMEOUT_TOKEN {
- if let Some(engine) = self.engine.upgrade() {
- println!("Timeout: {:?}", get_time());
- // Can you release entering a clause?
- let next_step = match *engine.s.try_read().unwrap() {
- Step::Propose => Step::Propose,
- Step::Prevote(_) => Step::Propose,
- Step::Precommit(_, _) => Step::Propose,
- Step::Commit(_, _) => {
- engine.r.fetch_add(1, AtomicOrdering::Relaxed);
- Step::Propose
- },
- };
- match next_step {
- Step::Propose => engine.to_propose(),
- _ => (),
- }
- io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
- }
- }
- }
-
- fn message(&self, io: &IoContext, _net_message: &StepMessage) {
- if let Some(engine) = self.engine.upgrade() {
- println!("Message: {:?}", get_time().sec);
- io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer.");
- io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
- }
- }
-}
-
#[cfg(test)]
mod tests {
use common::*;
diff --git a/ethcore/src/engines/tendermint/timeout.rs b/ethcore/src/engines/tendermint/timeout.rs
new file mode 100644
index 000000000..979c08a39
--- /dev/null
+++ b/ethcore/src/engines/tendermint/timeout.rs
@@ -0,0 +1,104 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! Tendermint BFT consensus engine with round robin proof-of-authority.
+
+use std::sync::atomic::{Ordering as AtomicOrdering};
+use std::sync::Weak;
+use io::{IoContext, IoHandler, TimerToken};
+use super::{Tendermint, Step};
+use time::get_time;
+
+pub struct TimerHandler {
+ engine: Weak,
+}
+
+impl TimerHandler {
+ pub fn new(engine: Weak) -> Self {
+ TimerHandler { engine: engine }
+ }
+}
+
+/// Base timeout of each step in ms.
+#[derive(Debug, Clone)]
+pub struct DefaultTimeouts {
+ propose: Ms,
+ prevote: Ms,
+ precommit: Ms,
+ commit: Ms
+}
+
+impl DefaultTimeouts {
+ pub fn propose(&self) -> usize { self.propose }
+}
+
+impl Default for DefaultTimeouts {
+ fn default() -> Self {
+ DefaultTimeouts {
+ propose: 1000,
+ prevote: 1000,
+ precommit: 1000,
+ commit: 1000
+ }
+ }
+}
+
+type Ms = usize;
+
+#[derive(Clone)]
+pub struct NextStep;
+
+/// Timer token representing the consensus step timeouts.
+pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0;
+
+impl IoHandler for TimerHandler {
+ fn initialize(&self, io: &IoContext) {
+ if let Some(engine) = self.engine.upgrade() {
+ io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout");
+ }
+ }
+
+ fn timeout(&self, io: &IoContext, timer: TimerToken) {
+ if timer == ENGINE_TIMEOUT_TOKEN {
+ if let Some(engine) = self.engine.upgrade() {
+ println!("Timeout: {:?}", get_time());
+ // Can you release entering a clause?
+ let next_step = match *engine.s.try_read().unwrap() {
+ Step::Propose => Step::Propose,
+ Step::Prevote(_) => Step::Propose,
+ Step::Precommit(_, _) => Step::Propose,
+ Step::Commit(_, _) => {
+ engine.r.fetch_add(1, AtomicOrdering::Relaxed);
+ Step::Propose
+ },
+ };
+ match next_step {
+ Step::Propose => engine.to_propose(),
+ _ => (),
+ }
+ io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
+ }
+ }
+ }
+
+ fn message(&self, io: &IoContext, _net_message: &NextStep) {
+ if let Some(engine) = self.engine.upgrade() {
+ println!("Message: {:?}", get_time().sec);
+ io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer.");
+ io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
+ }
+ }
+}