transition rules

This commit is contained in:
keorn 2016-11-16 18:01:09 +00:00
parent 51bbad66d0
commit 802d5c669d
4 changed files with 72 additions and 44 deletions

View File

@ -20,7 +20,7 @@ use util::*;
use super::{Height, Round, BlockHash, Step};
use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream};
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ConsensusMessage {
pub signature: H520,
pub height: Height,
@ -30,12 +30,16 @@ pub struct ConsensusMessage {
}
impl ConsensusMessage {
pub fn is_height(&self, height: Height) -> bool {
self.height == height
}
pub fn is_round(&self, height: Height, round: Round) -> bool {
self.height == height && self.round == round
}
pub fn is_step(&self, height: Height, round: Round, step: &Step) -> bool {
self.height == height && self.round == round && &self.step == step
pub fn is_step(&self, height: Height, round: Round, step: Step) -> bool {
self.height == height && self.round == round && self.step == step
}
pub fn is_aligned(&self, height: Height, round: Round, block_hash: Option<H256>) -> bool {
@ -79,7 +83,7 @@ impl Decodable for Step {
match try!(decoder.as_rlp().as_val()) {
0u8 => Ok(Step::Prevote),
1 => Ok(Step::Precommit),
_ => Err(DecoderError::Custom("Unknown step.")),
_ => Err(DecoderError::Custom("Invalid step.")),
}
}
}

View File

@ -41,11 +41,11 @@ use evm::Schedule;
use io::{IoService, IoChannel};
use service::ClientIoMessage;
use self::message::ConsensusMessage;
use self::timeout::{TransitionHandler, NextStep};
use self::timeout::TransitionHandler;
use self::params::TendermintParams;
use self::vote_collector::VoteCollector;
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Step {
Propose,
Prevote,
@ -65,7 +65,7 @@ pub struct Tendermint {
params: CommonParams,
our_params: TendermintParams,
builtins: BTreeMap<Address, Builtin>,
step_service: IoService<NextStep>,
step_service: IoService<Step>,
/// Address to be used as authority.
authority: RwLock<Address>,
/// Blockchain height.
@ -92,7 +92,7 @@ impl Tendermint {
params: params,
our_params: our_params,
builtins: builtins,
step_service: try!(IoService::<NextStep>::start()),
step_service: try!(IoService::<Step>::start()),
authority: RwLock::new(Address::default()),
height: AtomicUsize::new(0),
round: AtomicUsize::new(0),
@ -125,6 +125,14 @@ impl Tendermint {
}
}
fn to_step(&self, step: Step) {
*self.step.write() = step;
match step {
Step::Propose => self.update_sealing(),
_ => {},
}
}
fn nonce_proposer(&self, proposer_nonce: usize) -> &Address {
let ref p = self.our_params;
p.authorities.get(proposer_nonce % p.authority_n).unwrap()
@ -148,11 +156,19 @@ impl Tendermint {
}
fn is_current(&self, message: &ConsensusMessage) -> bool {
message.is_step(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), &self.step.read())
message.is_height(self.height.load(AtomicOrdering::SeqCst))
}
fn has_enough_any_votes(&self) -> bool {
self.votes.count_step_votes(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), &self.step.read()) > self.threshold()
self.votes.count_step_votes(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), *self.step.read()) > self.threshold()
}
fn has_enough_step_votes(&self, message: &ConsensusMessage) -> bool {
self.votes.count_step_votes(message.height, message.round, message.step) > self.threshold()
}
fn has_enough_aligned_votes(&self, message: &ConsensusMessage) -> bool {
self.votes.aligned_signatures(&message).len() > self.threshold()
}
}
@ -272,19 +288,38 @@ impl Engine for Tendermint {
try!(Err(BlockError::InvalidSeal));
}
// Check if the message affects the current step.
self.votes.vote(message.clone(), sender);
// Check if the message should be handled right now.
if self.is_current(&message) {
match *self.step.read() {
Step::Prevote => {
let votes = self.votes.aligned_signatures(&message);
if votes.len() > self.threshold() {
let next_step = match *self.step.read() {
Step::Precommit if self.has_enough_aligned_votes(&message) => {
if message.block_hash.is_none() {
self.round.fetch_add(1, AtomicOrdering::SeqCst);
Some(Step::Propose)
} else {
Some(Step::Commit)
}
},
Step::Precommit => {},
_ => {},
Step::Precommit if self.has_enough_step_votes(&message) => {
self.round.store(message.round, AtomicOrdering::SeqCst);
Some(Step::Precommit)
},
Step::Prevote if self.has_enough_aligned_votes(&message) => Some(Step::Precommit),
Step::Prevote if self.has_enough_step_votes(&message) => {
self.round.store(message.round, AtomicOrdering::SeqCst);
Some(Step::Prevote)
},
_ => None,
};
if let Some(step) = next_step {
if let Err(io_err) = self.step_service.send_message(step) {
warn!(target: "poa", "Could not proceed to next step {}.", io_err)
}
}
self.votes.vote(message, sender);
}
Err(BlockError::InvalidSeal.into())
}

View File

@ -57,25 +57,22 @@ impl Default for TendermintTimeouts {
}
}
#[derive(Clone)]
pub struct NextStep(Step);
/// Timer token representing the consensus step timeouts.
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
fn set_timeout(io: &IoContext<NextStep>, timeout: Duration) {
fn set_timeout(io: &IoContext<Step>, timeout: Duration) {
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, timeout.num_milliseconds() as u64)
.unwrap_or_else(|e| warn!(target: "poa", "Failed to set consensus step timeout: {}.", e))
}
impl IoHandler<NextStep> for TransitionHandler {
fn initialize(&self, io: &IoContext<NextStep>) {
impl IoHandler<Step> for TransitionHandler {
fn initialize(&self, io: &IoContext<Step>) {
if let Some(engine) = self.engine.upgrade() {
set_timeout(io, engine.our_params.timeouts.propose)
}
}
fn timeout(&self, io: &IoContext<NextStep>, timer: TimerToken) {
fn timeout(&self, io: &IoContext<Step>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
let next_step = match *engine.step.read() {
@ -102,32 +99,24 @@ impl IoHandler<NextStep> for TransitionHandler {
};
if let Some(step) = next_step {
*engine.step.write() = step.clone();
if step == Step::Propose {
engine.update_sealing();
}
engine.to_step(step)
}
}
}
}
fn message(&self, io: &IoContext<NextStep>, message: &NextStep) {
fn message(&self, io: &IoContext<Step>, next_step: &Step) {
if let Some(engine) = self.engine.upgrade() {
match io.clear_timer(ENGINE_TIMEOUT_TOKEN) {
Ok(_) => {},
Err(io_err) => warn!(target: "poa", "Could not remove consensus timer {}.", io_err),
};
let NextStep(next_step) = message.clone();
*engine.step.write() = next_step.clone();
match next_step {
Step::Propose => {
engine.update_sealing();
set_timeout(io, engine.our_params.timeouts.propose)
},
if let Err(io_err) = io.clear_timer(ENGINE_TIMEOUT_TOKEN) {
warn!(target: "poa", "Could not remove consensus timer {}.", io_err)
}
match *next_step {
Step::Propose => set_timeout(io, engine.our_params.timeouts.propose),
Step::Prevote => set_timeout(io, engine.our_params.timeouts.prevote),
Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit),
Step::Commit => set_timeout(io, engine.our_params.timeouts.commit),
};
engine.to_step(*next_step);
}
}
}

View File

@ -50,7 +50,7 @@ impl VoteCollector {
self.seal_signatures(message.height, message.round, message.block_hash)
}
pub fn count_step_votes(&self, height: Height, round: Round, step: &Step) -> usize {
pub fn count_step_votes(&self, height: Height, round: Round, step: Step) -> usize {
self.votes
.read()
.keys()