From 9d4bee4922d558e7ea2f88c1b062c500a9589f9a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 27 Sep 2016 16:50:24 +0200 Subject: [PATCH] make block queue into a more generic verification queue and fix block heap size calculation (#2095) * move block queue to own module, a couple readability changes * make block queue generic over verifiable data also fixes heap size calculation * make block queue into a more generic verification queue * some module reoganization * implement header queue * clean up verification error messages --- ethcore/src/client/client.rs | 22 +- ethcore/src/client/config.rs | 6 +- ethcore/src/client/mod.rs | 2 +- ethcore/src/client/test_client.rs | 6 +- ethcore/src/client/traits.rs | 2 +- ethcore/src/header.rs | 6 + ethcore/src/lib.rs | 1 - ethcore/src/types/block_queue_info.rs | 34 --- ethcore/src/types/block_status.rs | 11 + ethcore/src/types/mod.rs.in | 2 +- ethcore/src/types/transaction.rs | 14 +- ethcore/src/types/verification_queue_info.rs | 53 ++++ ethcore/src/verification/mod.rs | 2 + ethcore/src/verification/queue/kind.rs | 182 ++++++++++++ .../queue/mod.rs} | 263 +++++++++--------- ethcore/src/verification/verification.rs | 14 +- 16 files changed, 434 insertions(+), 186 deletions(-) delete mode 100644 ethcore/src/types/block_queue_info.rs create mode 100644 ethcore/src/types/verification_queue_info.rs create mode 100644 ethcore/src/verification/queue/kind.rs rename ethcore/src/{block_queue.rs => verification/queue/mod.rs} (65%) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 445ec37f7..befdc32c3 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -25,13 +25,12 @@ use time::precise_time_ns; use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock}; use util::journaldb::{self, JournalDB}; use util::{U256, H256, Address, H2048, Uint}; -use util::sha3::*; use util::TrieFactory; use util::kvdb::*; // other use io::*; -use views::{BlockView, HeaderView, BodyView}; +use views::{HeaderView, BodyView}; use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError}; use header::BlockNumber; use state::State; @@ -47,7 +46,7 @@ use transaction::{LocalizedTransaction, SignedTransaction, Action}; use blockchain::extras::TransactionAddress; use types::filter::Filter; use log_entry::LocalizedLogEntry; -use block_queue::{BlockQueue, BlockQueueInfo}; +use verification::queue::{BlockQueue, QueueInfo as BlockQueueInfo}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; use client::{ BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient, @@ -805,7 +804,7 @@ impl BlockChainClient for Client { let chain = self.chain.read(); match Self::block_hash(&chain, id) { Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain, - Some(hash) => self.block_queue.block_status(&hash), + Some(hash) => self.block_queue.status(&hash).into(), None => BlockStatus::Unknown } } @@ -917,16 +916,21 @@ impl BlockChainClient for Client { } fn import_block(&self, bytes: Bytes) -> Result { + use verification::queue::kind::HasHash; + use verification::queue::kind::blocks::Unverified; + + // create unverified block here so the `sha3` calculation can be cached. + let unverified = Unverified::new(bytes); + { - let header = BlockView::new(&bytes).header_view(); - if self.chain.read().is_known(&header.sha3()) { + if self.chain.read().is_known(&unverified.hash()) { return Err(BlockImportError::Import(ImportError::AlreadyInChain)); } - if self.block_status(BlockID::Hash(header.parent_hash())) == BlockStatus::Unknown { - return Err(BlockImportError::Block(BlockError::UnknownParent(header.parent_hash()))); + if self.block_status(BlockID::Hash(unverified.parent_hash())) == BlockStatus::Unknown { + return Err(BlockImportError::Block(BlockError::UnknownParent(unverified.parent_hash()))); } } - Ok(try!(self.block_queue.import_block(bytes))) + Ok(try!(self.block_queue.import(unverified))) } fn queue_info(&self) -> BlockQueueInfo { diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index 0146293df..399132108 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -16,11 +16,11 @@ use std::str::FromStr; pub use std::time::Duration; -pub use block_queue::BlockQueueConfig; pub use blockchain::Config as BlockChainConfig; pub use trace::Config as TraceConfig; pub use evm::VMType; -pub use verification::VerifierType; + +use verification::{VerifierType, QueueConfig}; use util::{journaldb, CompactionProfile}; use util::trie::TrieSpec; @@ -84,7 +84,7 @@ impl Default for Mode { #[derive(Debug, PartialEq, Default)] pub struct ClientConfig { /// Block queue configuration. - pub queue: BlockQueueConfig, + pub queue: QueueConfig, /// Blockchain configuration. pub blockchain: BlockChainConfig, /// Trace configuration. diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index a5ff89c47..3bbf9011b 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -23,7 +23,7 @@ mod trace; mod client; pub use self::client::*; -pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, VMType}; +pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockChainConfig, VMType}; pub use self::error::Error; pub use types::ids::*; pub use self::test_client::{TestBlockChainClient, EachBlockWith}; diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 0abc50bfb..480d49dc9 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -37,7 +37,7 @@ use evm::{Factory as EvmFactory, VMType}; use miner::{Miner, MinerService, TransactionImportResult}; use spec::Spec; -use block_queue::BlockQueueInfo; +use verification::queue::QueueInfo; use block::{OpenBlock, SealedBlock}; use executive::Executed; use error::CallError; @@ -544,8 +544,8 @@ impl BlockChainClient for TestBlockChainClient { Ok(h) } - fn queue_info(&self) -> BlockQueueInfo { - BlockQueueInfo { + fn queue_info(&self) -> QueueInfo { + QueueInfo { verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed), unverified_queue_size: 0, verifying_queue_size: 0, diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 45f7322fd..4da84bcbb 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; use util::{U256, Address, H256, H2048, Bytes, Itertools}; use blockchain::TreeRoute; -use block_queue::BlockQueueInfo; +use verification::queue::QueueInfo as BlockQueueInfo; use block::{OpenBlock, SealedBlock}; use header::{BlockNumber}; use transaction::{LocalizedTransaction, SignedTransaction}; diff --git a/ethcore/src/header.rs b/ethcore/src/header.rs index 04581fac9..7d86cfd61 100644 --- a/ethcore/src/header.rs +++ b/ethcore/src/header.rs @@ -295,6 +295,12 @@ impl Encodable for Header { } } +impl HeapSizeOf for Header { + fn heap_size_of_children(&self) -> usize { + self.extra_data.heap_size_of_children() + self.seal.heap_size_of_children() + } +} + #[cfg(test)] mod tests { use rustc_serialize::hex::FromHex; diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 5344381f0..22b5a514a 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -119,7 +119,6 @@ pub extern crate ethstore; pub mod account_provider; pub mod engines; pub mod block; -pub mod block_queue; pub mod client; pub mod error; pub mod ethereum; diff --git a/ethcore/src/types/block_queue_info.rs b/ethcore/src/types/block_queue_info.rs deleted file mode 100644 index d299258ce..000000000 --- a/ethcore/src/types/block_queue_info.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -//! Block queue info types - -/// Block queue status -#[derive(Debug, Binary)] -pub struct BlockQueueInfo { - /// Number of queued blocks pending verification - pub unverified_queue_size: usize, - /// Number of verified queued blocks pending import - pub verified_queue_size: usize, - /// Number of blocks being verified - pub verifying_queue_size: usize, - /// Configured maximum number of blocks in the queue - pub max_queue_size: usize, - /// Configured maximum number of bytes to use - pub max_mem_use: usize, - /// Heap memory used in bytes - pub mem_used: usize, -} diff --git a/ethcore/src/types/block_status.rs b/ethcore/src/types/block_status.rs index bf8218e47..857daae10 100644 --- a/ethcore/src/types/block_status.rs +++ b/ethcore/src/types/block_status.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . //! Block status description module +use verification::queue::Status as QueueStatus; /// General block status #[derive(Debug, Eq, PartialEq, Binary)] @@ -28,3 +29,13 @@ pub enum BlockStatus { /// Unknown. Unknown, } + +impl From for BlockStatus { + fn from(status: QueueStatus) -> Self { + match status { + QueueStatus::Queued => BlockStatus::Queued, + QueueStatus::Bad => BlockStatus::Bad, + QueueStatus::Unknown => BlockStatus::Unknown, + } + } +} \ No newline at end of file diff --git a/ethcore/src/types/mod.rs.in b/ethcore/src/types/mod.rs.in index 0537fe056..32c7faabe 100644 --- a/ethcore/src/types/mod.rs.in +++ b/ethcore/src/types/mod.rs.in @@ -25,7 +25,7 @@ pub mod executed; pub mod block_status; pub mod account_diff; pub mod state_diff; -pub mod block_queue_info; +pub mod verification_queue_info; pub mod filter; pub mod trace_filter; pub mod call_analytics; diff --git a/ethcore/src/types/transaction.rs b/ethcore/src/types/transaction.rs index 386b85f7e..f32a2f4dd 100644 --- a/ethcore/src/types/transaction.rs +++ b/ethcore/src/types/transaction.rs @@ -20,7 +20,7 @@ use std::ops::Deref; use std::cell::*; use rlp::*; use util::sha3::Hashable; -use util::{H256, Address, U256, Bytes}; +use util::{H256, Address, U256, Bytes, HeapSizeOf}; use ethkey::{Signature, sign, Secret, Public, recover, public_to_address, Error as EthkeyError}; use error::*; use evm::Schedule; @@ -86,6 +86,12 @@ impl Transaction { } } +impl HeapSizeOf for Transaction { + fn heap_size_of_children(&self) -> usize { + self.data.heap_size_of_children() + } +} + impl From for SignedTransaction { fn from(t: ethjson::state::Transaction) -> Self { let to: Option = t.to.into(); @@ -251,6 +257,12 @@ impl Encodable for SignedTransaction { fn rlp_append(&self, s: &mut RlpStream) { self.rlp_append_sealed_transaction(s) } } +impl HeapSizeOf for SignedTransaction { + fn heap_size_of_children(&self) -> usize { + self.unsigned.heap_size_of_children() + } +} + impl SignedTransaction { /// Append object with a signature into RLP stream pub fn rlp_append_sealed_transaction(&self, s: &mut RlpStream) { diff --git a/ethcore/src/types/verification_queue_info.rs b/ethcore/src/types/verification_queue_info.rs new file mode 100644 index 000000000..35954e7a9 --- /dev/null +++ b/ethcore/src/types/verification_queue_info.rs @@ -0,0 +1,53 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Verification queue info types + +/// Verification queue status +#[derive(Debug, Binary)] +pub struct VerificationQueueInfo { + /// Number of queued items pending verification + pub unverified_queue_size: usize, + /// Number of verified queued items pending import + pub verified_queue_size: usize, + /// Number of items being verified + pub verifying_queue_size: usize, + /// Configured maximum number of items in the queue + pub max_queue_size: usize, + /// Configured maximum number of bytes to use + pub max_mem_use: usize, + /// Heap memory used in bytes + pub mem_used: usize, +} + +impl VerificationQueueInfo { + /// The total size of the queues. + pub fn total_queue_size(&self) -> usize { self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size } + + /// The size of the unverified and verifying queues. + pub fn incomplete_queue_size(&self) -> usize { self.unverified_queue_size + self.verifying_queue_size } + + /// Indicates that queue is full + pub fn is_full(&self) -> bool { + self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size > self.max_queue_size || + self.mem_used > self.max_mem_use + } + + /// Indicates that queue is empty + pub fn is_empty(&self) -> bool { + self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size == 0 + } +} \ No newline at end of file diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs index ed9c8ebc7..ed9e0ec4c 100644 --- a/ethcore/src/verification/mod.rs +++ b/ethcore/src/verification/mod.rs @@ -16,6 +16,7 @@ pub mod verification; pub mod verifier; +pub mod queue; mod canon_verifier; mod noop_verifier; @@ -23,6 +24,7 @@ pub use self::verification::*; pub use self::verifier::Verifier; pub use self::canon_verifier::CanonVerifier; pub use self::noop_verifier::NoopVerifier; +pub use self::queue::{BlockQueue, Config as QueueConfig, VerificationQueue, QueueInfo}; /// Verifier type. #[derive(Debug, PartialEq, Clone)] diff --git a/ethcore/src/verification/queue/kind.rs b/ethcore/src/verification/queue/kind.rs new file mode 100644 index 000000000..7585f1e6d --- /dev/null +++ b/ethcore/src/verification/queue/kind.rs @@ -0,0 +1,182 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Definition of valid items for the verification queue. + +use engines::Engine; +use error::Error; + +use util::{HeapSizeOf, H256}; + +pub use self::blocks::Blocks; +pub use self::headers::Headers; + +/// Something which can produce a hash and a parent hash. +pub trait HasHash { + /// Get the hash of this item. + fn hash(&self) -> H256; + + /// Get the hash of this item's parent. + fn parent_hash(&self) -> H256; +} + +/// Defines transitions between stages of verification. +/// +/// It starts with a fallible transformation from an "input" into the unverified item. +/// This consists of quick, simply done checks as well as extracting particular data. +/// +/// Then, there is a `verify` function which performs more expensive checks and +/// produces the verified output. +/// +/// For correctness, the hashes produced by each stage of the pipeline should be +/// consistent. +pub trait Kind: 'static + Sized + Send + Sync { + /// The first stage: completely unverified. + type Input: Sized + Send + HasHash + HeapSizeOf; + + /// The second stage: partially verified. + type Unverified: Sized + Send + HasHash + HeapSizeOf; + + /// The third stage: completely verified. + type Verified: Sized + Send + HasHash + HeapSizeOf; + + /// Attempt to create the `Unverified` item from the input. + fn create(input: Self::Input, engine: &Engine) -> Result; + + /// Attempt to verify the `Unverified` item using the given engine. + fn verify(unverified: Self::Unverified, engine: &Engine) -> Result; +} + +/// The blocks verification module. +pub mod blocks { + use super::{Kind, HasHash}; + + use engines::Engine; + use error::Error; + use header::Header; + use verification::{PreverifiedBlock, verify_block_basic, verify_block_unordered}; + + use util::{Bytes, HeapSizeOf, H256}; + + /// A mode for verifying blocks. + pub struct Blocks; + + impl Kind for Blocks { + type Input = Unverified; + type Unverified = Unverified; + type Verified = PreverifiedBlock; + + fn create(input: Self::Input, engine: &Engine) -> Result { + match verify_block_basic(&input.header, &input.bytes, engine) { + Ok(()) => Ok(input), + Err(e) => { + warn!(target: "client", "Stage 1 block verification failed for {}: {:?}", input.hash(), e); + Err(e) + } + } + } + + fn verify(un: Self::Unverified, engine: &Engine) -> Result { + let hash = un.hash(); + match verify_block_unordered(un.header, un.bytes, engine) { + Ok(verified) => Ok(verified), + Err(e) => { + warn!(target: "client", "Stage 2 block verification failed for {}: {:?}", hash, e); + Err(e) + } + } + } + } + + /// An unverified block. + pub struct Unverified { + header: Header, + bytes: Bytes, + } + + impl Unverified { + /// Create an `Unverified` from raw bytes. + pub fn new(bytes: Bytes) -> Self { + use views::BlockView; + + let header = BlockView::new(&bytes).header(); + Unverified { + header: header, + bytes: bytes, + } + } + } + + impl HeapSizeOf for Unverified { + fn heap_size_of_children(&self) -> usize { + self.header.heap_size_of_children() + self.bytes.heap_size_of_children() + } + } + + impl HasHash for Unverified { + fn hash(&self) -> H256 { + self.header.hash() + } + + fn parent_hash(&self) -> H256 { + self.header.parent_hash().clone() + } + } + + impl HasHash for PreverifiedBlock { + fn hash(&self) -> H256 { + self.header.hash() + } + + fn parent_hash(&self) -> H256 { + self.header.parent_hash().clone() + } + } +} + +/// Verification for headers. +pub mod headers { + use super::{Kind, HasHash}; + + use engines::Engine; + use error::Error; + use header::Header; + use verification::verify_header_params; + + use util::hash::H256; + + impl HasHash for Header { + fn hash(&self) -> H256 { self.hash() } + fn parent_hash(&self) -> H256 { self.parent_hash().clone() } + } + + /// A mode for verifying headers. + pub struct Headers; + + impl Kind for Headers { + type Input = Header; + type Unverified = Header; + type Verified = Header; + + fn create(input: Self::Input, engine: &Engine) -> Result { + verify_header_params(&input, engine).map(|_| input) + } + + fn verify(unverified: Self::Unverified, engine: &Engine) -> Result { + engine.verify_block_unordered(&unverified, None).map(|_| unverified) + } + } +} \ No newline at end of file diff --git a/ethcore/src/block_queue.rs b/ethcore/src/verification/queue/mod.rs similarity index 65% rename from ethcore/src/block_queue.rs rename to ethcore/src/verification/queue/mod.rs index c441136fd..3f81d53ce 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -16,30 +16,35 @@ //! A queue of blocks. Sits between network or other I/O and the `BlockChain`. //! Sorts them ready for blockchain insertion. + use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use io::*; -use verification::*; use error::*; use engines::Engine; -use views::*; -use header::*; use service::*; -use client::BlockStatus; -pub use types::block_queue_info::BlockQueueInfo; +use self::kind::{HasHash, Kind}; -known_heap_size!(0, UnverifiedBlock, VerifyingBlock, PreverifiedBlock); +pub use types::verification_queue_info::VerificationQueueInfo as QueueInfo; + +pub mod kind; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; -/// Block queue configuration +/// Type alias for block queue convenience. +pub type BlockQueue = VerificationQueue; + +/// Type alias for header queue convenience. +pub type HeaderQueue = VerificationQueue; + +/// Verification queue configuration #[derive(Debug, PartialEq, Clone)] -pub struct BlockQueueConfig { - /// Maximum number of blocks to keep in unverified queue. +pub struct Config { + /// Maximum number of items to keep in unverified queue. /// When the limit is reached, is_full returns true. pub max_queue_size: usize, /// Maximum heap memory to use. @@ -47,42 +52,44 @@ pub struct BlockQueueConfig { pub max_mem_use: usize, } -impl Default for BlockQueueConfig { +impl Default for Config { fn default() -> Self { - BlockQueueConfig { + Config { max_queue_size: 30000, max_mem_use: 50 * 1024 * 1024, } } } +/// An item which is in the process of being verified. +pub struct Verifying { + hash: H256, + output: Option, +} -impl BlockQueueInfo { - /// The total size of the queues. - pub fn total_queue_size(&self) -> usize { self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size } - - /// The size of the unverified and verifying queues. - pub fn incomplete_queue_size(&self) -> usize { self.unverified_queue_size + self.verifying_queue_size } - - /// Indicates that queue is full - pub fn is_full(&self) -> bool { - self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size > self.max_queue_size || - self.mem_used > self.max_mem_use - } - - /// Indicates that queue is empty - pub fn is_empty(&self) -> bool { - self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size == 0 +impl HeapSizeOf for Verifying { + fn heap_size_of_children(&self) -> usize { + self.output.heap_size_of_children() } } -/// A queue of blocks. Sits between network or other I/O and the `BlockChain`. -/// Sorts them ready for blockchain insertion. -pub struct BlockQueue { +/// Status of items in the queue. +pub enum Status { + /// Currently queued. + Queued, + /// Known to be bad. + Bad, + /// Unknown. + Unknown, +} + +/// A queue of items to be verified. Sits between network or other I/O and the `BlockChain`. +/// Keeps them in the same order as inserted, minus invalid items. +pub struct VerificationQueue { panic_handler: Arc, engine: Arc, more_to_verify: Arc, - verification: Arc, + verification: Arc>, verifiers: Vec>, deleting: Arc, ready_signal: Arc, @@ -92,16 +99,6 @@ pub struct BlockQueue { max_mem_use: usize, } -struct UnverifiedBlock { - header: Header, - bytes: Bytes, -} - -struct VerifyingBlock { - hash: H256, - block: Option, -} - struct QueueSignal { deleting: Arc, signalled: AtomicBool, @@ -128,19 +125,19 @@ impl QueueSignal { } } -struct Verification { +struct Verification { // All locks must be captured in the order declared here. - unverified: Mutex>, - verified: Mutex>, - verifying: Mutex>, + unverified: Mutex>, + verified: Mutex>, + verifying: Mutex>>, bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, } -impl BlockQueue { +impl VerificationQueue { /// Creates a new queue instance. - pub fn new(config: BlockQueueConfig, engine: Arc, message_channel: IoChannel) -> BlockQueue { + pub fn new(config: Config, engine: Arc, message_channel: IoChannel) -> Self { let verification = Arc::new(Verification { unverified: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()), @@ -175,13 +172,13 @@ impl BlockQueue { .name(format!("Verifier #{}", i)) .spawn(move || { panic_handler.catch_panic(move || { - BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) }).unwrap() }) .expect("Error starting block verification thread") ); } - BlockQueue { + VerificationQueue { engine: engine, panic_handler: panic_handler, ready_signal: ready_signal.clone(), @@ -196,7 +193,7 @@ impl BlockQueue { } } - fn verify(verification: Arc, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { let mut more_to_verify = verification.more_to_verify.lock().unwrap(); @@ -214,57 +211,66 @@ impl BlockQueue { } } - let block = { + let item = { + // acquire these locks before getting the item to verify. let mut unverified = verification.unverified.lock(); - if unverified.is_empty() { - continue; - } let mut verifying = verification.verifying.lock(); - let block = unverified.pop_front().unwrap(); - verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); - block + + let item = match unverified.pop_front() { + Some(item) => item, + None => continue, + }; + + verifying.push_back(Verifying { hash: item.hash(), output: None }); + item }; - let block_hash = block.header.hash(); - match verify_block_unordered(block.header, block.bytes, &*engine) { + let hash = item.hash(); + match K::verify(item, &*engine) { Ok(verified) => { let mut verifying = verification.verifying.lock(); - for e in verifying.iter_mut() { - if e.hash == block_hash { - e.block = Some(verified); + let mut idx = None; + for (i, e) in verifying.iter_mut().enumerate() { + if e.hash == hash { + idx = Some(i); + e.output = Some(verified); break; } } - if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { + + if idx == Some(0) { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } }, - Err(err) => { + Err(_) => { let mut verifying = verification.verifying.lock(); let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); - bad.insert(block_hash.clone()); - verifying.retain(|e| e.hash != block_hash); - BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); - ready.set(); + + bad.insert(hash.clone()); + verifying.retain(|e| e.hash != hash); + + if verifying.front().map_or(false, |x| x.output.is_some()) { + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + ready.set(); + } } } } } - fn drain_verifying(verifying: &mut VecDeque, verified: &mut VecDeque, bad: &mut HashSet) { - while !verifying.is_empty() && verifying.front().unwrap().block.is_some() { - let block = verifying.pop_front().unwrap().block.unwrap(); - if bad.contains(block.header.parent_hash()) { - bad.insert(block.header.hash()); - } - else { - verified.push_back(block); + fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet) { + while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { + assert!(verifying.pop_front().is_some()); + + if bad.contains(&output.parent_hash()) { + bad.insert(output.hash()); + } else { + verified.push_back(output); } } } @@ -288,21 +294,20 @@ impl BlockQueue { } } - /// Check if the block is currently in the queue - pub fn block_status(&self, hash: &H256) -> BlockStatus { + /// Check if the item is currently in the queue + pub fn status(&self, hash: &H256) -> Status { if self.processing.read().contains(hash) { - return BlockStatus::Queued; + return Status::Queued; } if self.verification.bad.lock().contains(hash) { - return BlockStatus::Bad; + return Status::Bad; } - BlockStatus::Unknown + Status::Unknown } /// Add a block to the queue. - pub fn import_block(&self, bytes: Bytes) -> ImportResult { - let header = BlockView::new(&bytes).header(); - let h = header.hash(); + pub fn import(&self, input: K::Input) -> ImportResult { + let h = input.hash(); { if self.processing.read().contains(&h) { return Err(ImportError::AlreadyQueued.into()); @@ -313,74 +318,71 @@ impl BlockQueue { return Err(ImportError::KnownBad.into()); } - if bad.contains(header.parent_hash()) { + if bad.contains(&input.parent_hash()) { bad.insert(h.clone()); return Err(ImportError::KnownBad.into()); } } - match verify_block_basic(&header, &bytes, &*self.engine) { - Ok(()) => { + match K::create(input, &*self.engine) { + Ok(item) => { self.processing.write().insert(h.clone()); - self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes }); + self.verification.unverified.lock().push_back(item); self.more_to_verify.notify_all(); Ok(h) }, Err(err) => { - warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); self.verification.bad.lock().insert(h.clone()); Err(err) } } } - /// Mark given block and all its children as bad. Stops verification. - pub fn mark_as_bad(&self, block_hashes: &[H256]) { - if block_hashes.is_empty() { + /// Mark given item and all its children as bad. pauses verification + /// until complete. + pub fn mark_as_bad(&self, hashes: &[H256]) { + if hashes.is_empty() { return; } let mut verified_lock = self.verification.verified.lock(); let mut verified = &mut *verified_lock; let mut bad = self.verification.bad.lock(); let mut processing = self.processing.write(); - bad.reserve(block_hashes.len()); - for hash in block_hashes { + bad.reserve(hashes.len()); + for hash in hashes { bad.insert(hash.clone()); processing.remove(hash); } let mut new_verified = VecDeque::new(); - for block in verified.drain(..) { - if bad.contains(block.header.parent_hash()) { - bad.insert(block.header.hash()); - processing.remove(&block.header.hash()); + for output in verified.drain(..) { + if bad.contains(&output.parent_hash()) { + bad.insert(output.hash()); + processing.remove(&output.hash()); } else { - new_verified.push_back(block); + new_verified.push_back(output); } } *verified = new_verified; } - /// Mark given block as processed - pub fn mark_as_good(&self, block_hashes: &[H256]) { - if block_hashes.is_empty() { + /// Mark given item as processed + pub fn mark_as_good(&self, hashes: &[H256]) { + if hashes.is_empty() { return; } let mut processing = self.processing.write(); - for hash in block_hashes { + for hash in hashes { processing.remove(hash); } } - /// Removes up to `max` verified blocks from the queue - pub fn drain(&self, max: usize) -> Vec { + /// Removes up to `max` verified items from the queue + pub fn drain(&self, max: usize) -> Vec { let mut verified = self.verification.verified.lock(); let count = min(max, verified.len()); - let mut result = Vec::with_capacity(count); - for _ in 0..count { - let block = verified.pop_front().unwrap(); - result.push(block); - } + let result = verified.drain(..count).collect::>(); + self.ready_signal.reset(); if !verified.is_empty() { self.ready_signal.set(); @@ -389,7 +391,7 @@ impl BlockQueue { } /// Get queue status. - pub fn queue_info(&self) -> BlockQueueInfo { + pub fn queue_info(&self) -> QueueInfo { let (unverified_len, unverified_bytes) = { let v = self.verification.unverified.lock(); (v.len(), v.heap_size_of_children()) @@ -402,7 +404,8 @@ impl BlockQueue { let v = self.verification.verified.lock(); (v.len(), v.heap_size_of_children()) }; - BlockQueueInfo { + + QueueInfo { unverified_queue_size: unverified_len, verifying_queue_size: verifying_len, verified_queue_size: verified_len, @@ -428,22 +431,22 @@ impl BlockQueue { } } -impl MayPanic for BlockQueue { +impl MayPanic for VerificationQueue { fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } } -impl Drop for BlockQueue { +impl Drop for VerificationQueue { fn drop(&mut self) { - trace!(target: "shutdown", "[BlockQueue] Closing..."); + trace!(target: "shutdown", "[VerificationQueue] Closing..."); self.clear(); self.deleting.store(true, AtomicOrdering::Release); self.more_to_verify.notify_all(); for t in self.verifiers.drain(..) { t.join().unwrap(); } - trace!(target: "shutdown", "[BlockQueue] Closed."); + trace!(target: "shutdown", "[VerificationQueue] Closed."); } } @@ -452,7 +455,8 @@ mod tests { use util::*; use io::*; use spec::*; - use block_queue::*; + use super::{BlockQueue, Config}; + use super::kind::blocks::Unverified; use tests::helpers::*; use error::*; use views::*; @@ -460,7 +464,7 @@ mod tests { fn get_test_queue() -> BlockQueue { let spec = get_test_spec(); let engine = spec.engine; - BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected()) + BlockQueue::new(Config::default(), engine, IoChannel::disconnected()) } #[test] @@ -468,13 +472,13 @@ mod tests { // TODO better test let spec = Spec::new_test(); let engine = spec.engine; - let _ = BlockQueue::new(BlockQueueConfig::default(), engine, IoChannel::disconnected()); + let _ = BlockQueue::new(Config::default(), engine, IoChannel::disconnected()); } #[test] fn can_import_blocks() { let queue = get_test_queue(); - if let Err(e) = queue.import_block(get_good_dummy_block()) { + if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { panic!("error importing block that is valid by definition({:?})", e); } } @@ -482,11 +486,11 @@ mod tests { #[test] fn returns_error_for_duplicates() { let queue = get_test_queue(); - if let Err(e) = queue.import_block(get_good_dummy_block()) { + if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { panic!("error importing block that is valid by definition({:?})", e); } - let duplicate_import = queue.import_block(get_good_dummy_block()); + let duplicate_import = queue.import(Unverified::new(get_good_dummy_block())); match duplicate_import { Err(e) => { match e { @@ -503,14 +507,14 @@ mod tests { let queue = get_test_queue(); let block = get_good_dummy_block(); let hash = BlockView::new(&block).header().hash().clone(); - if let Err(e) = queue.import_block(block) { + if let Err(e) = queue.import(Unverified::new(block)) { panic!("error importing block that is valid by definition({:?})", e); } queue.flush(); queue.drain(10); queue.mark_as_good(&[ hash ]); - if let Err(e) = queue.import_block(get_good_dummy_block()) { + if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { panic!("error importing block that has already been drained ({:?})", e); } } @@ -518,7 +522,8 @@ mod tests { #[test] fn returns_empty_once_finished() { let queue = get_test_queue(); - queue.import_block(get_good_dummy_block()).expect("error importing block that is valid by definition"); + queue.import(Unverified::new(get_good_dummy_block())) + .expect("error importing block that is valid by definition"); queue.flush(); queue.drain(1); @@ -529,13 +534,13 @@ mod tests { fn test_mem_limit() { let spec = get_test_spec(); let engine = spec.engine; - let mut config = BlockQueueConfig::default(); + let mut config = Config::default(); config.max_mem_use = super::MIN_MEM_LIMIT; // empty queue uses about 15000 let queue = BlockQueue::new(config, engine, IoChannel::disconnected()); assert!(!queue.queue_info().is_full()); let mut blocks = get_good_dummy_block_seq(50); for b in blocks.drain(..) { - queue.import_block(b).unwrap(); + queue.import(Unverified::new(b)).unwrap(); } assert!(queue.queue_info().is_full()); } diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs index 4e1305a33..f89ac7d9a 100644 --- a/ethcore/src/verification/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -36,14 +36,22 @@ pub struct PreverifiedBlock { pub bytes: Bytes, } +impl HeapSizeOf for PreverifiedBlock { + fn heap_size_of_children(&self) -> usize { + self.header.heap_size_of_children() + + self.transactions.heap_size_of_children() + + self.bytes.heap_size_of_children() + } +} + /// Phase 1 quick block verification. Only does checks that are cheap. Operates on a single block pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { - try!(verify_header(&header, engine)); + try!(verify_header_params(&header, engine)); try!(verify_block_integrity(bytes, &header.transactions_root(), &header.uncles_hash())); try!(engine.verify_block_basic(&header, Some(bytes))); for u in try!(UntrustedRlp::new(bytes).at(2)).iter().map(|rlp| rlp.as_val::
()) { let u = try!(u); - try!(verify_header(&u, engine)); + try!(verify_header_params(&u, engine)); try!(engine.verify_block_basic(&u, None)); } // Verify transactions. @@ -179,7 +187,7 @@ pub fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error> } /// Check basic header parameters. -fn verify_header(header: &Header, engine: &Engine) -> Result<(), Error> { +pub fn verify_header_params(header: &Header, engine: &Engine) -> Result<(), Error> { if header.number() >= From::from(BlockNumber::max_value()) { return Err(From::from(BlockError::RidiculousNumber(OutOfBounds { max: Some(From::from(BlockNumber::max_value())), min: None, found: header.number() }))) }