move transition message to to_step

This commit is contained in:
keorn 2016-12-10 16:50:23 +01:00
parent e5f8044cad
commit 239ba61a99
2 changed files with 50 additions and 97 deletions

View File

@ -209,6 +209,9 @@ impl Tendermint {
/// Use via step_service to transition steps. /// Use via step_service to transition steps.
fn to_step(&self, step: Step) { fn to_step(&self, step: Step) {
if let Err(io_err) = self.step_service.send_message(step) {
warn!(target: "poa", "Could not proceed to step {}.", io_err)
}
*self.step.write() = step; *self.step.write() = step;
match step { match step {
Step::Propose => { Step::Propose => {
@ -360,9 +363,7 @@ impl Tendermint {
if let Some(step) = next_step { if let Some(step) = next_step {
trace!(target: "poa", "handle_valid_message: Transition triggered."); trace!(target: "poa", "handle_valid_message: Transition triggered.");
if let Err(io_err) = self.step_service.send_message(step) { self.to_step(step);
warn!(target: "poa", "Could not proceed to next step {}.", io_err)
}
} }
} }
} }
@ -557,9 +558,7 @@ impl Engine for Tendermint {
fn set_signer(&self, address: Address, password: String) { fn set_signer(&self, address: Address, password: String) {
*self.authority.write() = address; *self.authority.write() = address;
*self.password.write() = Some(password); *self.password.write() = Some(password);
if let Err(io_err) = self.step_service.send_message(Step::Propose) { self.to_step(Step::Propose);
warn!(target: "poa", "Could not reset the round {}.", io_err);
}
} }
fn stop(&self) { fn stop(&self) {
@ -605,6 +604,40 @@ impl Engine for Tendermint {
true true
} }
/// Equivalent to a timeout: to be used for tests.
fn step(&self) {
let next_step = match *self.step.read() {
Step::Propose => {
trace!(target: "poa", "timeout: Propose timeout.");
Step::Prevote
},
Step::Prevote if self.has_enough_any_votes() => {
trace!(target: "poa", "timeout: Prevote timeout.");
Step::Precommit
},
Step::Prevote => {
trace!(target: "poa", "timeout: Prevote timeout without enough votes.");
self.broadcast_old_messages();
Step::Prevote
},
Step::Precommit if self.has_enough_any_votes() => {
trace!(target: "poa", "timeout: Precommit timeout.");
self.increment_round(1);
Step::Propose
},
Step::Precommit => {
trace!(target: "poa", "timeout: Precommit timeout without enough votes.");
self.broadcast_old_messages();
Step::Precommit
},
Step::Commit => {
trace!(target: "poa", "timeout: Commit timeout.");
Step::Propose
},
};
self.to_step(next_step);
}
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) { fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
trace!(target: "poa", "register_message_channel: Register the IoChannel."); trace!(target: "poa", "register_message_channel: Register the IoChannel.");
*self.message_channel.lock() = Some(message_channel); *self.message_channel.lock() = Some(message_channel);
@ -878,16 +911,16 @@ mod tests {
let prevote_future = vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h + 1, r, Step::Prevote, proposal); let prevote_future = vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h + 1, r, Step::Prevote, proposal);
engine.stop();
// Relays all valid present and future messages. // Relays all valid present and future messages.
assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_current))); assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_current)));
assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(precommit_current))); assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(precommit_current)));
assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_future))); assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_future)));
engine.stop();
} }
#[test] #[test]
#[ignore]
fn seal_submission() { fn seal_submission() {
::env_logger::init().unwrap();
let (spec, tap) = setup(); let (spec, tap) = setup();
let engine = spec.engine.clone(); let engine = spec.engine.clone();
let mut db_result = get_temp_state_db(); let mut db_result = get_temp_state_db();
@ -904,50 +937,10 @@ mod tests {
let (b, mut seal) = propose_default(&spec, v1.clone()); let (b, mut seal) = propose_default(&spec, v1.clone());
let proposal = Some(b.header().bare_hash()); let proposal = Some(b.header().bare_hash());
// Register IoHandler remembers messages.
let io_service = IoService::<ClientIoMessage>::start().unwrap();
let test_io = TestIo::new(); let test_io = TestIo::new();
io_service.register_handler(test_io.clone()).unwrap();
engine.register_message_channel(io_service.channel());
// Prevote.
vote(&engine, |mh| tap.sign(v1, None, mh).map(H520::from), h, r, Step::Prevote, proposal);
vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h, r, Step::Prevote, proposal);
vote(&engine, |mh| tap.sign(v1, None, mh).map(H520::from), h, r, Step::Precommit, proposal);
vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h, r, Step::Precommit, proposal);
// Wait a bit for async stuff.
engine.stop();
io_service.stop();
seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v0, v1);
println!("should {:?}, {:?}", proposal.unwrap(), seal);
println!("{:?}", *test_io.received.read());
assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal)));
}
#[test]
#[ignore]
fn skips_to_future_round() {
let (spec, tap) = setup();
let engine = spec.engine.clone();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap();
let v0 = insert_and_register(&tap, &engine, "0");
let v1 = insert_and_register(&tap, &engine, "1");
let h = 1;
let r = 2;
// Propose
let (b, mut seal) = propose_default(&spec, v1.clone());
let proposal = Some(b.header().bare_hash());
// Register IoHandler remembers messages. // Register IoHandler remembers messages.
let io_service = IoService::<ClientIoMessage>::start().unwrap(); let io_service = IoService::<ClientIoMessage>::start().unwrap();
let test_io = TestIo::new();
io_service.register_handler(test_io.clone()).unwrap(); io_service.register_handler(test_io.clone()).unwrap();
engine.register_message_channel(io_service.channel()); engine.register_message_channel(io_service.channel());
@ -960,13 +953,11 @@ mod tests {
// Wait a bit for async stuff. // Wait a bit for async stuff.
::std::thread::sleep(::std::time::Duration::from_millis(100)); ::std::thread::sleep(::std::time::Duration::from_millis(100));
engine.stop();
io_service.stop(); seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v1, v0);
seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v0, v1);
//assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal.clone())));
//seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v1, v0);
println!("have {:?}", *test_io.received.read());
println!("should {:?}, {:?}", proposal.unwrap(), seal); println!("should {:?}, {:?}", proposal.unwrap(), seal);
println!("{:?}", *test_io.received.read());
assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal))); assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal)));
engine.stop();
} }
} }

