From d3b2bcdd79463bd9f4793ef14645d27347ecdb5c Mon Sep 17 00:00:00 2001 From: keorn Date: Mon, 10 Apr 2017 19:03:18 +0100 Subject: [PATCH] Tendermint fixes (#5415) * more resilience * refactor commit * fix proposal broadcast * list encoding * address grumbles * to into --- ethcore/src/engines/tendermint/message.rs | 40 ++--- ethcore/src/engines/tendermint/mod.rs | 189 +++++++++++----------- ethcore/src/engines/vote_collector.rs | 40 ++--- sync/src/chain.rs | 2 +- sync/src/tests/consensus.rs | 4 +- sync/src/tests/helpers.rs | 43 ++--- 6 files changed, 141 insertions(+), 177 deletions(-) diff --git a/ethcore/src/engines/tendermint/message.rs b/ethcore/src/engines/tendermint/message.rs index 0649ea050..304aa2671 100644 --- a/ethcore/src/engines/tendermint/message.rs +++ b/ethcore/src/engines/tendermint/message.rs @@ -61,6 +61,11 @@ pub fn consensus_view(header: &Header) -> Result { UntrustedRlp::new(view_rlp.as_slice()).as_val() } +/// Proposal signature. +pub fn proposal_signature(header: &Header) -> Result { + UntrustedRlp::new(header.seal().get(1).expect("seal passed basic verification; seal has 3 fields; qed").as_slice()).as_val() +} + impl Message for ConsensusMessage { type Round = VoteStep; @@ -84,34 +89,18 @@ impl ConsensusMessage { pub fn new_proposal(header: &Header) -> Result { Ok(ConsensusMessage { + signature: proposal_signature(header)?, vote_step: VoteStep::new(header.number() as Height, consensus_view(header)?, Step::Propose), - signature: UntrustedRlp::new(header.seal().get(1).expect("seal passed basic verification; seal has 3 fields; qed").as_slice()).as_val()?, block_hash: Some(header.bare_hash()), }) } - pub fn new_commit(proposal: &ConsensusMessage, signature: H520) -> Self { - let mut vote_step = proposal.vote_step.clone(); - vote_step.step = Step::Precommit; - ConsensusMessage { - vote_step: vote_step, - block_hash: proposal.block_hash, - signature: signature, - } - } - pub fn verify(&self) -> Result { let full_rlp = ::rlp::encode(self); let block_info = Rlp::new(&full_rlp).at(1); let public_key = recover(&self.signature.into(), &block_info.as_raw().sha3())?; Ok(public_to_address(&public_key)) } - - pub fn precommit_hash(&self) -> H256 { - let mut vote_step = self.vote_step.clone(); - vote_step.step = Step::Precommit; - message_info_rlp(&vote_step, self.block_hash).sha3() - } } impl Default for VoteStep { @@ -203,6 +192,10 @@ pub fn message_full_rlp(signature: &H520, vote_info: &Bytes) -> Bytes { s.out() } +pub fn message_hash(vote_step: VoteStep, block_hash: H256) -> H256 { + message_info_rlp(&vote_step, Some(block_hash)).sha3() +} + #[cfg(test)] mod tests { use util::*; @@ -294,19 +287,6 @@ mod tests { ); } - #[test] - fn message_info_from_header() { - let header = Header::default(); - let pro = ConsensusMessage { - signature: Default::default(), - vote_step: VoteStep::new(0, 0, Step::Propose), - block_hash: Some(header.bare_hash()) - }; - let pre = message_info_rlp(&VoteStep::new(0, 0, Step::Precommit), Some(header.bare_hash())); - - assert_eq!(pro.precommit_hash(), pre.sha3()); - } - #[test] fn step_ordering() { assert!(VoteStep::new(10, 123, Step::Precommit) < VoteStep::new(11, 123, Step::Precommit)); diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 464e102de..8c8094117 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -97,6 +97,8 @@ pub struct Tendermint { proposal: RwLock>, /// Hash of the proposal parent block. proposal_parent: RwLock, + /// Last block proposed by this validator. + last_proposed: RwLock, /// Set used to determine the current validators. validators: Box, } @@ -122,6 +124,7 @@ impl Tendermint { last_lock: AtomicUsize::new(0), proposal: RwLock::new(None), proposal_parent: Default::default(), + last_proposed: Default::default(), validators: new_validator_set(our_params.validators), }); let handler = TransitionHandler::new(Arc::downgrade(&engine) as Weak, Box::new(our_params.timeouts)); @@ -196,6 +199,7 @@ impl Tendermint { self.height.store(new_height, AtomicOrdering::SeqCst); self.view.store(0, AtomicOrdering::SeqCst); *self.lock_change.write() = None; + *self.proposal.write() = None; } /// Use via step_service to transition steps. @@ -206,7 +210,6 @@ impl Tendermint { *self.step.write() = step; match step { Step::Propose => { - *self.proposal.write() = None; self.update_sealing() }, Step::Prevote => { @@ -230,28 +233,6 @@ impl Tendermint { }, Step::Commit => { trace!(target: "engine", "to_step: Commit."); - // Commit the block using a complete signature set. - let view = self.view.load(AtomicOrdering::SeqCst); - let height = self.height.load(AtomicOrdering::SeqCst); - if let Some(block_hash) = *self.proposal.read() { - // Generate seal and remove old votes. - if self.is_signer_proposer(&*self.proposal_parent.read()) { - let proposal_step = VoteStep::new(height, view, Step::Propose); - let precommit_step = VoteStep::new(proposal_step.height, proposal_step.view, Step::Precommit); - if let Some(seal) = self.votes.seal_signatures(proposal_step, precommit_step, &block_hash) { - trace!(target: "engine", "Collected seal: {:?}", seal); - let seal = vec![ - ::rlp::encode(&view).to_vec(), - ::rlp::encode(&seal.proposal).to_vec(), - ::rlp::encode_list(&seal.votes).to_vec() - ]; - self.submit_seal(block_hash, seal); - self.to_next_height(height); - } else { - warn!(target: "engine", "Not enough votes found!"); - } - } - } }, } } @@ -260,8 +241,17 @@ impl Tendermint { self.validators.contains(&*self.proposal_parent.read(), address) } - fn is_above_threshold(&self, n: usize) -> bool { - n > self.validators.count(&*self.proposal_parent.read()) * 2/3 + fn check_above_threshold(&self, n: usize) -> Result<(), EngineError> { + let threshold = self.validators.count(&*self.proposal_parent.read()) * 2/3; + if n > threshold { + Ok(()) + } else { + Err(EngineError::BadSealFieldSize(OutOfBounds { + min: Some(threshold), + max: None, + found: n + })) + } } /// Find the designated for the given view. @@ -272,7 +262,7 @@ impl Tendermint { } /// Check if address is a proposer for given view. - fn is_view_proposer(&self, bh: &H256, height: Height, view: View, address: &Address) -> Result<(), EngineError> { + fn check_view_proposer(&self, bh: &H256, height: Height, view: View, address: &Address) -> Result<(), EngineError> { let proposer = self.view_proposer(bh, height, view); if proposer == *address { Ok(()) @@ -308,13 +298,13 @@ impl Tendermint { fn has_enough_any_votes(&self) -> bool { let step_votes = self.votes.count_round_votes(&VoteStep::new(self.height.load(AtomicOrdering::SeqCst), self.view.load(AtomicOrdering::SeqCst), *self.step.read())); - self.is_above_threshold(step_votes) + self.check_above_threshold(step_votes).is_ok() } fn has_enough_future_step_votes(&self, vote_step: &VoteStep) -> bool { if vote_step.view > self.view.load(AtomicOrdering::SeqCst) { let step_votes = self.votes.count_round_votes(vote_step); - self.is_above_threshold(step_votes) + self.check_above_threshold(step_votes).is_ok() } else { false } @@ -322,7 +312,7 @@ impl Tendermint { fn has_enough_aligned_votes(&self, message: &ConsensusMessage) -> bool { let aligned_count = self.votes.count_aligned_votes(&message); - self.is_above_threshold(aligned_count) + self.check_above_threshold(aligned_count).is_ok() } fn handle_valid_message(&self, message: &ConsensusMessage) { @@ -337,18 +327,32 @@ impl Tendermint { && self.has_enough_aligned_votes(message); if lock_change { trace!(target: "engine", "handle_valid_message: Lock change."); - *self.lock_change.write() = Some(message.clone()); + *self.lock_change.write() = Some(message.clone()); } // Check if it can affect the step transition. if self.is_height(message) { let next_step = match *self.step.read() { + Step::Precommit if message.block_hash.is_none() && self.has_enough_aligned_votes(message) => { + self.increment_view(1); + Some(Step::Propose) + }, Step::Precommit if self.has_enough_aligned_votes(message) => { - if message.block_hash.is_none() { - self.increment_view(1); - Some(Step::Propose) - } else { - Some(Step::Commit) + let bh = message.block_hash.expect("previous guard ensures is_some; qed"); + if *self.last_proposed.read() == bh { + // Commit the block using a complete signature set. + // Generate seal and remove old votes. + let precommits = self.votes.round_signatures(vote_step, &bh); + trace!(target: "engine", "Collected seal: {:?}", precommits); + let seal = vec![ + ::rlp::encode(&vote_step.view).to_vec(), + ::rlp::NULL_RLP.to_vec(), + ::rlp::encode_list(&precommits).to_vec() + ]; + self.submit_seal(bh, seal); + self.votes.throw_out_old(&vote_step); } + self.to_next_height(self.height.load(AtomicOrdering::SeqCst)); + Some(Step::Commit) }, Step::Precommit if self.has_enough_future_step_votes(&vote_step) => { self.increment_view(vote_step.view - self.view.load(AtomicOrdering::SeqCst)); @@ -442,6 +446,8 @@ impl Engine for Tendermint { // Insert Propose vote. debug!(target: "engine", "Submitting proposal {} at height {} view {}.", header.bare_hash(), height, view); self.votes.vote(ConsensusMessage::new(signature, height, view, Step::Propose, bh), author); + // Remember the owned block. + *self.last_proposed.write() = header.bare_hash(); // Remember proposal for later seal submission. *self.proposal.write() = bh; *self.proposal_parent.write() = header.parent_hash().clone(); @@ -462,12 +468,12 @@ impl Engine for Tendermint { if !self.votes.is_old_or_known(&message) { let sender = public_to_address(&recover(&message.signature.into(), &rlp.at(1)?.as_raw().sha3())?); if !self.is_authority(&sender) { - Err(EngineError::NotAuthorized(sender))?; + return Err(EngineError::NotAuthorized(sender).into()); } self.broadcast_message(rlp.as_raw().to_vec()); if self.votes.vote(message.clone(), &sender).is_some() { self.validators.report_malicious(&sender); - Err(EngineError::DoubleVote(sender))? + return Err(EngineError::DoubleVote(sender).into()); } trace!(target: "engine", "Handling a valid {:?} from {}.", message, sender); self.handle_valid_message(&message); @@ -491,22 +497,19 @@ impl Engine for Tendermint { fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> Result<(), Error> { let seal_length = header.seal().len(); if seal_length == self.seal_fields() { - let signatures_len = header.seal()[2].len(); - if signatures_len >= 1 { + // Either proposal or commit. + if (header.seal()[1] == ::rlp::NULL_RLP.to_vec()) + != (header.seal()[2] == ::rlp::EMPTY_LIST_RLP.to_vec()) { Ok(()) } else { - Err(From::from(EngineError::BadSealFieldSize(OutOfBounds { - min: Some(1), - max: None, - found: signatures_len - }))) + warn!(target: "engine", "verify_block_basic: Block is neither a Commit nor Proposal."); + Err(BlockError::InvalidSeal.into()) } } else { - Err(From::from(BlockError::InvalidSealArity( + Err(BlockError::InvalidSealArity( Mismatch { expected: self.seal_fields(), found: seal_length } - ))) + ).into()) } - } fn verify_block_unordered(&self, _header: &Header, _block: Option<&[u8]>) -> Result<(), Error> { @@ -515,50 +518,42 @@ impl Engine for Tendermint { /// Verify validators and gas limit. fn verify_block_family(&self, header: &Header, parent: &Header, _block: Option<&[u8]>) -> Result<(), Error> { - let proposal = ConsensusMessage::new_proposal(header)?; - let proposer = proposal.verify()?; - if !self.is_authority(&proposer) { - Err(EngineError::NotAuthorized(proposer))? - } - - let precommit_hash = proposal.precommit_hash(); - let ref signatures_field = header.seal()[2]; - let mut signature_count = 0; - let mut origins = HashSet::new(); - for rlp in UntrustedRlp::new(signatures_field).iter() { - let precommit: ConsensusMessage = ConsensusMessage::new_commit(&proposal, rlp.as_val()?); - let address = match self.votes.get(&precommit) { - Some(a) => a, - None => public_to_address(&recover(&precommit.signature.into(), &precommit_hash)?), - }; - if !self.validators.contains(header.parent_hash(), &address) { - Err(EngineError::NotAuthorized(address.to_owned()))? - } - - if origins.insert(address) { - signature_count += 1; - } else { - warn!(target: "engine", "verify_block_unordered: Duplicate signature from {} on the seal.", address); - Err(BlockError::InvalidSeal)?; - } - } - - // Check if its a proposal if there is not enough precommits. - if !self.is_above_threshold(signature_count) { - let signatures_len = signatures_field.len(); - // Proposal has to have an empty signature list. - if signatures_len != 1 { - Err(EngineError::BadSealFieldSize(OutOfBounds { - min: Some(1), - max: Some(1), - found: signatures_len - }))?; - } - self.is_view_proposer(header.parent_hash(), proposal.vote_step.height, proposal.vote_step.view, &proposer)?; - } - if header.number() == 0 { - Err(BlockError::RidiculousNumber(OutOfBounds { min: Some(1), max: None, found: header.number() }))?; + return Err(BlockError::RidiculousNumber(OutOfBounds { min: Some(1), max: None, found: header.number() }).into()); + } + + if let Ok(proposal) = ConsensusMessage::new_proposal(header) { + let proposer = proposal.verify()?; + if !self.is_authority(&proposer) { + return Err(EngineError::NotAuthorized(proposer).into()); + } + self.check_view_proposer(header.parent_hash(), proposal.vote_step.height, proposal.vote_step.view, &proposer)?; + } else { + let vote_step = VoteStep::new(header.number() as usize, consensus_view(header)?, Step::Precommit); + let precommit_hash = message_hash(vote_step.clone(), header.bare_hash()); + let ref signatures_field = header.seal().get(2).expect("block went through verify_block_basic; block has .seal_fields() fields; qed"); + let mut origins = HashSet::new(); + for rlp in UntrustedRlp::new(signatures_field).iter() { + let precommit = ConsensusMessage { + signature: rlp.as_val()?, + block_hash: Some(header.bare_hash()), + vote_step: vote_step.clone(), + }; + let address = match self.votes.get(&precommit) { + Some(a) => a, + None => public_to_address(&recover(&precommit.signature.into(), &precommit_hash)?), + }; + if !self.validators.contains(header.parent_hash(), &address) { + return Err(EngineError::NotAuthorized(address.to_owned()).into()); + } + + if !origins.insert(address) { + warn!(target: "engine", "verify_block_unordered: Duplicate signature from {} on the seal.", address); + return Err(BlockError::InvalidSeal.into()); + } + } + + self.check_above_threshold(origins.len())? } let gas_limit_divisor = self.gas_limit_bound_divisor; @@ -566,7 +561,7 @@ impl Engine for Tendermint { let max_gas = parent.gas_limit().clone() + parent.gas_limit().clone() / gas_limit_divisor; if header.gas_limit() <= &min_gas || header.gas_limit() >= &max_gas { self.validators.report_malicious(header.author()); - Err(BlockError::InvalidGasLimit(OutOfBounds { min: Some(min_gas), max: Some(max_gas), found: header.gas_limit().clone() }))?; + return Err(BlockError::InvalidGasLimit(OutOfBounds { min: Some(min_gas), max: Some(max_gas), found: header.gas_limit().clone() }).into()); } Ok(()) @@ -590,13 +585,14 @@ impl Engine for Tendermint { fn is_proposal(&self, header: &Header) -> bool { let signatures_len = header.seal()[2].len(); // Signatures have to be an empty list rlp. - let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed"); if signatures_len != 1 { // New Commit received, skip to next height. - trace!(target: "engine", "Received a commit: {:?}.", proposal.vote_step); - self.to_next_height(proposal.vote_step.height); + trace!(target: "engine", "Received a commit: {:?}.", header.number()); + self.to_next_height(header.number() as usize); + self.to_step(Step::Commit); return false; } + let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed"); let proposer = proposal.verify().expect("block went through full verification; this Engine tries verify; qed"); debug!(target: "engine", "Received a new proposal {:?} from {}.", proposal.vote_step, proposer); if self.is_view(&proposal) { @@ -647,6 +643,10 @@ impl Engine for Tendermint { } fn register_client(&self, client: Weak) { + use client::BlockChainClient; + if let Some(c) = client.upgrade() { + self.height.store(c.chain_info().best_block_number as usize + 1, AtomicOrdering::SeqCst); + } *self.client.write() = Some(client.clone()); self.validators.register_contract(client); } @@ -825,6 +825,7 @@ mod tests { let vote_info = message_info_rlp(&VoteStep::new(2, 0, Step::Precommit), Some(header.bare_hash())); let signature1 = tap.sign(proposer, None, vote_info.sha3()).unwrap(); + seal[1] = ::rlp::NULL_RLP.to_vec(); seal[2] = ::rlp::encode_list(&vec![H520::from(signature1.clone())]).to_vec(); header.set_seal(seal.clone()); diff --git a/ethcore/src/engines/vote_collector.rs b/ethcore/src/engines/vote_collector.rs index a2969db3a..482dd1d4b 100644 --- a/ethcore/src/engines/vote_collector.rs +++ b/ethcore/src/engines/vote_collector.rs @@ -136,30 +136,14 @@ impl VoteCollector { *guard = new_collector; } - /// Collects the signatures used to seal a block. - pub fn seal_signatures(&self, proposal_round: M::Round, commit_round: M::Round, block_hash: &H256) -> Option { - let ref bh = Some(*block_hash); - let maybe_seal = { - let guard = self.votes.read(); - guard - .get(&proposal_round) - .and_then(|c| c.block_votes.get(bh)) - .and_then(|proposals| proposals.keys().next()) - .map(|proposal| SealSignatures { - proposal: proposal.clone(), - votes: guard - .get(&commit_round) - .and_then(|c| c.block_votes.get(bh)) - .map(|precommits| precommits.keys().cloned().collect()) - .unwrap_or_else(Vec::new), - }) - .and_then(|seal| if seal.votes.is_empty() { None } else { Some(seal) }) - }; - if maybe_seal.is_some() { - // Remove messages that are no longer relevant. - self.throw_out_old(&commit_round); - } - maybe_seal + /// Collects the signatures for a given round and hash. + pub fn round_signatures(&self, round: &M::Round, block_hash: &H256) -> Vec { + let guard = self.votes.read(); + guard + .get(round) + .and_then(|c| c.block_votes.get(&Some(*block_hash))) + .map(|votes| votes.keys().cloned().collect()) + .unwrap_or_else(Vec::new) } /// Count votes which agree with the given message. @@ -275,11 +259,9 @@ mod tests { random_vote(&collector, signatures[1].clone(), commit_round.clone(), bh.clone()); // Wrong round, same signature. random_vote(&collector, signatures[1].clone(), 7, bh.clone()); - let seal = SealSignatures { - proposal: signatures[0], - votes: signatures[1..3].to_vec() - }; - assert_eq!(seal, collector.seal_signatures(propose_round, commit_round, &bh.unwrap()).unwrap()); + + assert_eq!(signatures[0..1].to_vec(), collector.round_signatures(&propose_round, &bh.unwrap())); + assert_eq!(signatures[1..3].iter().collect::>(), collector.round_signatures(&commit_round, &bh.unwrap()).iter().collect::>()); } #[test] diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 3c1717620..be2aeada7 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -2132,7 +2132,7 @@ impl ChainSync { let queue_info = io.chain().queue_info(); let is_syncing = self.status().is_syncing(queue_info); - if !is_syncing || !sealed.is_empty() { + if !is_syncing || !sealed.is_empty() || !proposed.is_empty() { trace!(target: "sync", "Propagating blocks, state={:?}", self.state); self.propagate_latest_blocks(io, sealed); self.propagate_proposed_blocks(io, proposed); diff --git a/sync/src/tests/consensus.rs b/sync/src/tests/consensus.rs index 2c45bbd28..00a7452c4 100644 --- a/sync/src/tests/consensus.rs +++ b/sync/src/tests/consensus.rs @@ -196,8 +196,8 @@ fn tendermint() { // Propose net.peer(0).chain.engine().step(); net.peer(1).chain.engine().step(); -net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 2.into())).unwrap(); - net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 2.into())).unwrap(); + net.peer(0).chain.miner().import_own_transaction(&*net.peer(0).chain, new_tx(s0.secret(), 2.into())).unwrap(); + net.peer(1).chain.miner().import_own_transaction(&*net.peer(1).chain, new_tx(s1.secret(), 2.into())).unwrap(); // Send different prevotes net.sync(); // Prevote timeout diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 328c0a24f..9d32d1951 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -277,31 +277,32 @@ impl TestNet> { started: false, disconnect_events: Vec::new(), }; - for _ in 0..n { - let spec = spec_factory(); - let client = EthcoreClient::new( - ClientConfig::default(), - &spec, - Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), - Arc::new(Miner::with_spec_and_accounts(&spec, accounts.clone())), - IoChannel::disconnected(), - ).unwrap(); - - let ss = Arc::new(TestSnapshotService::new()); - let sync = ChainSync::new(config.clone(), &*client); - let peer = Arc::new(EthPeer { - sync: RwLock::new(sync), - snapshot_service: ss, - chain: client, - queue: RwLock::new(VecDeque::new()), - }); - peer.chain.add_notify(peer.clone()); - net.peers.push(peer); + net.add_peer(config.clone(), spec_factory(), accounts.clone()); } - net } + + pub fn add_peer(&mut self, config: SyncConfig, spec: Spec, accounts: Option>) { + let client = EthcoreClient::new( + ClientConfig::default(), + &spec, + Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), + Arc::new(Miner::with_spec_and_accounts(&spec, accounts)), + IoChannel::disconnected(), + ).unwrap(); + + let ss = Arc::new(TestSnapshotService::new()); + let sync = ChainSync::new(config, &*client); + let peer = Arc::new(EthPeer { + sync: RwLock::new(sync), + snapshot_service: ss, + chain: client, + queue: RwLock::new(VecDeque::new()), + }); + peer.chain.add_notify(peer.clone()); + self.peers.push(peer); + } } impl

TestNet

where P: Peer {