initial timeouts

This commit is contained in:
keorn 2016-08-31 18:18:02 +02:00
parent d7499044e3
commit e475d0bf4c
5 changed files with 146 additions and 36 deletions

View File

@ -4,7 +4,6 @@
"Tendermint": { "Tendermint": {
"params": { "params": {
"gasLimitBoundDivisor": "0x0400", "gasLimitBoundDivisor": "0x0400",
"durationLimit": "0x0d",
"validators" : [ "validators" : [
"0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e", "0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e",
"0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1" "0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1"

View File

@ -49,10 +49,13 @@ impl ProposeCollect {
/// Vote on hash using the signed hash, true if vote counted. /// Vote on hash using the signed hash, true if vote counted.
pub fn vote(&self, voter: Address) -> bool { pub fn vote(&self, voter: Address) -> bool {
if self.votes.try_read().unwrap().contains(&voter) { return false; } match self.votes.try_read().unwrap().contains(&voter) || !self.voters.contains(&voter) {
if !self.voters.contains(&voter) { return false; } true => false,
false => {
self.votes.try_write().unwrap().insert(voter); self.votes.try_write().unwrap().insert(voter);
true true
},
}
} }
/// Some winner if voting threshold was reached. /// Some winner if voting threshold was reached.

View File

@ -17,7 +17,6 @@
//! Tendermint BFT consensus engine with round robin proof-of-authority. //! Tendermint BFT consensus engine with round robin proof-of-authority.
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::time::Duration;
use common::*; use common::*;
use account_provider::AccountProvider; use account_provider::AccountProvider;
use block::*; use block::*;
@ -25,28 +24,54 @@ use spec::CommonParams;
use engines::{Engine, EngineError, ProposeCollect}; use engines::{Engine, EngineError, ProposeCollect};
use evm::Schedule; use evm::Schedule;
use ethjson; use ethjson;
use io::{IoContext, IoHandler, TimerToken};
use service::{ClientIoMessage, ENGINE_TIMEOUT_TOKEN};
use time::get_time;
/// `Tendermint` params. /// `Tendermint` params.
#[derive(Debug)] #[derive(Debug)]
pub struct TendermintParams { pub struct TendermintParams {
/// Gas limit divisor. /// Gas limit divisor.
pub gas_limit_bound_divisor: U256, pub gas_limit_bound_divisor: U256,
/// Block duration.
pub duration_limit: u64,
/// List of validators. /// List of validators.
pub validators: Vec<Address>, pub validators: Vec<Address>,
/// Number of validators. /// Number of validators.
pub validator_n: usize, pub validator_n: usize,
/// Timeout durations for different steps. /// Timeout durations for different steps.
timeouts: Timeouts, timeouts: DefaultTimeouts,
/// Consensus round. /// Consensus round.
r: u64, r: u64,
/// Consensus step. /// Consensus step.
s: RwLock<Step>, s: RwLock<Step>,
/// Current step timeout in ms.
timeout: AtomicTimerToken,
/// Used to swith proposer. /// Used to swith proposer.
proposer_nonce: AtomicUsize, proposer_nonce: AtomicUsize,
} }
impl Default for TendermintParams {
fn default() -> Self {
let validators = vec!["0x7d577a597b2742b498cb5cf0c26cdcd726d39e6e".into(), "0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1".into()];
let val_n = validators.len();
let propose_timeout = 3000;
TendermintParams {
gas_limit_bound_divisor: 0x0400.into(),
validators: validators,
validator_n: val_n,
timeouts: DefaultTimeouts {
propose: propose_timeout,
prevote: 3000,
precommit: 3000,
commit: 3000
},
r: 0,
s: RwLock::new(Step::Propose),
timeout: AtomicUsize::new(propose_timeout),
proposer_nonce: AtomicUsize::new(0)
}
}
}
#[derive(Debug)] #[derive(Debug)]
enum Step { enum Step {
Propose, Propose,
@ -58,27 +83,62 @@ enum Step {
} }
#[derive(Debug)] #[derive(Debug)]
struct Timeouts { struct DefaultTimeouts {
propose: Duration, propose: TimerToken,
prevote: Duration, prevote: TimerToken,
precommit: Duration, precommit: TimerToken,
commit: Duration commit: TimerToken
} }
type Seal = Vec<Bytes>; type Seal = Vec<Bytes>;
type AtomicTimerToken = AtomicUsize;
impl IoHandler<ClientIoMessage> for Tendermint {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(ENGINE_TIMEOUT_TOKEN, self.our_params.timeout.load(AtomicOrdering::Relaxed) as u64).expect("Error registering engine timeout");
}
fn timeout(&self, io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
println!("Timeout: {:?}", get_time().sec);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer.");
match *self.our_params.s.try_read().unwrap() {
Step::Propose => self.to_propose(),
Step::Prevote(ref proposal) => self.to_precommit(proposal.hash.clone()),
Step::Precommit(_, _) => self.to_propose(),
Step::Commit(_) => self.to_propose(),
};
io.register_timer(ENGINE_TIMEOUT_TOKEN, 3000).expect("Failed to start new consensus timer.")
}
}
fn message(&self, io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
if let &ClientIoMessage::ConsensusStep(next_timeout) = net_message {
println!("Message: {:?}", get_time().sec);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer.");
io.register_timer(ENGINE_TIMEOUT_TOKEN, next_timeout).expect("Failed to start new consensus timer.")
}
}
}
impl From<ethjson::spec::TendermintParams> for TendermintParams { impl From<ethjson::spec::TendermintParams> for TendermintParams {
fn from(p: ethjson::spec::TendermintParams) -> Self { fn from(p: ethjson::spec::TendermintParams) -> Self {
let val: Vec<_> = p.validators.into_iter().map(Into::into).collect(); let val: Vec<_> = p.validators.into_iter().map(Into::into).collect();
let val_n = val.len(); let val_n = val.len();
let propose_timeout = 3;
TendermintParams { TendermintParams {
gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(), gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(),
duration_limit: p.duration_limit.into(),
validators: val, validators: val,
validator_n: val_n, validator_n: val_n,
timeouts: Timeouts { propose: Duration::from_secs(3), prevote: Duration::from_secs(3), precommit: Duration::from_secs(3), commit: Duration::from_secs(3) }, timeouts: DefaultTimeouts {
propose: propose_timeout,
prevote: 3,
precommit: 3,
commit: 3
},
r: 0, r: 0,
s: RwLock::new(Step::Propose), s: RwLock::new(Step::Propose),
timeout: AtomicUsize::new(propose_timeout),
proposer_nonce: AtomicUsize::new(0) proposer_nonce: AtomicUsize::new(0)
} }
} }
@ -120,6 +180,12 @@ impl Tendermint {
self.threshold()) self.threshold())
} }
fn to_propose(&self) {
self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed);
let mut guard = self.our_params.s.write();
*guard = Step::Propose;
}
fn propose_message(&self, message: UntrustedRlp) -> Result<Bytes, Error> { fn propose_message(&self, message: UntrustedRlp) -> Result<Bytes, Error> {
// Check if message is for correct step. // Check if message is for correct step.
match *self.our_params.s.try_read().unwrap() { match *self.our_params.s.try_read().unwrap() {
@ -127,11 +193,14 @@ impl Tendermint {
_ => try!(Err(EngineError::WrongStep)), _ => try!(Err(EngineError::WrongStep)),
} }
let proposal = try!(message.as_val()); let proposal = try!(message.as_val());
self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); self.to_prevote(proposal);
Ok(message.as_raw().to_vec())
}
fn to_prevote(&self, proposal: H256) {
let mut guard = self.our_params.s.write(); let mut guard = self.our_params.s.write();
// Proceed to the prevote step. // Proceed to the prevote step.
*guard = Step::Prevote(self.new_vote(proposal)); *guard = Step::Prevote(self.new_vote(proposal));
Ok(message.as_raw().to_vec())
} }
fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result<Bytes, Error> { fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result<Bytes, Error> {
@ -142,13 +211,8 @@ impl Tendermint {
if vote.hash == try!(message.as_val()) { if vote.hash == try!(message.as_val()) {
vote.vote(sender); vote.vote(sender);
// Move to next step is prevote is won. // Move to next step is prevote is won.
if vote.is_won() { if vote.is_won() { self.to_precommit(vote.hash); }
let mut guard = self.our_params.s.write();
*guard = Step::Precommit(self.new_vote(vote.hash), Vec::new());
Ok(message.as_raw().to_vec()) Ok(message.as_raw().to_vec())
} else {
Ok(message.as_raw().to_vec())
}
} else { } else {
try!(Err(EngineError::WrongVote)) try!(Err(EngineError::WrongVote))
} }
@ -157,6 +221,11 @@ impl Tendermint {
} }
} }
fn to_precommit(&self, proposal: H256) {
let mut guard = self.our_params.s.write();
*guard = Step::Precommit(self.new_vote(proposal), Vec::new());
}
fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result<Bytes, Error> { fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result<Bytes, Error> {
// Check if message is for correct step. // Check if message is for correct step.
match *self.our_params.s.try_write().unwrap() { match *self.our_params.s.try_write().unwrap() {
@ -165,13 +234,8 @@ impl Tendermint {
if vote.hash == try!(message.as_val()) { if vote.hash == try!(message.as_val()) {
if vote.vote(sender) { seal.push(encode(&signature).to_vec()); } if vote.vote(sender) { seal.push(encode(&signature).to_vec()); }
// Commit if precommit is won. // Commit if precommit is won.
if vote.is_won() { if vote.is_won() { self.to_commit(seal.clone()); }
let mut guard = self.our_params.s.write();
*guard = Step::Commit(seal.clone());
Ok(message.as_raw().to_vec()) Ok(message.as_raw().to_vec())
} else {
Ok(message.as_raw().to_vec())
}
} else { } else {
try!(Err(EngineError::WrongVote)) try!(Err(EngineError::WrongVote))
} }
@ -180,6 +244,11 @@ impl Tendermint {
} }
} }
fn to_commit(&self, seal: Seal) {
let mut guard = self.our_params.s.write();
*guard = Step::Commit(seal);
}
fn threshold(&self) -> usize { fn threshold(&self) -> usize {
self.our_params.validator_n*2/3 self.our_params.validator_n*2/3
} }
@ -294,16 +363,40 @@ impl Engine for Tendermint {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use common::*; use common::*;
use std::thread::sleep;
use std::time::{Duration, Instant};
use block::*; use block::*;
use tests::helpers::*; use tests::helpers::*;
use service::{ClientService, ClientIoMessage};
use devtools::RandomTempPath;
use client::ClientConfig;
use miner::Miner;
use account_provider::AccountProvider; use account_provider::AccountProvider;
use spec::Spec; use spec::Spec;
use engines::Engine; use engines::Engine;
use super::{Tendermint, TendermintParams};
/// Create a new test chain spec with `Tendermint` consensus engine. /// Create a new test chain spec with `Tendermint` consensus engine.
/// Account "0".sha3() and "1".sha3() are a validators. /// Account "0".sha3() and "1".sha3() are a validators.
fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) } fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) }
fn new_test_client_service() -> ClientService {
let temp_path = RandomTempPath::new();
let mut path = temp_path.as_path().to_owned();
path.push("pruning");
path.push("db");
let spec = get_test_spec();
let service = ClientService::start(
ClientConfig::default(),
&spec,
&path,
&path,
Arc::new(Miner::with_spec(&spec)),
);
service.unwrap()
}
fn propose_default(engine: &Arc<Engine>, proposer: Address) -> Result<Bytes, Error> { fn propose_default(engine: &Arc<Engine>, proposer: Address) -> Result<Bytes, Error> {
let mut s = RlpStream::new_list(3); let mut s = RlpStream::new_list(3);
let header = Header::default(); let header = Header::default();
@ -399,4 +492,19 @@ mod tests {
fn handle_message() { fn handle_message() {
false; false;
} }
#[test]
fn timeout_switching() {
let service = new_test_client_service();
let engine = new_test_tendermint().engine;
let tender = Tendermint::new(engine.params().clone(), TendermintParams::default(), BTreeMap::new());
service.register_io_handler(Arc::new(tender));
println!("Waiting for timeout");
sleep(Duration::from_secs(10));
let message_channel = service.io().channel();
message_channel.send(ClientIoMessage::ConsensusStep(1000));
sleep(Duration::from_secs(5));
}
} }

View File

@ -43,6 +43,8 @@ pub enum ClientIoMessage {
FeedStateChunk(H256, Bytes), FeedStateChunk(H256, Bytes),
/// Feed a block chunk to the snapshot service /// Feed a block chunk to the snapshot service
FeedBlockChunk(H256, Bytes), FeedBlockChunk(H256, Bytes),
/// Signal consensus step timeout.
ConsensusStep(u64),
} }
/// Client service setup. Creates and registers client and network services with the IO subsystem. /// Client service setup. Creates and registers client and network services with the IO subsystem.
@ -143,6 +145,8 @@ struct ClientIoHandler {
const CLIENT_TICK_TIMER: TimerToken = 0; const CLIENT_TICK_TIMER: TimerToken = 0;
const CLIENT_TICK_MS: u64 = 5000; const CLIENT_TICK_MS: u64 = 5000;
/// Timer token representing the consensus step timeouts.
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 1;
impl IoHandler<ClientIoMessage> for ClientIoHandler { impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<ClientIoMessage>) { fn initialize(&self, io: &IoContext<ClientIoMessage>) {

View File

@ -25,9 +25,6 @@ pub struct TendermintParams {
/// Gas limit divisor. /// Gas limit divisor.
#[serde(rename="gasLimitBoundDivisor")] #[serde(rename="gasLimitBoundDivisor")]
pub gas_limit_bound_divisor: Uint, pub gas_limit_bound_divisor: Uint,
/// Block duration.
#[serde(rename="durationLimit")]
pub duration_limit: Uint,
/// Valid authorities /// Valid authorities
pub validators: Vec<Address>, pub validators: Vec<Address>,
} }
@ -49,7 +46,6 @@ mod tests {
let s = r#"{ let s = r#"{
"params": { "params": {
"gasLimitBoundDivisor": "0x0400", "gasLimitBoundDivisor": "0x0400",
"durationLimit": "0x0d",
"validators" : ["0xc6d9d2cd449a754c494264e1809c50e34d64562b"] "validators" : ["0xc6d9d2cd449a754c494264e1809c50e34d64562b"]
} }
}"#; }"#;