step transition messaging

This commit is contained in:
keorn 2016-11-16 12:43:21 +00:00
parent 8ac989cbeb
commit 2fa34fd6a8
3 changed files with 39 additions and 27 deletions

View File

@ -38,13 +38,14 @@ use engines::{Engine, EngineError, ProposeCollect};
use blockchain::extras::BlockDetails; use blockchain::extras::BlockDetails;
use views::HeaderView; use views::HeaderView;
use evm::Schedule; use evm::Schedule;
use io::IoService; use io::{IoService, IoChannel};
use service::ClientIoMessage;
use self::message::ConsensusMessage; use self::message::ConsensusMessage;
use self::timeout::{TimerHandler, NextStep}; use self::timeout::{TimerHandler, NextStep};
use self::params::TendermintParams; use self::params::TendermintParams;
use self::vote_collector::VoteCollector; use self::vote_collector::VoteCollector;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum Step { pub enum Step {
Propose, Propose,
Prevote, Prevote,
@ -64,7 +65,7 @@ pub struct Tendermint {
params: CommonParams, params: CommonParams,
our_params: TendermintParams, our_params: TendermintParams,
builtins: BTreeMap<Address, Builtin>, builtins: BTreeMap<Address, Builtin>,
timeout_service: IoService<NextStep>, step_service: IoService<NextStep>,
/// Address to be used as authority. /// Address to be used as authority.
authority: RwLock<Address>, authority: RwLock<Address>,
/// Blockchain height. /// Blockchain height.
@ -77,6 +78,8 @@ pub struct Tendermint {
proposer_nonce: AtomicUsize, proposer_nonce: AtomicUsize,
/// Vote accumulator. /// Vote accumulator.
votes: VoteCollector, votes: VoteCollector,
/// Proposed block held until seal is gathered.
proposed_block: Mutex<Option<ExecutedBlock>>,
/// Channel for updating the sealing. /// Channel for updating the sealing.
message_channel: Mutex<Option<IoChannel<ClientIoMessage>>> message_channel: Mutex<Option<IoChannel<ClientIoMessage>>>
} }
@ -89,20 +92,30 @@ impl Tendermint {
params: params, params: params,
our_params: our_params, our_params: our_params,
builtins: builtins, builtins: builtins,
timeout_service: try!(IoService::<NextStep>::start()), step_service: try!(IoService::<NextStep>::start()),
authority: RwLock::new(Address::default()), authority: RwLock::new(Address::default()),
height: AtomicUsize::new(0), height: AtomicUsize::new(0),
round: AtomicUsize::new(0), round: AtomicUsize::new(0),
step: RwLock::new(Step::Propose), step: RwLock::new(Step::Propose),
proposer_nonce: AtomicUsize::new(0), proposer_nonce: AtomicUsize::new(0),
votes: VoteCollector::new(), votes: VoteCollector::new(),
proposed_block: Mutex::new(None),
message_channel: Mutex::new(None) message_channel: Mutex::new(None)
}); });
let handler = TimerHandler { engine: Arc::downgrade(&engine) }; let handler = TimerHandler { engine: Arc::downgrade(&engine) };
try!(engine.timeout_service.register_handler(Arc::new(handler))); try!(engine.step_service.register_handler(Arc::new(handler)));
Ok(engine) Ok(engine)
} }
fn update_sealing(&self) {
if let Some(ref channel) = *self.message_channel.lock() {
match channel.send(ClientIoMessage::UpdateSealing) {
Ok(_) => trace!(target: "poa", "timeout: UpdateSealing message sent."),
Err(err) => warn!(target: "poa", "timeout: Could not send a sealing message {}.", err),
}
}
}
fn nonce_proposer(&self, proposer_nonce: usize) -> &Address { fn nonce_proposer(&self, proposer_nonce: usize) -> &Address {
let ref p = self.our_params; let ref p = self.our_params;
p.authorities.get(proposer_nonce % p.authority_n).unwrap() p.authorities.get(proposer_nonce % p.authority_n).unwrap()
@ -191,7 +204,6 @@ impl Engine for Tendermint {
} }
/// Set the correct round in the seal. /// Set the correct round in the seal.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, block: &mut ExecutedBlock) { fn on_close_block(&self, block: &mut ExecutedBlock) {
let round = self.round.load(AtomicOrdering::SeqCst); let round = self.round.load(AtomicOrdering::SeqCst);
block.fields_mut().header.set_seal(vec![::rlp::encode(&round).to_vec(), Vec::new(), Vec::new()]); block.fields_mut().header.set_seal(vec![::rlp::encode(&round).to_vec(), Vec::new(), Vec::new()]);

View File

@ -16,9 +16,10 @@
//! Tendermint timeout handling. //! Tendermint timeout handling.
use util::Mutex;
use std::sync::atomic::{Ordering as AtomicOrdering}; use std::sync::atomic::{Ordering as AtomicOrdering};
use std::sync::Weak; use std::sync::Weak;
use io::{IoContext, IoHandler, TimerToken}; use io::{IoContext, IoHandler, TimerToken, IoChannel};
use super::{Tendermint, Step}; use super::{Tendermint, Step};
use time::{get_time, Duration}; use time::{get_time, Duration};
use service::ClientIoMessage; use service::ClientIoMessage;
@ -59,7 +60,7 @@ impl Default for TendermintTimeouts {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct NextStep; pub struct NextStep(Step);
/// Timer token representing the consensus step timeouts. /// Timer token representing the consensus step timeouts.
pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
@ -69,15 +70,6 @@ fn set_timeout(io: &IoContext<NextStep>, timeout: Duration) {
.unwrap_or_else(|e| warn!(target: "poa", "Failed to set consensus step timeout: {}.", e)) .unwrap_or_else(|e| warn!(target: "poa", "Failed to set consensus step timeout: {}.", e))
} }
fn update_sealing(io_channel: Mutex<Option<IoChannel<ClientIoMessage>>>) {
if let Some(ref channel) = *io_channel.lock() {
match channel.send(ClientIoMessage::UpdateSealing) {
Ok(_) => trace!(target: "poa", "timeout: UpdateSealing message sent for round {}.", engine.round.load(AtomicOrdering::SeqCst)),
Err(err) => trace!(target: "poa", "timeout: Could not send a sealing message {} for round {}.", err, engine.round.load(AtomicOrdering::SeqCst)),
}
}
}
impl IoHandler<NextStep> for TimerHandler { impl IoHandler<NextStep> for TimerHandler {
fn initialize(&self, io: &IoContext<NextStep>) { fn initialize(&self, io: &IoContext<NextStep>) {
if let Some(engine) = self.engine.upgrade() { if let Some(engine) = self.engine.upgrade() {
@ -112,18 +104,29 @@ impl IoHandler<NextStep> for TimerHandler {
}; };
if let Some(step) = next_step { if let Some(step) = next_step {
*engine.step.write() = step *engine.step.write() = step;
if step == Step::Propose {
engine.update_sealing();
}
} }
} }
} }
} }
fn message(&self, io: &IoContext<NextStep>, _net_message: &NextStep) { fn message(&self, io: &IoContext<NextStep>, message: &NextStep) {
if let Some(engine) = self.engine.upgrade() { if let Some(engine) = self.engine.upgrade() {
println!("Message: {:?}", get_time().sec); io.clear_timer(ENGINE_TIMEOUT_TOKEN);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); let NextStep(next_step) = *message;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") *engine.step.write() = next_step;
match next_step {
Step::Propose => {
engine.update_sealing();
set_timeout(io, engine.our_params.timeouts.propose)
},
Step::Prevote => set_timeout(io, engine.our_params.timeouts.prevote),
Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit),
Step::Commit => set_timeout(io, engine.our_params.timeouts.commit),
};
} }
} }
} }

View File

@ -39,8 +39,5 @@ pub use self::engine::Engine;
pub use self::state::State; pub use self::state::State;
pub use self::ethash::{Ethash, EthashParams}; pub use self::ethash::{Ethash, EthashParams};
pub use self::basic_authority::{BasicAuthority, BasicAuthorityParams}; pub use self::basic_authority::{BasicAuthority, BasicAuthorityParams};
<<<<<<< HEAD
pub use self::tendermint::{Tendermint, TendermintParams};
=======
pub use self::authority_round::{AuthorityRound, AuthorityRoundParams}; pub use self::authority_round::{AuthorityRound, AuthorityRoundParams};
>>>>>>> parity/master pub use self::tendermint::{Tendermint, TendermintParams};