better gossip, better proposal collection

This commit is contained in:
keorn 2016-12-04 19:43:24 +00:00
parent edef7a185f
commit f7a01b87b5
3 changed files with 84 additions and 75 deletions

View File

@ -154,6 +154,7 @@ impl Tendermint {
// TODO: memoize the rlp for consecutive broadcasts // TODO: memoize the rlp for consecutive broadcasts
let message = ConsensusMessage::new(signature, h, r, *s, block_hash); let message = ConsensusMessage::new(signature, h, r, *s, block_hash);
self.votes.vote(message.clone(), *authority); self.votes.vote(message.clone(), *authority);
debug!(target: "poa", "Generated a message for height {:?}.", message);
self.handle_valid_message(&message); self.handle_valid_message(&message);
Some(message_rlp) Some(message_rlp)
@ -176,13 +177,20 @@ impl Tendermint {
} }
fn broadcast_old_messages(&self) { fn broadcast_old_messages(&self) {
if let Some(ref lc) = *self.lock_change.read() { for m in self.votes.get_up_to(self.height.load(AtomicOrdering::SeqCst)).into_iter() {
for m in self.votes.get_older_than(lc).into_iter() { self.broadcast_message(m);
self.broadcast_message(m);
}
} }
} }
fn to_height(&self, height: Height) {
debug!(target: "poa", "Transitioning to height {}.", height);
self.last_lock.store(0, AtomicOrdering::SeqCst);
self.height.store(height, AtomicOrdering::SeqCst);
self.round.store(0, AtomicOrdering::SeqCst);
*self.lock_change.write() = None;
}
/// Use via step_service to transition steps.
fn to_step(&self, step: Step) { fn to_step(&self, step: Step) {
*self.step.write() = step; *self.step.write() = step;
match step { match step {
@ -209,22 +217,26 @@ impl Tendermint {
self.generate_and_broadcast_message(block_hash); self.generate_and_broadcast_message(block_hash);
}, },
Step::Commit => { Step::Commit => {
debug!(target: "poa", "to_step: Commit."); trace!(target: "poa", "to_step: Commit.");
// Commit the block using a complete signature set. // Commit the block using a complete signature set.
let round = self.round.load(AtomicOrdering::SeqCst); let round = self.round.load(AtomicOrdering::SeqCst);
let height = self.height.load(AtomicOrdering::SeqCst);
if let Some(block_hash) = *self.proposal.read() { if let Some(block_hash) = *self.proposal.read() {
if let Some(seal) = self.votes.seal_signatures(self.height.load(AtomicOrdering::SeqCst), round, block_hash) { // Generate seal and remove old votes.
let seal = vec![ if let Some(seal) = self.votes.seal_signatures(height, round, block_hash) {
::rlp::encode(&round).to_vec(), trace!(target: "poa", "to_step: Collected seal: {:?}", seal);
::rlp::encode(&seal.proposal).to_vec(), if self.is_proposer(&*self.authority.read()).is_ok() {
::rlp::encode(&seal.votes).to_vec() let seal = vec![
]; ::rlp::encode(&round).to_vec(),
self.submit_seal(block_hash, seal); ::rlp::encode(&seal.proposal).to_vec(),
::rlp::encode(&seal.votes).to_vec()
];
self.submit_seal(block_hash, seal);
}
} else { } else {
warn!(target: "poa", "Proposal was not found!"); warn!(target: "poa", "Proposal was not found!");
} }
} }
*self.lock_change.write() = None;
}, },
} }
} }
@ -237,10 +249,11 @@ impl Tendermint {
n > self.our_params.authority_n * 2/3 n > self.our_params.authority_n * 2/3
} }
/// Round proposer switching. /// Check if address is a proposer for given round.
fn is_proposer(&self, address: &Address) -> Result<(), EngineError> { fn is_round_proposer(&self, height: Height, round: Round, address: &Address) -> Result<(), EngineError> {
let ref p = self.our_params; let ref p = self.our_params;
let proposer_nonce = self.height.load(AtomicOrdering::SeqCst) + self.round.load(AtomicOrdering::SeqCst); let proposer_nonce = height + round;
trace!(target: "poa", "is_proposer: Proposer nonce: {}", proposer_nonce);
let proposer = p.authorities.get(proposer_nonce % p.authority_n).expect("There are authority_n authorities; taking number modulo authority_n gives number in authority_n range; qed"); let proposer = p.authorities.get(proposer_nonce % p.authority_n).expect("There are authority_n authorities; taking number modulo authority_n gives number in authority_n range; qed");
if proposer == address { if proposer == address {
Ok(()) Ok(())
@ -249,6 +262,11 @@ impl Tendermint {
} }
} }
/// Check if address is the current proposer.
fn is_proposer(&self, address: &Address) -> Result<(), EngineError> {
self.is_round_proposer(self.height.load(AtomicOrdering::SeqCst), self.round.load(AtomicOrdering::SeqCst), address)
}
fn is_height(&self, message: &ConsensusMessage) -> bool { fn is_height(&self, message: &ConsensusMessage) -> bool {
message.is_height(self.height.load(AtomicOrdering::SeqCst)) message.is_height(self.height.load(AtomicOrdering::SeqCst))
} }
@ -258,15 +276,12 @@ impl Tendermint {
} }
fn increment_round(&self, n: Round) { fn increment_round(&self, n: Round) {
debug!(target: "poa", "increment_round: New round."); trace!(target: "poa", "increment_round: New round.");
self.round.fetch_add(n, AtomicOrdering::SeqCst); self.round.fetch_add(n, AtomicOrdering::SeqCst);
} }
fn reset_round(&self) { fn new_height(&self) {
debug!(target: "poa", "reset_round: New height."); self.to_height(self.height.load(AtomicOrdering::SeqCst) + 1);
self.last_lock.store(0, AtomicOrdering::SeqCst);
self.height.fetch_add(1, AtomicOrdering::SeqCst);
self.round.store(0, AtomicOrdering::SeqCst);
} }
fn should_unlock(&self, lock_change_round: Round) -> bool { fn should_unlock(&self, lock_change_round: Round) -> bool {
@ -295,7 +310,6 @@ impl Tendermint {
} }
fn handle_valid_message(&self, message: &ConsensusMessage) { fn handle_valid_message(&self, message: &ConsensusMessage) {
trace!(target: "poa", "handle_valid_message: Processing valid message: {:?}", message);
let is_newer_than_lock = match *self.lock_change.read() { let is_newer_than_lock = match *self.lock_change.read() {
Some(ref lock) => message > lock, Some(ref lock) => message > lock,
None => true, None => true,
@ -304,7 +318,7 @@ impl Tendermint {
&& message.step == Step::Prevote && message.step == Step::Prevote
&& message.block_hash.is_some() && message.block_hash.is_some()
&& self.has_enough_aligned_votes(message) { && self.has_enough_aligned_votes(message) {
debug!(target: "poa", "handle_valid_message: Lock change."); trace!(target: "poa", "handle_valid_message: Lock change.");
*self.lock_change.write() = Some(message.clone()); *self.lock_change.write() = Some(message.clone());
} }
// Check if it can affect the step transition. // Check if it can affect the step transition.
@ -376,11 +390,6 @@ impl Engine for Tendermint {
}); });
} }
/// Get the address to be used as authority.
fn on_new_block(&self, block: &mut ExecutedBlock) {
*self.authority.write() = *block.header().author();
}
/// Should this node participate. /// Should this node participate.
fn is_sealer(&self, address: &Address) -> Option<bool> { fn is_sealer(&self, address: &Address) -> Option<bool> {
Some(self.is_authority(address)) Some(self.is_authority(address))
@ -399,14 +408,15 @@ impl Engine for Tendermint {
let height = header.number() as Height; let height = header.number() as Height;
let round = self.round.load(AtomicOrdering::SeqCst); let round = self.round.load(AtomicOrdering::SeqCst);
let bh = Some(header.bare_hash()); let bh = Some(header.bare_hash());
let vote_info = message_info_rlp(height, round, Step::Propose, bh); let vote_info = message_info_rlp(height, round, Step::Propose, bh.clone());
if let Ok(signature) = ap.sign(*author, self.password.read().clone(), vote_info.sha3()).map(H520::from) { if let Ok(signature) = ap.sign(*author, self.password.read().clone(), vote_info.sha3()).map(H520::from) {
// Insert Propose vote. // Insert Propose vote.
debug!(target: "poa", "Submitting proposal {} at height {} round {}.", header.bare_hash(), height, round);
self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), *author); self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), *author);
// Remember proposal for later seal submission. // Remember proposal for later seal submission.
*self.proposal.write() = Some(header.bare_hash()); *self.proposal.write() = bh;
Some(vec![ Some(vec![
::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), ::rlp::encode(&round).to_vec(),
::rlp::encode(&signature).to_vec(), ::rlp::encode(&signature).to_vec(),
::rlp::EMPTY_LIST_RLP.to_vec() ::rlp::EMPTY_LIST_RLP.to_vec()
]) ])
@ -422,8 +432,7 @@ impl Engine for Tendermint {
fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> { fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
let message: ConsensusMessage = try!(rlp.as_val()); let message: ConsensusMessage = try!(rlp.as_val());
// Check if the message is known. if !self.votes.is_old_or_known(&message) {
if !self.votes.is_known(&message) {
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3()))); let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
if !self.is_authority(&sender) { if !self.is_authority(&sender) {
try!(Err(EngineError::NotAuthorized(sender))); try!(Err(EngineError::NotAuthorized(sender)));
@ -431,7 +440,10 @@ impl Engine for Tendermint {
self.votes.vote(message.clone(), sender); self.votes.vote(message.clone(), sender);
self.broadcast_message(rlp.as_raw().to_vec()); self.broadcast_message(rlp.as_raw().to_vec());
trace!(target: "poa", "Handling a valid message: {:?}", message);
self.handle_valid_message(&message); self.handle_valid_message(&message);
} else {
trace!(target: "poa", "handle_message: Old or known message ignored {:?}.", message);
} }
Ok(()) Ok(())
} }
@ -484,8 +496,13 @@ impl Engine for Tendermint {
} }
} }
if self.is_above_threshold(signature_count) {
// Skip ahead if block is from the future.
if proposal.height > self.height.load(AtomicOrdering::SeqCst) {
self.to_height(proposal.height);
}
// Check if its a proposal if there is not enough precommits. // Check if its a proposal if there is not enough precommits.
if !self.is_above_threshold(signature_count) { } else {
let signatures_len = signatures_field.len(); let signatures_len = signatures_field.len();
// Proposal has to have an empty signature list. // Proposal has to have an empty signature list.
if signatures_len != 1 { if signatures_len != 1 {
@ -495,11 +512,13 @@ impl Engine for Tendermint {
found: signatures_len found: signatures_len
}))); })));
} }
try!(self.is_proposer(&proposer)); try!(self.is_round_proposer(proposal.height, proposal.round, &proposer));
*self.proposal.write() = proposal.block_hash.clone(); if self.is_round(&proposal) {
self.votes.vote(proposal, proposer); debug!(target: "poa", "Received a new proposal for height {}, round {} from {}.", proposal.height, proposal.round, proposer);
*self.proposal.write() = proposal.block_hash.clone();
self.votes.vote(proposal, proposer);
}
} }
Ok(()) Ok(())
} }
@ -548,12 +567,22 @@ impl Engine for Tendermint {
fn is_new_best_block(&self, _best_total_difficulty: U256, best_header: HeaderView, _parent_details: &BlockDetails, new_header: &HeaderView) -> bool { fn is_new_best_block(&self, _best_total_difficulty: U256, best_header: HeaderView, _parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
trace!(target: "poa", "new_header: {}, best_header: {}", new_header.number(), best_header.number()); trace!(target: "poa", "new_header: {}, best_header: {}", new_header.number(), best_header.number());
if new_header.number() > best_header.number() { let new_number = new_header.number();
true let best_number = best_header.number();
if new_number != best_number {
new_number > best_number
} else { } else {
let new_signatures = new_header.seal().get(2).expect("Tendermint seal should have three elements.").len(); let new_seal = new_header.seal();
let best_signatures = best_header.seal().get(2).expect("Tendermint seal should have three elements.").len(); let best_seal = best_header.seal();
new_signatures > best_signatures let new_signatures = new_seal.get(2).expect("Tendermint seal should have three elements.").len();
let best_signatures = best_seal.get(2).expect("Tendermint seal should have three elements.").len();
if new_signatures > best_signatures {
true
} else {
let new_round: Round = ::rlp::Rlp::new(&new_seal.get(0).expect("Tendermint seal should have three elements.")).as_val();
let best_round: Round = ::rlp::Rlp::new(&best_seal.get(0).expect("Tendermint seal should have three elements.")).as_val();
new_round > best_round
}
} }
} }
@ -820,20 +849,4 @@ mod tests {
let second = test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal)); let second = test_io.received.read().contains(&ClientIoMessage::SubmitSeal(proposal.unwrap(), seal));
assert!(first ^ second); assert!(first ^ second);
} }
#[test]
fn timeout_transitioning() {
let (spec, tap) = setup();
let engine = spec.engine.clone();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap();
println!("{:?}", ::rlp::EMPTY_LIST_RLP.to_vec().len());
println!("{:?}", ::rlp::encode(&vec![H520::default()]).to_vec().len());
let v = insert_and_register(&tap, &engine, "0");
println!("done");
}
} }