View File

@ -17,9 +17,10 @@
//! Tendermint timeout handling. //! Tendermint timeout handling.
use std::sync::Weak; use std::sync::Weak;
use time::Duration;
use io::{IoContext, IoHandler, TimerToken}; use io::{IoContext, IoHandler, TimerToken};
use super::{Tendermint, Step}; use super::{Tendermint, Step};
use time::Duration; use engines::Engine;
pub struct TransitionHandler { pub struct TransitionHandler {
pub engine: Weak<Tendermint>, pub engine: Weak<Tendermint>,
@ -71,48 +72,10 @@ impl IoHandler<Step> for TransitionHandler {
} }
} }
fn timeout(&self, io: &IoContext<Step>, timer: TimerToken) { fn timeout(&self, _io: &IoContext<Step>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN { if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() { if let Some(engine) = self.engine.upgrade() {
let next_step = match *engine.step.read() { engine.step();
Step::Propose => {
trace!(target: "poa", "timeout: Propose timeout.");
set_timeout(io, engine.our_params.timeouts.prevote);
Some(Step::Prevote)
},
Step::Prevote if engine.has_enough_any_votes() => {
trace!(target: "poa", "timeout: Prevote timeout.");
set_timeout(io, engine.our_params.timeouts.precommit);
Some(Step::Precommit)
},
Step::Prevote => {
trace!(target: "poa", "timeout: Prevote timeout without enough votes.");
set_timeout(io, engine.our_params.timeouts.prevote);
engine.broadcast_old_messages();
None
},
Step::Precommit if engine.has_enough_any_votes() => {
trace!(target: "poa", "timeout: Precommit timeout.");
set_timeout(io, engine.our_params.timeouts.propose);
engine.increment_round(1);
Some(Step::Propose)
},
Step::Precommit => {
trace!(target: "poa", "timeout: Precommit timeout without enough votes.");
set_timeout(io, engine.our_params.timeouts.precommit);
engine.broadcast_old_messages();
None
},
Step::Commit => {
trace!(target: "poa", "timeout: Commit timeout.");
set_timeout(io, engine.our_params.timeouts.propose);
Some(Step::Propose)
},
};
if let Some(s) = next_step {
engine.to_step(s)
}
} }
} }
} }
@ -128,7 +91,6 @@ impl IoHandler<Step> for TransitionHandler {
Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit), Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit),
Step::Commit => set_timeout(io, engine.our_params.timeouts.commit), Step::Commit => set_timeout(io, engine.our_params.timeouts.commit),
}; };
engine.to_step(*next_step);
} }
} }
} }