From a47a5b19929c6654de7329dff994294a0e7e4345 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 24 Jan 2020 16:51:11 +0100 Subject: [PATCH] verification: fix race same block + misc (#11400) * ethcore: fix race in verification * verification: fix some nits * verification: refactor err type `Kind::create` * fix: tests * address grumbles * address grumbles: don't panic --- ethcore/light/src/client/mod.rs | 2 +- ethcore/src/client/client.rs | 146 +++++++++++----------- ethcore/types/src/errors/ethcore_error.rs | 2 +- ethcore/verification/src/queue/kind.rs | 41 ++++-- ethcore/verification/src/queue/mod.rs | 27 ++-- 5 files changed, 120 insertions(+), 98 deletions(-) diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index db5fa26b7..e835711e5 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -220,7 +220,7 @@ impl Client { /// Import a header to the queue for additional verification. pub fn import_header(&self, header: Header) -> EthcoreResult { - self.queue.import(header).map_err(|(_, e)| e) + self.queue.import(header).map_err(|(e, _)| e) } /// Inquire about the status of a given header. diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 831516e01..699f543ea 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -262,7 +262,7 @@ impl Importer { let block_queue = BlockQueue::new( config.queue.clone(), engine.clone(), - message_channel.clone(), + message_channel, config.verifier_type.verifying_seal() ); @@ -404,11 +404,11 @@ impl Importer { let verify_external_result = engine.verify_block_external(&header); if let Err(e) = verify_external_result { warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - return Err(e.into()); + return Err(e); }; // Enact Verified Block - let last_hashes = client.build_last_hashes(header.parent_hash()); + let last_hashes = client.build_last_hashes(*header.parent_hash()); let db = client.state_db.read().boxed_clone_canon(header.parent_hash()); let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some(); @@ -428,7 +428,7 @@ impl Importer { Ok(b) => b, Err(e) => { warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - return Err(e.into()); + return Err(e); } }; @@ -444,7 +444,7 @@ impl Importer { // Final Verification if let Err(e) = verification::verify_block_final(&header, &locked_block.header) { warn!(target: "client", "Stage 5 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - return Err(e.into()); + return Err(e); } let pending = self.check_epoch_end_signal( @@ -562,14 +562,14 @@ impl Importer { a }).collect(); - let route = chain.insert_block(&mut batch, block_data, receipts.clone(), ExtrasInsert { + let route = chain.insert_block(&mut batch, block_data, receipts, ExtrasInsert { fork_choice, is_finalized, }); client.tracedb.read().import(&mut batch, TraceImportRequest { traces: traces.into(), - block_hash: hash.clone(), + block_hash: *hash, block_number: number, enacted: route.enacted.clone(), retracted: route.retracted.len() @@ -619,10 +619,10 @@ impl Importer { Proof::WithState(with_state) => { let env_info = EnvInfo { number: header.number(), - author: header.author().clone(), + author: *header.author(), timestamp: header.timestamp(), - difficulty: header.difficulty().clone(), - last_hashes: client.build_last_hashes(header.parent_hash()), + difficulty: *header.difficulty(), + last_hashes: client.build_last_hashes(*header.parent_hash()), gas_used: U256::default(), gas_limit: u64::max_value().into(), }; @@ -636,7 +636,7 @@ impl Importer { let mut state = State::from_existing( backend, - header.state_root().clone(), + *header.state_root(), self.engine.account_start_nonce(header.number()), client.factories.clone(), ).expect("state known to be available for just-imported block; qed"); @@ -669,7 +669,7 @@ impl Importer { debug!(target: "client", "Block {} signals epoch end.", hash); - Ok(Some(PendingTransition { proof: proof })) + Ok(Some(PendingTransition { proof })) }, EpochChange::No => Ok(None), EpochChange::Unsure(_) => { @@ -777,7 +777,7 @@ impl Client { chain: RwLock::new(chain), tracedb, engine, - pruning: config.pruning.clone(), + pruning: config.pruning, snapshotting_at: AtomicU64::new(0), db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), @@ -824,7 +824,7 @@ impl Client { chain.insert_epoch_transition(&mut batch, 0, EpochTransition { block_hash: gh.hash(), block_number: 0, - proof: proof, + proof, }); client.db.read().key_value().write_buffered(batch); @@ -901,17 +901,17 @@ impl Client { author: header.author(), timestamp: header.timestamp(), difficulty: header.difficulty(), - last_hashes: self.build_last_hashes(&header.parent_hash()), + last_hashes: self.build_last_hashes(header.parent_hash()), gas_used: U256::default(), gas_limit: header.gas_limit(), } }) } - fn build_last_hashes(&self, parent_hash: &H256) -> Arc { + fn build_last_hashes(&self, parent_hash: H256) -> Arc { { let hashes = self.last_hashes.read(); - if hashes.front().map_or(false, |h| h == parent_hash) { + if hashes.front().map_or(false, |h| h == &parent_hash) { let mut res = Vec::from(hashes.clone()); res.resize(256, H256::zero()); return Arc::new(res); @@ -919,12 +919,12 @@ impl Client { } let mut last_hashes = LastHashes::new(); last_hashes.resize(256, H256::zero()); - last_hashes[0] = parent_hash.clone(); + last_hashes[0] = parent_hash; let chain = self.chain.read(); for i in 0..255 { match chain.block_details(&last_hashes[i]) { Some(details) => { - last_hashes[i + 1] = details.parent.clone(); + last_hashes[i + 1] = details.parent; }, None => break, } @@ -941,7 +941,7 @@ impl Client { let call = |a, d| { let tx = self.contract_call_tx(id, a, d); let (result, items) = self.prove_transaction(tx, id) - .ok_or_else(|| format!("Unable to make call. State unavailable?"))?; + .ok_or_else(|| "Unable to make call. State unavailable?".to_string())?; let items = items.into_iter().map(|x| x.to_vec()).collect(); Ok((result, items)) @@ -1296,7 +1296,11 @@ impl DatabaseRestore for Client { impl BlockChainReset for Client { fn reset(&self, num: u32) -> Result<(), String> { if num as u64 > self.pruning_history() { - return Err(format!("Attempting to reset the chain {} blocks back failed: state is pruned (max available: {})", num, self.pruning_history()).into()) + return Err( + format!("Attempting to reset the chain {} blocks back failed: state is pruned (max available: {})", + num, + self.pruning_history() + )); } else if num == 0 { return Err("0 is an invalid number of blocks to reset".into()) } @@ -1450,26 +1454,28 @@ impl ImportBlock for Client { } let raw = if self.importer.block_queue.is_empty() { - Some(( - unverified.bytes.clone(), - unverified.header.hash(), - *unverified.header.difficulty(), - )) - } else { None }; + Some((unverified.bytes.clone(), *unverified.header.difficulty())) + } else { + None + }; match self.importer.block_queue.import(unverified) { Ok(hash) => { - if let Some((raw, hash, difficulty)) = raw { - self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty)); + if let Some((bytes, difficulty)) = raw { + self.notify(move |n| n.block_pre_import(&bytes, &hash, &difficulty)); } Ok(hash) }, // we only care about block errors (not import errors) - Err((block, EthcoreError::Block(err))) => { - self.importer.bad_blocks.report(block.bytes, format!("{:?}", err)); - return Err(EthcoreError::Block(err)) + Err((EthcoreError::Block(e), Some(input))) => { + self.importer.bad_blocks.report(input.bytes, e.to_string()); + Err(EthcoreError::Block(e)) }, - Err((_, e)) => Err(e), + Err((EthcoreError::Block(e), None)) => { + error!(target: "client", "BlockError {} detected but it was missing raw_bytes of the block", e); + Err(EthcoreError::Block(e)) + } + Err((e, _input)) => Err(e), } } @@ -1497,10 +1503,10 @@ impl Call for Client { fn call(&self, transaction: &SignedTransaction, analytics: CallAnalytics, state: &mut Self::State, header: &Header) -> Result { let env_info = EnvInfo { number: header.number(), - author: header.author().clone(), + author: *header.author(), timestamp: header.timestamp(), - difficulty: header.difficulty().clone(), - last_hashes: self.build_last_hashes(header.parent_hash()), + difficulty: *header.difficulty(), + last_hashes: self.build_last_hashes(*header.parent_hash()), gas_used: U256::default(), gas_limit: U256::max_value(), }; @@ -1512,10 +1518,10 @@ impl Call for Client { fn call_many(&self, transactions: &[(SignedTransaction, CallAnalytics)], state: &mut Self::State, header: &Header) -> Result, CallError> { let mut env_info = EnvInfo { number: header.number(), - author: header.author().clone(), + author: *header.author(), timestamp: header.timestamp(), - difficulty: header.difficulty().clone(), - last_hashes: self.build_last_hashes(header.parent_hash()), + difficulty: *header.difficulty(), + last_hashes: self.build_last_hashes(*header.parent_hash()), gas_used: U256::default(), gas_limit: U256::max_value(), }; @@ -1539,10 +1545,10 @@ impl Call for Client { let env_info = EnvInfo { number: header.number(), - author: header.author().clone(), + author: *header.author(), timestamp: header.timestamp(), - difficulty: header.difficulty().clone(), - last_hashes: self.build_last_hashes(header.parent_hash()), + difficulty: *header.difficulty(), + last_hashes: self.build_last_hashes(*header.parent_hash()), gas_used: U256::default(), gas_limit: max, }; @@ -1634,7 +1640,7 @@ impl BlockChainClient for Client { let address = self.transaction_address(id).ok_or_else(|| CallError::TransactionNotFound)?; let block = BlockId::Hash(address.block_hash); - const PROOF: &'static str = "The transaction address contains a valid index within block; qed"; + const PROOF: &str = "The transaction address contains a valid index within block; qed"; Ok(self.replay_block_transactions(block, analytics)?.nth(address.index).expect(PROOF).1) } @@ -1645,8 +1651,8 @@ impl BlockChainClient for Client { let txs = body.transactions(); let engine = self.engine.clone(); - const PROOF: &'static str = "Transactions fetched from blockchain; blockchain transactions are valid; qed"; - const EXECUTE_PROOF: &'static str = "Transaction replayed; qed"; + const PROOF: &str = "Transactions fetched from blockchain; blockchain transactions are valid; qed"; + const EXECUTE_PROOF: &str = "Transaction replayed; qed"; Ok(Box::new(txs.into_iter() .map(move |t| { @@ -1660,9 +1666,7 @@ impl BlockChainClient for Client { } fn mode(&self) -> Mode { - let r = self.mode.lock().clone().into(); - trace!(target: "mode", "Asked for mode = {:?}. returning {:?}", &*self.mode.lock(), r); - r + self.mode.lock().clone() } fn queue_info(&self) -> BlockQueueInfo { @@ -1682,7 +1686,7 @@ impl BlockChainClient for Client { } { let mut mode = self.mode.lock(); - *mode = new_mode.clone().into(); + *mode = new_mode.clone(); trace!(target: "mode", "Mode now {:?}", &*mode); if let Some(ref mut f) = *self.on_user_defaults_change.lock() { trace!(target: "mode", "Making callback..."); @@ -1972,17 +1976,17 @@ impl BlockChainClient for Client { // pending logs themselves). let from = match self.block_number_ref(&filter.from_block) { Some(val) if val <= chain.best_block_number() => val, - _ => return Err(filter.from_block.clone()), + _ => return Err(filter.from_block), }; let to = match self.block_number_ref(&filter.to_block) { Some(val) if val <= chain.best_block_number() => val, - _ => return Err(filter.to_block.clone()), + _ => return Err(filter.to_block), }; // If from is greater than to, then the current bloom filter behavior is to just return empty // result. There's no point to continue here. if from > to { - return Err(filter.to_block.clone()); + return Err(filter.to_block); } chain.blocks_with_bloom(&filter.bloom_possibilities(), from, to) @@ -1993,7 +1997,7 @@ impl BlockChainClient for Client { // Otherwise, we use a slower version that finds a link between from_block and to_block. let from_hash = match Self::block_hash(&chain, filter.from_block) { Some(val) => val, - None => return Err(filter.from_block.clone()), + None => return Err(filter.from_block), }; let from_number = match chain.block_number(&from_hash) { Some(val) => val, @@ -2001,7 +2005,7 @@ impl BlockChainClient for Client { }; let to_hash = match Self::block_hash(&chain, filter.to_block) { Some(val) => val, - None => return Err(filter.to_block.clone()), + None => return Err(filter.to_block), }; let blooms = filter.bloom_possibilities(); @@ -2103,7 +2107,7 @@ impl BlockChainClient for Client { } fn last_hashes(&self) -> LastHashes { - (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() + self.build_last_hashes(self.chain.read().best_block_hash()).to_vec() } fn transactions_to_propagate(&self) -> Vec> { @@ -2338,7 +2342,7 @@ impl PrepareOpenBlock for Client { self.tracedb.read().tracing_enabled(), self.state_db.read().boxed_clone_canon(&h), &best_header, - self.build_last_hashes(&h), + self.build_last_hashes(h), author, gas_range_target, extra_data, @@ -2388,7 +2392,7 @@ impl ImportSealedBlock for Client { block.rlp_bytes(), format!("Detected an issue with locally sealed block: {}", e), ); - return Err(e.into()); + return Err(e); } // scope for self.import_lock @@ -2517,12 +2521,12 @@ impl ProvingBlockChainClient for Client { _ => return None, }; - env_info.gas_limit = transaction.gas.clone(); + env_info.gas_limit = transaction.gas; let mut jdb = self.state_db.read().journal_db().boxed_clone(); executive_state::prove_transaction_virtual( jdb.as_hash_db_mut(), - header.state_root().clone(), + header.state_root(), &transaction, self.engine.machine(), &env_info, @@ -2758,12 +2762,12 @@ fn transaction_receipt( from: sender, to: match tx.action { Action::Create => None, - Action::Call(ref address) => Some(address.clone().into()) + Action::Call(ref address) => Some(*address) }, - transaction_hash: transaction_hash, - transaction_index: transaction_index, - block_hash: block_hash, - block_number: block_number, + transaction_hash, + transaction_index, + block_hash, + block_number, cumulative_gas_used: receipt.gas_used, gas_used: receipt.gas_used - prior_gas_used, contract_address: match tx.action { @@ -2772,10 +2776,10 @@ fn transaction_receipt( }, logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry { entry: log, - block_hash: block_hash, - block_number: block_number, - transaction_hash: transaction_hash, - transaction_index: transaction_index, + block_hash, + block_number, + transaction_hash, + transaction_index, transaction_log_index: i, log_index: prior_no_of_logs + i, }).collect(), @@ -2921,8 +2925,8 @@ mod tests { let tx1 = raw_tx.clone().sign(secret, None); let transaction = LocalizedTransaction { signed: tx1.clone().into(), - block_number: block_number, - block_hash: block_hash, + block_number, + block_hash, transaction_index: 1, cached_sender: Some(tx1.sender()), }; @@ -2937,7 +2941,7 @@ mod tests { }]; let receipt = Receipt { outcome: TransactionOutcome::StateRoot(state_root), - gas_used: gas_used, + gas_used, log_bloom: Default::default(), logs: logs.clone(), }; diff --git a/ethcore/types/src/errors/ethcore_error.rs b/ethcore/types/src/errors/ethcore_error.rs index 93be62e69..1a99c65dc 100644 --- a/ethcore/types/src/errors/ethcore_error.rs +++ b/ethcore/types/src/errors/ethcore_error.rs @@ -21,7 +21,7 @@ use derive_more::{Display, From}; use ethereum_types::{U256, U512}; use ethtrie::TrieError; use parity_snappy::InvalidInput; -use parity_crypto::publickey::{Error as EthPublicKeyCryptoError}; +use parity_crypto::publickey::Error as EthPublicKeyCryptoError; use errors::{BlockError, EngineError, ImportError, SnapshotError}; use transaction::Error as TransactionError; diff --git a/ethcore/verification/src/queue/kind.rs b/ethcore/verification/src/queue/kind.rs index acea47237..546df2d5a 100644 --- a/ethcore/verification/src/queue/kind.rs +++ b/ethcore/verification/src/queue/kind.rs @@ -62,7 +62,14 @@ pub trait Kind: 'static + Sized + Send + Sync { type Verified: Sized + Send + BlockLike + MallocSizeOf; /// Attempt to create the `Unverified` item from the input. - fn create(input: Self::Input, engine: &dyn Engine, check_seal: bool) -> Result; + /// + /// The return type is quite complex because in some scenarios the input + /// is needed (typically for BlockError) to get the raw block bytes without cloning them + fn create( + input: Self::Input, + engine: &dyn Engine, + check_seal: bool + ) -> Result)>; /// Attempt to verify the `Unverified` item using the given engine. fn verify(unverified: Self::Unverified, engine: &dyn Engine, check_seal: bool) -> Result; @@ -91,16 +98,20 @@ pub mod blocks { type Unverified = Unverified; type Verified = PreverifiedBlock; - fn create(input: Self::Input, engine: &dyn Engine, check_seal: bool) -> Result { + fn create( + input: Self::Input, + engine: &dyn Engine, + check_seal: bool + ) -> Result)> { match verify_block_basic(&input, engine, check_seal) { Ok(()) => Ok(input), Err(Error::Block(BlockError::TemporarilyInvalid(oob))) => { debug!(target: "client", "Block received too early {}: {:?}", input.hash(), oob); - Err((input, BlockError::TemporarilyInvalid(oob).into())) + Err((BlockError::TemporarilyInvalid(oob).into(), Some(input))) }, Err(e) => { warn!(target: "client", "Stage 1 block verification failed for {}: {:?}", input.hash(), e); - Err((input, e)) + Err((e, Some(input))) } } } @@ -127,11 +138,11 @@ pub mod blocks { } fn parent_hash(&self) -> H256 { - self.header.parent_hash().clone() + *self.header.parent_hash() } fn difficulty(&self) -> U256 { - self.header.difficulty().clone() + *self.header.difficulty() } } @@ -145,11 +156,11 @@ pub mod blocks { } fn parent_hash(&self) -> H256 { - self.header.parent_hash().clone() + *self.header.parent_hash() } fn difficulty(&self) -> U256 { - self.header.difficulty().clone() + *self.header.difficulty() } } } @@ -170,8 +181,8 @@ pub mod headers { impl BlockLike for Header { fn hash(&self) -> H256 { self.hash() } fn raw_hash(&self) -> H256 { self.hash() } - fn parent_hash(&self) -> H256 { self.parent_hash().clone() } - fn difficulty(&self) -> U256 { self.difficulty().clone() } + fn parent_hash(&self) -> H256 { *self.parent_hash() } + fn difficulty(&self) -> U256 { *self.difficulty() } } /// A mode for verifying headers. @@ -182,19 +193,23 @@ pub mod headers { type Unverified = Header; type Verified = Header; - fn create(input: Self::Input, engine: &dyn Engine, check_seal: bool) -> Result { + fn create( + input: Self::Input, + engine: &dyn Engine, + check_seal: bool + ) -> Result)> { let res = verify_header_params(&input, engine, check_seal) .and_then(|_| verify_header_time(&input)); match res { Ok(_) => Ok(input), - Err(err) => Err((input, err)) + Err(e) => Err((e, Some(input))), } } fn verify(unverified: Self::Unverified, engine: &dyn Engine, check_seal: bool) -> Result { match check_seal { - true => engine.verify_block_unordered(&unverified,).map(|_| unverified), + true => engine.verify_block_unordered(&unverified).map(|_| unverified), false => Ok(unverified), } } diff --git a/ethcore/verification/src/queue/mod.rs b/ethcore/verification/src/queue/mod.rs index 64422573f..31d205c79 100644 --- a/ethcore/verification/src/queue/mod.rs +++ b/ethcore/verification/src/queue/mod.rs @@ -467,30 +467,33 @@ impl VerificationQueue { } /// Add a block to the queue. - pub fn import(&self, input: K::Input) -> Result { + // + // TODO: #11403 - rework `EthcoreError::Block` to include raw bytes of the error cause + pub fn import(&self, input: K::Input) -> Result)> { let hash = input.hash(); let raw_hash = input.raw_hash(); { if self.processing.read().contains_key(&hash) { - return Err((input, Error::Import(ImportError::AlreadyQueued).into())); + return Err((Error::Import(ImportError::AlreadyQueued), Some(input))); } let mut bad = self.verification.bad.lock(); if bad.contains(&hash) || bad.contains(&raw_hash) { - return Err((input, Error::Import(ImportError::KnownBad).into())); + return Err((Error::Import(ImportError::KnownBad), Some(input))); } if bad.contains(&input.parent_hash()) { bad.insert(hash); - return Err((input, Error::Import(ImportError::KnownBad).into())); + return Err((Error::Import(ImportError::KnownBad), Some(input))); } } match K::create(input, &*self.engine, self.verification.check_seal) { Ok(item) => { + if self.processing.write().insert(hash, item.difficulty()).is_some() { + return Err((Error::Import(ImportError::AlreadyQueued), None)); + } self.verification.sizes.unverified.fetch_add(item.malloc_size_of(), AtomicOrdering::SeqCst); - - self.processing.write().insert(hash, item.difficulty()); { let mut td = self.total_difficulty.write(); *td = *td + item.difficulty(); @@ -499,7 +502,7 @@ impl VerificationQueue { self.more_to_verify.notify_all(); Ok(hash) }, - Err((input, err)) => { + Err((err, input)) => { match err { // Don't mark future blocks as bad. Error::Block(BlockError::TemporarilyInvalid(_)) => {}, @@ -517,7 +520,7 @@ impl VerificationQueue { self.verification.bad.lock().insert(hash); } } - Err((input, err)) + Err((err, input)) } } } @@ -582,7 +585,7 @@ impl VerificationQueue { let count = cmp::min(max, verified.len()); let result = verified.drain(..count).collect::>(); - let drained_size = result.iter().map(MallocSizeOfExt::malloc_size_of).fold(0, |a, c| a + c); + let drained_size = result.iter().map(MallocSizeOfExt::malloc_size_of).sum(); self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst); self.ready_signal.reset(); @@ -636,7 +639,7 @@ impl VerificationQueue { /// Get the total difficulty of all the blocks in the queue. pub fn total_difficulty(&self) -> U256 { - self.total_difficulty.read().clone() + *self.total_difficulty.read() } /// Get the current number of working verifiers. @@ -806,9 +809,9 @@ mod tests { let duplicate_import = queue.import(new_unverified(get_good_dummy_block())); match duplicate_import { - Err((_, e)) => { + Err(e) => { match e { - EthcoreError::Import(ImportError::AlreadyQueued) => {}, + (EthcoreError::Import(ImportError::AlreadyQueued), _) => {}, _ => { panic!("must return AlreadyQueued error"); } } }