From f7a01b87b5963ba54cbfdae67c1726f93b1ef05b Mon Sep 17 00:00:00 2001 From: keorn Date: Sun, 4 Dec 2016 19:43:24 +0000 Subject: [PATCH] better gossip, better proposal collection --- ethcore/src/engines/tendermint/mod.rs | 135 ++++++++++-------- ethcore/src/engines/tendermint/transition.rs | 2 +- .../src/engines/tendermint/vote_collector.rs | 22 ++- 3 files changed, 84 insertions(+), 75 deletions(-) diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index bc29f78dd..5244861cf 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -154,6 +154,7 @@ impl Tendermint { // TODO: memoize the rlp for consecutive broadcasts let message = ConsensusMessage::new(signature, h, r, *s, block_hash); self.votes.vote(message.clone(), *authority); + debug!(target: "poa", "Generated a message for height {:?}.", message); self.handle_valid_message(&message); Some(message_rlp) @@ -176,13 +177,20 @@ impl Tendermint { } fn broadcast_old_messages(&self) { - if let Some(ref lc) = *self.lock_change.read() { - for m in self.votes.get_older_than(lc).into_iter() { - self.broadcast_message(m); - } + for m in self.votes.get_up_to(self.height.load(AtomicOrdering::SeqCst)).into_iter() { + self.broadcast_message(m); } } + fn to_height(&self, height: Height) { + debug!(target: "poa", "Transitioning to height {}.", height); + self.last_lock.store(0, AtomicOrdering::SeqCst); + self.height.store(height, AtomicOrdering::SeqCst); + self.round.store(0, AtomicOrdering::SeqCst); + *self.lock_change.write() = None; + } + + /// Use via step_service to transition steps. fn to_step(&self, step: Step) { *self.step.write() = step; match step { @@ -209,22 +217,26 @@ impl Tendermint { self.generate_and_broadcast_message(block_hash); }, Step::Commit => { - debug!(target: "poa", "to_step: Commit."); + trace!(target: "poa", "to_step: Commit."); // Commit the block using a complete signature set. let round = self.round.load(AtomicOrdering::SeqCst); + let height = self.height.load(AtomicOrdering::SeqCst); if let Some(block_hash) = *self.proposal.read() { - if let Some(seal) = self.votes.seal_signatures(self.height.load(AtomicOrdering::SeqCst), round, block_hash) { - let seal = vec![ - ::rlp::encode(&round).to_vec(), - ::rlp::encode(&seal.proposal).to_vec(), - ::rlp::encode(&seal.votes).to_vec() - ]; - self.submit_seal(block_hash, seal); + // Generate seal and remove old votes. + if let Some(seal) = self.votes.seal_signatures(height, round, block_hash) { + trace!(target: "poa", "to_step: Collected seal: {:?}", seal); + if self.is_proposer(&*self.authority.read()).is_ok() { + let seal = vec![ + ::rlp::encode(&round).to_vec(), + ::rlp::encode(&seal.proposal).to_vec(), + ::rlp::encode(&seal.votes).to_vec() + ]; + self.submit_seal(block_hash, seal); + } } else { warn!(target: "poa", "Proposal was not found!"); } } - *self.lock_change.write() = None; }, } } @@ -237,10 +249,11 @@ impl Tendermint { n > self.our_params.authority_n * 2/3 } - /// Round proposer switching. - fn is_proposer(&self, address: &Address) -> Result<(), EngineError> { + /// Check if address is a proposer for given round. + fn is_round_proposer(&self, height: Height, round: Round, address: &Address) -> Result<(), EngineError> { let ref p = self.our_params; - let proposer_nonce = self.height.load(AtomicOrdering::SeqCst) + self.round.load(AtomicOrdering::SeqCst); + let proposer_nonce = height + round; + trace!(target: "poa", "is_proposer: Proposer nonce: {}", proposer_nonce); let proposer = p.authorities.get(proposer_nonce % p.authority_n).expect("There are authority_n authorities; taking number modulo authority_n gives number in authority_n range; qed"); if proposer == address { Ok(()) @@ -249,6 +262,11 @@ impl Tendermint { } } + /// Check if address is the current proposer. + fn is_proposer(&self, address: &Address) -> Result<(), EngineError> { + self.is_round_proposer(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), address) + } + fn is_height(&self, message: &ConsensusMessage) -> bool { message.is_height(self.height.load(AtomicOrdering::SeqCst)) } @@ -258,15 +276,12 @@ impl Tendermint { } fn increment_round(&self, n: Round) { - debug!(target: "poa", "increment_round: New round."); + trace!(target: "poa", "increment_round: New round."); self.round.fetch_add(n, AtomicOrdering::SeqCst); } - fn reset_round(&self) { - debug!(target: "poa", "reset_round: New height."); - self.last_lock.store(0, AtomicOrdering::SeqCst); - self.height.fetch_add(1, AtomicOrdering::SeqCst); - self.round.store(0, AtomicOrdering::SeqCst); + fn new_height(&self) { + self.to_height(self.height.load(AtomicOrdering::SeqCst) + 1); } fn should_unlock(&self, lock_change_round: Round) -> bool { @@ -295,7 +310,6 @@ impl Tendermint { } fn handle_valid_message(&self, message: &ConsensusMessage) { - trace!(target: "poa", "handle_valid_message: Processing valid message: {:?}", message); let is_newer_than_lock = match *self.lock_change.read() { Some(ref lock) => message > lock, None => true, @@ -304,7 +318,7 @@ impl Tendermint { && message.step == Step::Prevote && message.block_hash.is_some() && self.has_enough_aligned_votes(message) { - debug!(target: "poa", "handle_valid_message: Lock change."); + trace!(target: "poa", "handle_valid_message: Lock change."); *self.lock_change.write() = Some(message.clone()); } // Check if it can affect the step transition. @@ -376,11 +390,6 @@ impl Engine for Tendermint { }); } - /// Get the address to be used as authority. - fn on_new_block(&self, block: &mut ExecutedBlock) { - *self.authority.write() = *block.header().author(); - } - /// Should this node participate. fn is_sealer(&self, address: &Address) -> Option { Some(self.is_authority(address)) @@ -399,14 +408,15 @@ impl Engine for Tendermint { let height = header.number() as Height; let round = self.round.load(AtomicOrdering::SeqCst); let bh = Some(header.bare_hash()); - let vote_info = message_info_rlp(height, round, Step::Propose, bh); + let vote_info = message_info_rlp(height, round, Step::Propose, bh.clone()); if let Ok(signature) = ap.sign(*author, self.password.read().clone(), vote_info.sha3()).map(H520::from) { // Insert Propose vote. + debug!(target: "poa", "Submitting proposal {} at height {} round {}.", header.bare_hash(), height, round); self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), *author); // Remember proposal for later seal submission. - *self.proposal.write() = Some(header.bare_hash()); + *self.proposal.write() = bh; Some(vec![ - ::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), + ::rlp::encode(&round).to_vec(), ::rlp::encode(&signature).to_vec(), ::rlp::EMPTY_LIST_RLP.to_vec() ]) @@ -422,8 +432,7 @@ impl Engine for Tendermint { fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> { let message: ConsensusMessage = try!(rlp.as_val()); - // Check if the message is known. - if !self.votes.is_known(&message) { + if !self.votes.is_old_or_known(&message) { let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3()))); if !self.is_authority(&sender) { try!(Err(EngineError::NotAuthorized(sender))); @@ -431,7 +440,10 @@ impl Engine for Tendermint { self.votes.vote(message.clone(), sender); self.broadcast_message(rlp.as_raw().to_vec()); + trace!(target: "poa", "Handling a valid message: {:?}", message); self.handle_valid_message(&message); + } else { + trace!(target: "poa", "handle_message: Old or known message ignored {:?}.", message); } Ok(()) } @@ -483,9 +495,14 @@ impl Engine for Tendermint { try!(Err(BlockError::InvalidSeal)); } } - + + if self.is_above_threshold(signature_count) { + // Skip ahead if block is from the future. + if proposal.height > self.height.load(AtomicOrdering::SeqCst) { + self.to_height(proposal.height); + } // Check if its a proposal if there is not enough precommits. - if !self.is_above_threshold(signature_count) { + } else { let signatures_len = signatures_field.len(); // Proposal has to have an empty signature list. if signatures_len != 1 { @@ -495,11 +512,13 @@ impl Engine for Tendermint { found: signatures_len }))); } - try!(self.is_proposer(&proposer)); - *self.proposal.write() = proposal.block_hash.clone(); - self.votes.vote(proposal, proposer); + try!(self.is_round_proposer(proposal.height, proposal.round, &proposer)); + if self.is_round(&proposal) { + debug!(target: "poa", "Received a new proposal for height {}, round {} from {}.", proposal.height, proposal.round, proposer); + *self.proposal.write() = proposal.block_hash.clone(); + self.votes.vote(proposal, proposer); + } } - Ok(()) } @@ -548,12 +567,22 @@ impl Engine for Tendermint { fn is_new_best_block(&self, _best_total_difficulty: U256, best_header: HeaderView, _parent_details: &BlockDetails, new_header: &HeaderView) -> bool { trace!(target: "poa", "new_header: {}, best_header: {}", new_header.number(), best_header.number()); - if new_header.number() > best_header.number() { - true + let new_number = new_header.number(); + let best_number = best_header.number(); + if new_number != best_number { + new_number > best_number } else { - let new_signatures = new_header.seal().get(2).expect("Tendermint seal should have three elements.").len(); - let best_signatures = best_header.seal().get(2).expect("Tendermint seal should have three elements.").len(); - new_signatures > best_signatures + let new_seal = new_header.seal(); + let best_seal = best_header.seal(); + let new_signatures = new_seal.get(2).expect("Tendermint seal should have three elements.").len(); + let best_signatures = best_seal.get(2).expect("Tendermint seal should have three elements.").len(); + if new_signatures > best_signatures { + true + } else { + let new_round: Round = ::rlp::Rlp::new(&new_seal.get(0).expect("Tendermint seal should have three elements.")).as_val(); + let best_round: Round = ::rlp::Rlp::new(&best_seal.get(0).expect("Tendermint seal should have three elements.")).as_val(); + new_round > best_round + } } } @@ -820,20 +849,4 @@ mod tests { let second = test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal)); assert!(first ^ second); } - - #[test] - fn timeout_transitioning() { - let (spec, tap) = setup(); - let engine = spec.engine.clone(); - let mut db_result = get_temp_state_db(); - let mut db = db_result.take(); - spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap(); - - println!("{:?}", ::rlp::EMPTY_LIST_RLP.to_vec().len()); - println!("{:?}", ::rlp::encode(&vec![H520::default()]).to_vec().len()); - let v = insert_and_register(&tap, &engine, "0"); - - println!("done"); - - } } diff --git a/ethcore/src/engines/tendermint/transition.rs b/ethcore/src/engines/tendermint/transition.rs index 1ca230b3e..22d4d9498 100644 --- a/ethcore/src/engines/tendermint/transition.rs +++ b/ethcore/src/engines/tendermint/transition.rs @@ -106,7 +106,7 @@ impl IoHandler for TransitionHandler { Step::Commit => { trace!(target: "poa", "timeout: Commit timeout."); set_timeout(io, engine.our_params.timeouts.propose); - engine.reset_round(); + engine.new_height(); Some(Step::Propose) }, }; diff --git a/ethcore/src/engines/tendermint/vote_collector.rs b/ethcore/src/engines/tendermint/vote_collector.rs index 8bb271a35..271ff5f9a 100644 --- a/ethcore/src/engines/tendermint/vote_collector.rs +++ b/ethcore/src/engines/tendermint/vote_collector.rs @@ -58,20 +58,15 @@ impl VoteCollector { /// Insert vote if it is newer than the oldest one. pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option
{ - let is_new = { - let guard = self.votes.read(); - guard.keys().next().map_or(true, |oldest| &message > oldest) - }; - if is_new { - self.votes.write().insert(message, voter) - } else { - trace!(target: "poa", "vote: Old message ignored {:?}.", message); - None - } + self.votes.write().insert(message, voter) } - pub fn is_known(&self, message: &ConsensusMessage) -> bool { + pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool { self.votes.read().contains_key(message) + || { + let guard = self.votes.read(); + guard.keys().next().map_or(true, |oldest| message <= oldest) + } } /// Throws out messages older than message, leaves message as marker for the oldest. @@ -94,6 +89,7 @@ impl VoteCollector { .collect::>(); (proposal, votes) }; + // Remove messages that are no longer relevant. votes.last().map(|m| self.throw_out_old(m)); proposal.map(|p| SealSignatures { proposal: p.signature, @@ -127,9 +123,9 @@ impl VoteCollector { n } - pub fn get_older_than(&self, message: &ConsensusMessage) -> Vec { + pub fn get_up_to(&self, height: Height) -> Vec { let guard = self.votes.read(); - guard.keys().take_while(|m| *m <= message).map(|m| ::rlp::encode(m).to_vec()).collect() + guard.keys().take_while(|m| m.height <= height).map(|m| ::rlp::encode(m).to_vec()).collect() } }