View File

@ -106,7 +106,7 @@ impl IoHandler<Step> for TransitionHandler {
Step::Commit => { Step::Commit => {
trace!(target: "poa", "timeout: Commit timeout."); trace!(target: "poa", "timeout: Commit timeout.");
set_timeout(io, engine.our_params.timeouts.propose); set_timeout(io, engine.our_params.timeouts.propose);
engine.reset_round(); engine.new_height();
Some(Step::Propose) Some(Step::Propose)
}, },
}; };

View File

@ -58,20 +58,15 @@ impl VoteCollector {
/// Insert vote if it is newer than the oldest one. /// Insert vote if it is newer than the oldest one.
pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> { pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
let is_new = { self.votes.write().insert(message, voter)
let guard = self.votes.read();
guard.keys().next().map_or(true, |oldest| &message > oldest)
};
if is_new {
self.votes.write().insert(message, voter)
} else {
trace!(target: "poa", "vote: Old message ignored {:?}.", message);
None
}
} }
pub fn is_known(&self, message: &ConsensusMessage) -> bool { pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool {
self.votes.read().contains_key(message) self.votes.read().contains_key(message)
|| {
let guard = self.votes.read();
guard.keys().next().map_or(true, |oldest| message <= oldest)
}
} }
/// Throws out messages older than message, leaves message as marker for the oldest. /// Throws out messages older than message, leaves message as marker for the oldest.
@ -94,6 +89,7 @@ impl VoteCollector {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
(proposal, votes) (proposal, votes)
}; };
// Remove messages that are no longer relevant.
votes.last().map(|m| self.throw_out_old(m)); votes.last().map(|m| self.throw_out_old(m));
proposal.map(|p| SealSignatures { proposal.map(|p| SealSignatures {
proposal: p.signature, proposal: p.signature,
@ -127,9 +123,9 @@ impl VoteCollector {
n n
} }
pub fn get_older_than(&self, message: &ConsensusMessage) -> Vec<Bytes> { pub fn get_up_to(&self, height: Height) -> Vec<Bytes> {
let guard = self.votes.read(); let guard = self.votes.read();
guard.keys().take_while(|m| *m <= message).map(|m| ::rlp::encode(m).to_vec()).collect() guard.keys().take_while(|m| m.height <= height).map(|m| ::rlp::encode(m).to_vec()).collect()
} }
} }