add tendermint message types and deserialization

This commit is contained in:
keorn 2016-09-29 14:44:42 +01:00
parent fd6900bbb3
commit 6cbb859bd2
3 changed files with 206 additions and 84 deletions

View File

@ -0,0 +1,81 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Tendermint message handling.
use super::{Height, Round, BlockHash};
use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream};
pub enum ConsensusMessage {
Prevote(Height, Round, BlockHash),
Precommit(Height, Round, BlockHash),
Commit(Height, BlockHash),
}
/// (height, step, ...)
impl Decodable for ConsensusMessage {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
// Handle according to step.
let rlp = decoder.as_rlp();
if decoder.as_raw().len() != try!(rlp.payload_info()).total() {
return Err(DecoderError::RlpIsTooBig);
}
let height = try!(rlp.val_at(0));
Ok(match try!(rlp.val_at(1)) {
0u8 => ConsensusMessage::Prevote(
height,
try!(rlp.val_at(2)),
try!(rlp.val_at(3))
),
1 => ConsensusMessage::Precommit(
height,
try!(rlp.val_at(2)),
try!(rlp.val_at(3))
),
2 => ConsensusMessage::Commit(
height,
try!(rlp.val_at(2))),
_ => return Err(DecoderError::Custom("Unknown step.")),
})
}
}
impl Encodable for ConsensusMessage {
fn rlp_append(&self, s: &mut RlpStream) {
match *self {
ConsensusMessage::Prevote(h, r, hash) => {
s.begin_list(4);
s.append(&h);
s.append(&0u8);
s.append(&r);
s.append(&hash);
},
ConsensusMessage::Precommit(h, r, hash) => {
s.begin_list(4);
s.append(&h);
s.append(&1u8);
s.append(&r);
s.append(&hash);
},
ConsensusMessage::Commit(h, hash) => {
s.begin_list(3);
s.append(&h);
s.append(&2u8);
s.append(&hash);
},
}
}
}

View File

