diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index ae6e3129a..7da2a9566 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -209,6 +209,9 @@ impl Tendermint { /// Use via step_service to transition steps. fn to_step(&self, step: Step) { + if let Err(io_err) = self.step_service.send_message(step) { + warn!(target: "poa", "Could not proceed to step {}.", io_err) + } *self.step.write() = step; match step { Step::Propose => { @@ -360,9 +363,7 @@ impl Tendermint { if let Some(step) = next_step { trace!(target: "poa", "handle_valid_message: Transition triggered."); - if let Err(io_err) = self.step_service.send_message(step) { - warn!(target: "poa", "Could not proceed to next step {}.", io_err) - } + self.to_step(step); } } } @@ -557,9 +558,7 @@ impl Engine for Tendermint { fn set_signer(&self, address: Address, password: String) { *self.authority.write() = address; *self.password.write() = Some(password); - if let Err(io_err) = self.step_service.send_message(Step::Propose) { - warn!(target: "poa", "Could not reset the round {}.", io_err); - } + self.to_step(Step::Propose); } fn stop(&self) { @@ -605,6 +604,40 @@ impl Engine for Tendermint { true } + /// Equivalent to a timeout: to be used for tests. + fn step(&self) { + let next_step = match *self.step.read() { + Step::Propose => { + trace!(target: "poa", "timeout: Propose timeout."); + Step::Prevote + }, + Step::Prevote if self.has_enough_any_votes() => { + trace!(target: "poa", "timeout: Prevote timeout."); + Step::Precommit + }, + Step::Prevote => { + trace!(target: "poa", "timeout: Prevote timeout without enough votes."); + self.broadcast_old_messages(); + Step::Prevote + }, + Step::Precommit if self.has_enough_any_votes() => { + trace!(target: "poa", "timeout: Precommit timeout."); + self.increment_round(1); + Step::Propose + }, + Step::Precommit => { + trace!(target: "poa", "timeout: Precommit timeout without enough votes."); + self.broadcast_old_messages(); + Step::Precommit + }, + Step::Commit => { + trace!(target: "poa", "timeout: Commit timeout."); + Step::Propose + }, + }; + self.to_step(next_step); + } + fn register_message_channel(&self, message_channel: IoChannel) { trace!(target: "poa", "register_message_channel: Register the IoChannel."); *self.message_channel.lock() = Some(message_channel); @@ -878,16 +911,16 @@ mod tests { let prevote_future = vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h + 1, r, Step::Prevote, proposal); - engine.stop(); // Relays all valid present and future messages. assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_current))); assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(precommit_current))); assert!(test_io.received.read().contains(&ClientIoMessage::BroadcastMessage(prevote_future))); + engine.stop(); } #[test] - #[ignore] fn seal_submission() { + ::env_logger::init().unwrap(); let (spec, tap) = setup(); let engine = spec.engine.clone(); let mut db_result = get_temp_state_db(); @@ -903,51 +936,11 @@ mod tests { // Propose let (b, mut seal) = propose_default(&spec, v1.clone()); let proposal = Some(b.header().bare_hash()); - - // Register IoHandler remembers messages. - let io_service = IoService::::start().unwrap(); - let test_io = TestIo::new(); - io_service.register_handler(test_io.clone()).unwrap(); - engine.register_message_channel(io_service.channel()); - - // Prevote. - vote(&engine, |mh| tap.sign(v1, None, mh).map(H520::from), h, r, Step::Prevote, proposal); - - vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h, r, Step::Prevote, proposal); - vote(&engine, |mh| tap.sign(v1, None, mh).map(H520::from), h, r, Step::Precommit, proposal); - vote(&engine, |mh| tap.sign(v0, None, mh).map(H520::from), h, r, Step::Precommit, proposal); - - // Wait a bit for async stuff. - engine.stop(); - io_service.stop(); - seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v0, v1); - println!("should {:?}, {:?}", proposal.unwrap(), seal); - println!("{:?}", *test_io.received.read()); - assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal))); - } - - #[test] - #[ignore] - fn skips_to_future_round() { - let (spec, tap) = setup(); - let engine = spec.engine.clone(); - let mut db_result = get_temp_state_db(); - let mut db = db_result.take(); - spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap(); - let v0 = insert_and_register(&tap, &engine, "0"); - let v1 = insert_and_register(&tap, &engine, "1"); - - let h = 1; - let r = 2; - - // Propose - let (b, mut seal) = propose_default(&spec, v1.clone()); - let proposal = Some(b.header().bare_hash()); + let test_io = TestIo::new(); // Register IoHandler remembers messages. let io_service = IoService::::start().unwrap(); - let test_io = TestIo::new(); io_service.register_handler(test_io.clone()).unwrap(); engine.register_message_channel(io_service.channel()); @@ -960,13 +953,11 @@ mod tests { // Wait a bit for async stuff. ::std::thread::sleep(::std::time::Duration::from_millis(100)); - engine.stop(); - io_service.stop(); - seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v0, v1); - //assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal.clone()))); - //seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v1, v0); - println!("have {:?}", *test_io.received.read()); + + seal[2] = precommit_signatures(&tap, h, r, Some(b.header().bare_hash()), v1, v0); println!("should {:?}, {:?}", proposal.unwrap(), seal); + println!("{:?}", *test_io.received.read()); assert!(test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal))); + engine.stop(); } } diff --git a/ethcore/src/engines/tendermint/transition.rs b/ethcore/src/engines/tendermint/transition.rs index 4c54714a5..5e50fe6e2 100644 --- a/ethcore/src/engines/tendermint/transition.rs +++ b/ethcore/src/engines/tendermint/transition.rs @@ -17,9 +17,10 @@ //! Tendermint timeout handling. use std::sync::Weak; +use time::Duration; use io::{IoContext, IoHandler, TimerToken}; use super::{Tendermint, Step}; -use time::Duration; +use engines::Engine; pub struct TransitionHandler { pub engine: Weak, @@ -71,48 +72,10 @@ impl IoHandler for TransitionHandler { } } - fn timeout(&self, io: &IoContext, timer: TimerToken) { + fn timeout(&self, _io: &IoContext, timer: TimerToken) { if timer == ENGINE_TIMEOUT_TOKEN { if let Some(engine) = self.engine.upgrade() { - let next_step = match *engine.step.read() { - Step::Propose => { - trace!(target: "poa", "timeout: Propose timeout."); - set_timeout(io, engine.our_params.timeouts.prevote); - Some(Step::Prevote) - }, - Step::Prevote if engine.has_enough_any_votes() => { - trace!(target: "poa", "timeout: Prevote timeout."); - set_timeout(io, engine.our_params.timeouts.precommit); - Some(Step::Precommit) - }, - Step::Prevote => { - trace!(target: "poa", "timeout: Prevote timeout without enough votes."); - set_timeout(io, engine.our_params.timeouts.prevote); - engine.broadcast_old_messages(); - None - }, - Step::Precommit if engine.has_enough_any_votes() => { - trace!(target: "poa", "timeout: Precommit timeout."); - set_timeout(io, engine.our_params.timeouts.propose); - engine.increment_round(1); - Some(Step::Propose) - }, - Step::Precommit => { - trace!(target: "poa", "timeout: Precommit timeout without enough votes."); - set_timeout(io, engine.our_params.timeouts.precommit); - engine.broadcast_old_messages(); - None - }, - Step::Commit => { - trace!(target: "poa", "timeout: Commit timeout."); - set_timeout(io, engine.our_params.timeouts.propose); - Some(Step::Propose) - }, - }; - - if let Some(s) = next_step { - engine.to_step(s) - } + engine.step(); } } } @@ -128,7 +91,6 @@ impl IoHandler for TransitionHandler { Step::Precommit => set_timeout(io, engine.our_params.timeouts.precommit), Step::Commit => set_timeout(io, engine.our_params.timeouts.commit), }; - engine.to_step(*next_step); } } }