diff --git a/ethcore/res/tendermint.json b/ethcore/res/tendermint.json
index 126761233..8aa8f24f7 100644
--- a/ethcore/res/tendermint.json
+++ b/ethcore/res/tendermint.json
@@ -4,7 +4,6 @@
"Tendermint": {
"params": {
"gasLimitBoundDivisor": "0x0400",
- "durationLimit": "0x0d",
"validators" : [
"0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e",
"0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1"
diff --git a/ethcore/src/engines/propose_collect.rs b/ethcore/src/engines/propose_collect.rs
index ee4aa3810..46defd557 100644
--- a/ethcore/src/engines/propose_collect.rs
+++ b/ethcore/src/engines/propose_collect.rs
@@ -49,10 +49,13 @@ impl ProposeCollect {
/// Vote on hash using the signed hash, true if vote counted.
pub fn vote(&self, voter: Address) -> bool {
- if self.votes.try_read().unwrap().contains(&voter) { return false; }
- if !self.voters.contains(&voter) { return false; }
- self.votes.try_write().unwrap().insert(voter);
- true
+ match self.votes.try_read().unwrap().contains(&voter) || !self.voters.contains(&voter) {
+ true => false,
+ false => {
+ self.votes.try_write().unwrap().insert(voter);
+ true
+ },
+ }
}
/// Some winner if voting threshold was reached.
diff --git a/ethcore/src/engines/tendermint.rs b/ethcore/src/engines/tendermint.rs
index 116a0ce16..8ccbbf95a 100644
--- a/ethcore/src/engines/tendermint.rs
+++ b/ethcore/src/engines/tendermint.rs
@@ -17,7 +17,6 @@
//! Tendermint BFT consensus engine with round robin proof-of-authority.
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
-use std::time::Duration;
use common::*;
use account_provider::AccountProvider;
use block::*;
@@ -25,28 +24,54 @@ use spec::CommonParams;
use engines::{Engine, EngineError, ProposeCollect};
use evm::Schedule;
use ethjson;
+use io::{IoContext, IoHandler, TimerToken};
+use service::{ClientIoMessage, ENGINE_TIMEOUT_TOKEN};
+use time::get_time;
/// `Tendermint` params.
#[derive(Debug)]
pub struct TendermintParams {
/// Gas limit divisor.
pub gas_limit_bound_divisor: U256,
- /// Block duration.
- pub duration_limit: u64,
/// List of validators.
pub validators: Vec
,
/// Number of validators.
pub validator_n: usize,
/// Timeout durations for different steps.
- timeouts: Timeouts,
+ timeouts: DefaultTimeouts,
/// Consensus round.
r: u64,
/// Consensus step.
s: RwLock,
+ /// Current step timeout in ms.
+ timeout: AtomicTimerToken,
/// Used to swith proposer.
proposer_nonce: AtomicUsize,
}
+impl Default for TendermintParams {
+ fn default() -> Self {
+ let validators = vec!["0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e".into(), "0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1".into()];
+ let val_n = validators.len();
+ let propose_timeout = 3000;
+ TendermintParams {
+ gas_limit_bound_divisor: 0x0400.into(),
+ validators: validators,
+ validator_n: val_n,
+ timeouts: DefaultTimeouts {
+ propose: propose_timeout,
+ prevote: 3000,
+ precommit: 3000,
+ commit: 3000
+ },
+ r: 0,
+ s: RwLock::new(Step::Propose),
+ timeout: AtomicUsize::new(propose_timeout),
+ proposer_nonce: AtomicUsize::new(0)
+ }
+ }
+}
+
#[derive(Debug)]
enum Step {
Propose,
@@ -58,27 +83,62 @@ enum Step {
}
#[derive(Debug)]
-struct Timeouts {
- propose: Duration,
- prevote: Duration,
- precommit: Duration,
- commit: Duration
+struct DefaultTimeouts {
+ propose: TimerToken,
+ prevote: TimerToken,
+ precommit: TimerToken,
+ commit: TimerToken
}
type Seal = Vec;
+type AtomicTimerToken = AtomicUsize;
+
+impl IoHandler for Tendermint {
+ fn initialize(&self, io: &IoContext) {
+ io.register_timer(ENGINE_TIMEOUT_TOKEN, self.our_params.timeout.load(AtomicOrdering::Relaxed) as u64).expect("Error registering engine timeout");
+ }
+
+ fn timeout(&self, io: &IoContext, timer: TimerToken) {
+ if timer == ENGINE_TIMEOUT_TOKEN {
+ println!("Timeout: {:?}", get_time().sec);
+ io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer.");
+ match *self.our_params.s.try_read().unwrap() {
+ Step::Propose => self.to_propose(),
+ Step::Prevote(ref proposal) => self.to_precommit(proposal.hash.clone()),
+ Step::Precommit(_, _) => self.to_propose(),
+ Step::Commit(_) => self.to_propose(),
+ };
+ io.register_timer(ENGINE_TIMEOUT_TOKEN, 3000).expect("Failed to start new consensus timer.")
+ }
+ }
+
+ fn message(&self, io: &IoContext, net_message: &ClientIoMessage) {
+ if let &ClientIoMessage::ConsensusStep(next_timeout) = net_message {
+ println!("Message: {:?}", get_time().sec);
+ io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer.");
+ io.register_timer(ENGINE_TIMEOUT_TOKEN, next_timeout).expect("Failed to start new consensus timer.")
+ }
+ }
+}
impl From for TendermintParams {
fn from(p: ethjson::spec::TendermintParams) -> Self {
let val: Vec<_> = p.validators.into_iter().map(Into::into).collect();
let val_n = val.len();
+ let propose_timeout = 3;
TendermintParams {
gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(),
- duration_limit: p.duration_limit.into(),
validators: val,
validator_n: val_n,
- timeouts: Timeouts { propose: Duration::from_secs(3), prevote: Duration::from_secs(3), precommit: Duration::from_secs(3), commit: Duration::from_secs(3) },
+ timeouts: DefaultTimeouts {
+ propose: propose_timeout,
+ prevote: 3,
+ precommit: 3,
+ commit: 3
+ },
r: 0,
s: RwLock::new(Step::Propose),
+ timeout: AtomicUsize::new(propose_timeout),
proposer_nonce: AtomicUsize::new(0)
}
}
@@ -120,6 +180,12 @@ impl Tendermint {
self.threshold())
}
+ fn to_propose(&self) {
+ self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed);
+ let mut guard = self.our_params.s.write();
+ *guard = Step::Propose;
+ }
+
fn propose_message(&self, message: UntrustedRlp) -> Result {
// Check if message is for correct step.
match *self.our_params.s.try_read().unwrap() {
@@ -127,11 +193,14 @@ impl Tendermint {
_ => try!(Err(EngineError::WrongStep)),
}
let proposal = try!(message.as_val());
- self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed);
+ self.to_prevote(proposal);
+ Ok(message.as_raw().to_vec())
+ }
+
+ fn to_prevote(&self, proposal: H256) {
let mut guard = self.our_params.s.write();
// Proceed to the prevote step.
*guard = Step::Prevote(self.new_vote(proposal));
- Ok(message.as_raw().to_vec())
}
fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result {
@@ -142,13 +211,8 @@ impl Tendermint {
if vote.hash == try!(message.as_val()) {
vote.vote(sender);
// Move to next step is prevote is won.
- if vote.is_won() {
- let mut guard = self.our_params.s.write();
- *guard = Step::Precommit(self.new_vote(vote.hash), Vec::new());
- Ok(message.as_raw().to_vec())
- } else {
- Ok(message.as_raw().to_vec())
- }
+ if vote.is_won() { self.to_precommit(vote.hash); }
+ Ok(message.as_raw().to_vec())
} else {
try!(Err(EngineError::WrongVote))
}
@@ -157,6 +221,11 @@ impl Tendermint {
}
}
+ fn to_precommit(&self, proposal: H256) {
+ let mut guard = self.our_params.s.write();
+ *guard = Step::Precommit(self.new_vote(proposal), Vec::new());
+ }
+
fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result {
// Check if message is for correct step.
match *self.our_params.s.try_write().unwrap() {
@@ -165,13 +234,8 @@ impl Tendermint {
if vote.hash == try!(message.as_val()) {
if vote.vote(sender) { seal.push(encode(&signature).to_vec()); }
// Commit if precommit is won.
- if vote.is_won() {
- let mut guard = self.our_params.s.write();
- *guard = Step::Commit(seal.clone());
- Ok(message.as_raw().to_vec())
- } else {
- Ok(message.as_raw().to_vec())
- }
+ if vote.is_won() { self.to_commit(seal.clone()); }
+ Ok(message.as_raw().to_vec())
} else {
try!(Err(EngineError::WrongVote))
}
@@ -180,6 +244,11 @@ impl Tendermint {
}
}
+ fn to_commit(&self, seal: Seal) {
+ let mut guard = self.our_params.s.write();
+ *guard = Step::Commit(seal);
+ }
+
fn threshold(&self) -> usize {
self.our_params.validator_n*2/3
}
@@ -294,16 +363,40 @@ impl Engine for Tendermint {
#[cfg(test)]
mod tests {
use common::*;
+ use std::thread::sleep;
+ use std::time::{Duration, Instant};
use block::*;
use tests::helpers::*;
+ use service::{ClientService, ClientIoMessage};
+ use devtools::RandomTempPath;
+ use client::ClientConfig;
+ use miner::Miner;
use account_provider::AccountProvider;
use spec::Spec;
use engines::Engine;
+ use super::{Tendermint, TendermintParams};
/// Create a new test chain spec with `Tendermint` consensus engine.
/// Account "0".sha3() and "1".sha3() are a validators.
fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) }
+ fn new_test_client_service() -> ClientService {
+ let temp_path = RandomTempPath::new();
+ let mut path = temp_path.as_path().to_owned();
+ path.push("pruning");
+ path.push("db");
+
+ let spec = get_test_spec();
+ let service = ClientService::start(
+ ClientConfig::default(),
+ &spec,
+ &path,
+ &path,
+ Arc::new(Miner::with_spec(&spec)),
+ );
+ service.unwrap()
+ }
+
fn propose_default(engine: &Arc, proposer: Address) -> Result {
let mut s = RlpStream::new_list(3);
let header = Header::default();
@@ -399,4 +492,19 @@ mod tests {
fn handle_message() {
false;
}
+
+ #[test]
+ fn timeout_switching() {
+ let service = new_test_client_service();
+ let engine = new_test_tendermint().engine;
+ let tender = Tendermint::new(engine.params().clone(), TendermintParams::default(), BTreeMap::new());
+ service.register_io_handler(Arc::new(tender));
+
+ println!("Waiting for timeout");
+ sleep(Duration::from_secs(10));
+
+ let message_channel = service.io().channel();
+ message_channel.send(ClientIoMessage::ConsensusStep(1000));
+ sleep(Duration::from_secs(5));
+ }
}
diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs
index 355c7d580..7a3da2691 100644
--- a/ethcore/src/service.rs
+++ b/ethcore/src/service.rs
@@ -43,6 +43,8 @@ pub enum ClientIoMessage {
FeedStateChunk(H256, Bytes),
/// Feed a block chunk to the snapshot service
FeedBlockChunk(H256, Bytes),
+ /// Signal consensus step timeout.
+ ConsensusStep(u64),
}
/// Client service setup. Creates and registers client and network services with the IO subsystem.
@@ -143,6 +145,8 @@ struct ClientIoHandler {
const CLIENT_TICK_TIMER: TimerToken = 0;
const CLIENT_TICK_MS: u64 = 5000;
+/// Timer token representing the consensus step timeouts.
+pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 1;
impl IoHandler for ClientIoHandler {
fn initialize(&self, io: &IoContext) {
diff --git a/json/src/spec/tendermint.rs b/json/src/spec/tendermint.rs
index c3294810c..97c30fbb2 100644
--- a/json/src/spec/tendermint.rs
+++ b/json/src/spec/tendermint.rs
@@ -25,9 +25,6 @@ pub struct TendermintParams {
/// Gas limit divisor.
#[serde(rename="gasLimitBoundDivisor")]
pub gas_limit_bound_divisor: Uint,
- /// Block duration.
- #[serde(rename="durationLimit")]
- pub duration_limit: Uint,
/// Valid authorities
pub validators: Vec,
}
@@ -49,7 +46,6 @@ mod tests {
let s = r#"{
"params": {
"gasLimitBoundDivisor": "0x0400",
- "durationLimit": "0x0d",
"validators" : ["0xc6d9d2cd449a754c494264e1809c50e34d64562b"]
}
}"#;