@ -16,8 +16,10 @@
//! Tendermint BFT consensus engine with round robin proof-of-authority.
mod message;
mod timeout;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Weak;
use common::*;
use rlp::{UntrustedRlp, View, encode};
use ethkey::{recover, public_to_address};
@ -27,8 +29,9 @@ use spec::CommonParams;
use engines::{Engine, EngineError, ProposeCollect};
use evm::Schedule;
use ethjson;
use io::{IoContext, IoHandler, TimerToken, IoService};
use time::get_time;
use io::IoService;
use self::message::ConsensusMessage;
use self::timeout::{TimerHandler, NextStep, DefaultTimeouts};
/// `Tendermint` params.
#[derive(Debug, Clone)]
@ -63,9 +66,16 @@ enum Step {
/// Precommit step storing the precommit vote and accumulating seal.
Precommit(ProposeCollect, Seal),
/// Commit step storing a complete valid seal.
Commit(H256, Seal)
Commit(BlockHash, Seal)
}
pub type Height = usize;
pub type Round = usize;
pub type BlockHash = H256;
pub type AtomicMs = AtomicUsize;
type Seal = Vec<Bytes>;
impl From<ethjson::spec::TendermintParams> for TendermintParams {
fn from(p: ethjson::spec::TendermintParams) -> Self {
let val: Vec<_> = p.validators.into_iter().map(Into::into).collect();
@ -79,15 +89,12 @@ impl From<ethjson::spec::TendermintParams> for TendermintParams {
}
}
#[derive(Clone)]
struct StepMessage;
/// Engine using `Tendermint` consensus algorithm, suitable for EVM chain.
pub struct Tendermint {
params: CommonParams,
our_params: TendermintParams,
builtins: BTreeMap<Address, Builtin>,
timeout_service: IoService<StepMessage>,
timeout_service: IoService<NextStep>,
/// Consensus round.
r: AtomicUsize,
/// Consensus step.
@ -98,25 +105,21 @@ pub struct Tendermint {
proposer_nonce: AtomicUsize,
}
struct TimerHandler {
engine: Weak<Tendermint>,
}
impl Tendermint {
/// Create a new instance of Tendermint engine
pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap<Address, Builtin>) -> Arc<Self> {
let engine = Arc::new(
Tendermint {
params: params,
timeout: AtomicUsize::new(our_params.timeouts.propose),
timeout: AtomicUsize::new(our_params.timeouts.propose()),
our_params: our_params,
builtins: builtins,
timeout_service: IoService::<StepMessage>::start().expect("Error creating engine timeout service"),
timeout_service: IoService::<NextStep>::start().expect("Error creating engine timeout service"),
r: AtomicUsize::new(0),
s: RwLock::new(Step::Propose),
proposer_nonce: AtomicUsize::new(0)
});
let handler = TimerHandler { engine: Arc::downgrade(&engine) };
let handler = TimerHandler::new(Arc::downgrade(&engine));
engine.timeout_service.register_handler(Arc::new(handler)).expect("Error creating engine timeout service");
engine
}
@ -134,7 +137,7 @@ impl Tendermint {
self.our_params.validators.contains(address)
}
fn new_vote(&self, proposal: H256) -> ProposeCollect {
fn new_vote(&self, proposal: BlockHash) -> ProposeCollect {
ProposeCollect::new(proposal,
self.our_params.validators.iter().cloned().collect(),
self.threshold())
@ -163,7 +166,7 @@ impl Tendermint {
Ok(message.as_raw().to_vec())
}
fn to_prevote(&self, proposal: H256) {
fn to_prevote(&self, proposal: BlockHash) {
trace!(target: "tendermint", "step: entering prevote");
println!("step: entering prevote");
// Proceed to the prevote step.
@ -195,7 +198,7 @@ impl Tendermint {
Ok(message.as_raw().to_vec())
}
fn to_precommit(&self, proposal: H256) {
fn to_precommit(&self, proposal: BlockHash) {
trace!(target: "tendermint", "step: entering precommit");
println!("step: entering precommit");
self.to_step(Step::Precommit(self.new_vote(proposal), Vec::new()));
@ -349,72 +352,6 @@ impl Engine for Tendermint {
}
}
/// Base timeout of each step in ms.
#[derive(Debug, Clone)]
struct DefaultTimeouts {
propose: Ms,
prevote: Ms,
precommit: Ms,
commit: Ms
}
impl Default for DefaultTimeouts {
fn default() -> Self {
DefaultTimeouts {
propose: 1000,
prevote: 1000,
precommit: 1000,
commit: 1000
}
}
}
type Ms = usize;
type Seal = Vec<Bytes>;
type AtomicMs = AtomicUsize;
/// Timer token representing the consensus step timeouts.
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0;
impl IoHandler<StepMessage> for TimerHandler {
fn initialize(&self, io: &IoContext<StepMessage>) {
if let Some(engine) = self.engine.upgrade() {
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout");
}
}
fn timeout(&self, io: &IoContext<StepMessage>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
println!("Timeout: {:?}", get_time());
// Can you release entering a clause?
let next_step = match *engine.s.try_read().unwrap() {
Step::Propose => Step::Propose,
Step::Prevote(_) => Step::Propose,
Step::Precommit(_, _) => Step::Propose,
Step::Commit(_, _) => {
engine.r.fetch_add(1, AtomicOrdering::Relaxed);
Step::Propose
},
};
match next_step {
Step::Propose => engine.to_propose(),
_ => (),
}
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
}
}
}
fn message(&self, io: &IoContext<StepMessage>, _net_message: &StepMessage) {
if let Some(engine) = self.engine.upgrade() {
println!("Message: {:?}", get_time().sec);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer.");
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
}
}
}
#[cfg(test)]
mod tests {
use common::*;

View File

@ -0,0 +1,104 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Tendermint BFT consensus engine with round robin proof-of-authority.
use std::sync::atomic::{Ordering as AtomicOrdering};
use std::sync::Weak;
use io::{IoContext, IoHandler, TimerToken};
use super::{Tendermint, Step};
use time::get_time;
pub struct TimerHandler {
engine: Weak<Tendermint>,
}
impl TimerHandler {
pub fn new(engine: Weak<Tendermint>) -> Self {
TimerHandler { engine: engine }
}
}
/// Base timeout of each step in ms.
#[derive(Debug, Clone)]
pub struct DefaultTimeouts {
propose: Ms,
prevote: Ms,
precommit: Ms,
commit: Ms
}
impl DefaultTimeouts {
pub fn propose(&self) -> usize { self.propose }
}
impl Default for DefaultTimeouts {
fn default() -> Self {
DefaultTimeouts {
propose: 1000,
prevote: 1000,
precommit: 1000,
commit: 1000
}
}
}
type Ms = usize;
#[derive(Clone)]
pub struct NextStep;
/// Timer token representing the consensus step timeouts.
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0;
impl IoHandler<NextStep> for TimerHandler {
fn initialize(&self, io: &IoContext<NextStep>) {
if let Some(engine) = self.engine.upgrade() {
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout");
}
}
fn timeout(&self, io: &IoContext<NextStep>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
println!("Timeout: {:?}", get_time());
// Can you release entering a clause?
let next_step = match *engine.s.try_read().unwrap() {
Step::Propose => Step::Propose,
Step::Prevote(_) => Step::Propose,
Step::Precommit(_, _) => Step::Propose,
Step::Commit(_, _) => {
engine.r.fetch_add(1, AtomicOrdering::Relaxed);
Step::Propose
},
};
match next_step {
Step::Propose => engine.to_propose(),
_ => (),
}
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
}
}
}
fn message(&self, io: &IoContext<NextStep>, _net_message: &NextStep) {
if let Some(engine) = self.engine.upgrade() {
println!("Message: {:?}", get_time().sec);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer.");
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.")
}
}
}