diff --git a/Cargo.lock b/Cargo.lock index d3611b47a..9be4fe144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1004,6 +1004,7 @@ dependencies = [ "rand_xorshift 0.2.0", "rayon", "regex 1.3.9", + "reth-util", "rlp", "rlp_compress", "rlp_derive", @@ -3916,6 +3917,11 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "reth-util" +version = "0.1.0" +source = "git+https://github.com/gnosis/reth.git?rev=573e128#573e128487d5651f301e21faa97fc8e80f91dee8" + [[package]] name = "ring" version = "0.14.6" diff --git a/crates/ethcore/Cargo.toml b/crates/ethcore/Cargo.toml index e1b83ebc9..6e77eac69 100644 --- a/crates/ethcore/Cargo.toml +++ b/crates/ethcore/Cargo.toml @@ -72,6 +72,7 @@ time-utils = { path = "../util/time-utils" } trace-time = "0.1" triehash-ethereum = { version = "0.2", path = "../util/triehash-ethereum" } unexpected = { path = "../util/unexpected" } +reth-util = { git = "https://github.com/gnosis/reth.git", rev = "573e128", package="reth-util" } using_queue = { path = "../concensus/miner/using-queue" } vm = { path = "../vm/vm" } walkdir = "2.3" diff --git a/crates/ethcore/service/src/service.rs b/crates/ethcore/service/src/service.rs index 9969be0ef..7e24113fd 100644 --- a/crates/ethcore/service/src/service.rs +++ b/crates/ethcore/service/src/service.rs @@ -142,6 +142,7 @@ impl ClientService { pub fn shutdown(&self) { trace!(target: "shutdown", "Shutting down Client Service"); self.snapshot.shutdown(); + self.client.shutdown(); } } diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index 7f707a915..9021c45ca 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -34,7 +34,6 @@ use blockchain::{ use bytes::{Bytes, ToPretty}; use call_contract::CallContract; use db::{DBTransaction, DBValue, KeyValueDB}; -use error::Error; use ethcore_miner::pool::VerifiedTransaction; use ethereum_types::{Address, H256, H264, U256}; use hash::keccak; @@ -80,8 +79,8 @@ use engines::{ MAX_UNCLE_AGE, }; use error::{ - BlockError, CallError, Error as EthcoreError, ErrorKind as EthcoreErrorKind, EthcoreResult, - ExecutionError, ImportErrorKind, + BlockError, CallError, Error, Error as EthcoreError, ErrorKind as EthcoreErrorKind, + EthcoreResult, ExecutionError, ImportErrorKind, QueueErrorKind, }; use executive::{contract_address, Executed, Executive, TransactOptions}; use factory::{Factories, VmFactory}; @@ -105,14 +104,14 @@ use vm::Schedule; // re-export pub use blockchain::CacheSize as BlockChainCacheSize; use db::{keys::BlockDetails, Readable, Writable}; +pub use reth_util::queue::ExecutionQueue; pub use types::{block_status::BlockStatus, blockchain_info::BlockChainInfo}; pub use verification::QueueInfo as BlockQueueInfo; - use_contract!(registry, "res/contracts/registrar.json"); -const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; +const ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; // Max number of blocks imported at once. -const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4; +const ANCIENT_BLOCKS_BATCH_SIZE: usize = 4; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MIN_HISTORY_SIZE: u64 = 8; @@ -235,10 +234,9 @@ pub struct Client { /// Queued transactions from IO queue_transactions: IoChannelQueue, /// Ancient blocks import queue - queue_ancient_blocks: IoChannelQueue, /// Queued ancient blocks, make sure they are imported in order. - queued_ancient_blocks: Arc, VecDeque<(Unverified, Bytes)>)>>, - ancient_blocks_import_lock: Arc>, + queued_ancient_blocks: Arc>>, + queued_ancient_blocks_executer: Mutex>>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -973,9 +971,8 @@ impl Client { io_channel: RwLock::new(message_channel), notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size), - queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queued_ancient_blocks: Default::default(), - ancient_blocks_import_lock: Default::default(), + queued_ancient_blocks_executer: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories, @@ -987,6 +984,44 @@ impl Client { config, }); + let exec_client = client.clone(); + + let queued = client.queued_ancient_blocks.clone(); + let queued_ancient_blocks_executer = ExecutionQueue::new( + ANCIENT_BLOCKS_QUEUE_SIZE, + ANCIENT_BLOCKS_BATCH_SIZE, + move |ancient_block: Vec<(Unverified, Bytes)>| { + trace_time!("import_ancient_block"); + for (unverified, receipts_bytes) in ancient_block { + let hash = unverified.hash(); + if !exec_client.chain.read().is_known(&unverified.parent_hash()) { + queued.write().remove(&hash); + continue; + } + let result = exec_client.importer.import_old_block( + unverified, + &receipts_bytes, + &**exec_client.db.read().key_value(), + &*exec_client.chain.read(), + ); + if let Err(e) = result { + error!(target: "client", "Error importing ancient block: {}", e); + + let mut queued = queued.write(); + queued.clear(); + } + // remove from pending + queued.write().remove(&hash); + } + }, + "ancient_block_exec", + ); + + client + .queued_ancient_blocks_executer + .lock() + .replace(queued_ancient_blocks_executer); + // prune old states. { let state_db = client.state_db.read().boxed_clone(); @@ -1034,6 +1069,15 @@ impl Client { Ok(client) } + /// signals shutdown of application. We do cleanup here. + pub fn shutdown(&self) { + let mut abe = self.queued_ancient_blocks_executer.lock(); + if abe.is_some() { + abe.as_mut().unwrap().end() + } + *abe = None; + } + /// Wakes up client if it's a sleep. pub fn keep_alive(&self) { let should_wake = match *self.mode.lock() { @@ -1866,6 +1910,12 @@ impl StateClient for Client { } } +impl Drop for Client { + fn drop(&mut self) { + self.shutdown() + } +} + impl Call for Client { type State = State<::state_db::StateDB>; @@ -2714,7 +2764,7 @@ impl IoClient for Client { let parent_hash = unverified.parent_hash(); // NOTE To prevent race condition with import, make sure to check queued blocks first // (and attempt to acquire lock) - let is_parent_pending = self.queued_ancient_blocks.read().0.contains(&parent_hash); + let is_parent_pending = self.queued_ancient_blocks.read().contains(&parent_hash); if !is_parent_pending && !self.chain.read().is_known(&parent_hash) { bail!(EthcoreErrorKind::Block(BlockError::UnknownParent( parent_hash @@ -2722,49 +2772,33 @@ impl IoClient for Client { } } - // we queue blocks here and trigger an IO message. + // we queue blocks here and trigger an Executer. { let mut queued = self.queued_ancient_blocks.write(); - queued.0.insert(hash); - queued.1.push_back((unverified, receipts_bytes)); + queued.insert(hash); } - let queued = self.queued_ancient_blocks.clone(); - let lock = self.ancient_blocks_import_lock.clone(); - self.queue_ancient_blocks - .queue(&self.io_channel.read(), 1, move |client| { - trace_time!("import_ancient_block"); - // Make sure to hold the lock here to prevent importing out of order. - // We use separate lock, cause we don't want to block queueing. - let _lock = lock.lock(); - for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT { - let first = queued.write().1.pop_front(); - if let Some((unverified, receipts_bytes)) = first { - let hash = unverified.hash(); - let result = client.importer.import_old_block( - unverified, - &receipts_bytes, - &**client.db.read().key_value(), - &*client.chain.read(), - ); - if let Err(e) = result { - error!(target: "client", "Error importing ancient block: {}", e); - - let mut queued = queued.write(); - queued.0.clear(); - queued.1.clear(); - } - // remove from pending - queued.write().0.remove(&hash); - } else { - break; - } + // see content of executer in Client::new() + match self.queued_ancient_blocks_executer.lock().as_ref() { + Some(queue) => { + if !queue.enqueue((unverified, receipts_bytes)) { + bail!(EthcoreErrorKind::Queue(QueueErrorKind::Full( + ANCIENT_BLOCKS_QUEUE_SIZE + ))); } - })?; - + } + None => (), + } Ok(hash) } + fn ancient_block_queue_fullness(&self) -> f32 { + match self.queued_ancient_blocks_executer.lock().as_ref() { + Some(queue) => queue.len() as f32 / ANCIENT_BLOCKS_QUEUE_SIZE as f32, + None => 1.0, //return 1.0 if queue is not set + } + } + fn queue_consensus_message(&self, message: Bytes) { match self .queue_consensus_message diff --git a/crates/ethcore/src/client/test_client.rs b/crates/ethcore/src/client/test_client.rs index 09e880746..960e95915 100644 --- a/crates/ethcore/src/client/test_client.rs +++ b/crates/ethcore/src/client/test_client.rs @@ -1087,6 +1087,10 @@ impl IoClient for TestBlockChainClient { self.miner.import_external_transactions(self, txs); } + fn ancient_block_queue_fullness(&self) -> f32 { + 0.0 + } + fn queue_ancient_block(&self, unverified: Unverified, _r: Bytes) -> EthcoreResult { self.import_block(unverified) } diff --git a/crates/ethcore/src/client/traits.rs b/crates/ethcore/src/client/traits.rs index f314741e4..6546710b0 100644 --- a/crates/ethcore/src/client/traits.rs +++ b/crates/ethcore/src/client/traits.rs @@ -220,6 +220,9 @@ pub trait IoClient: Sync + Send { receipts_bytes: Bytes, ) -> EthcoreResult; + /// Return percentage of how full is queue that handles ancient blocks. 0 if empty, 1 if full. + fn ancient_block_queue_fullness(&self) -> f32; + /// Queue conensus engine message. fn queue_consensus_message(&self, message: Bytes); } diff --git a/crates/ethcore/src/json_tests/chain.rs b/crates/ethcore/src/json_tests/chain.rs index f923fd216..b58f4e1fc 100644 --- a/crates/ethcore/src/json_tests/chain.rs +++ b/crates/ethcore/src/json_tests/chain.rs @@ -232,6 +232,7 @@ pub fn json_chain_test( client.chain_info().best_block_hash == blockchain.best_block.into() && post_state_success, ); + client.shutdown() } } diff --git a/crates/ethcore/src/lib.rs b/crates/ethcore/src/lib.rs index 5a0c1b0e3..b2d955491 100644 --- a/crates/ethcore/src/lib.rs +++ b/crates/ethcore/src/lib.rs @@ -52,6 +52,7 @@ extern crate parking_lot; extern crate patricia_trie_ethereum as ethtrie; extern crate rand; extern crate rayon; +extern crate reth_util; extern crate rlp; extern crate rustc_hex; extern crate serde; diff --git a/crates/ethcore/src/snapshot/tests/service.rs b/crates/ethcore/src/snapshot/tests/service.rs index 7046d4b47..2c567b232 100644 --- a/crates/ethcore/src/snapshot/tests/service.rs +++ b/crates/ethcore/src/snapshot/tests/service.rs @@ -369,7 +369,7 @@ fn recover_aborted_recovery() { } e => panic!("Snapshot restoration must be ongoing ; {:?}", e), } - + // abort restoration so that we can delete snapshot root folder service.abort_restore(); // Remove the snapshot directory, and restart the restoration diff --git a/crates/ethcore/src/tests/trace.rs b/crates/ethcore/src/tests/trace.rs index 4cb92a2e3..bb16edac1 100644 --- a/crates/ethcore/src/tests/trace.rs +++ b/crates/ethcore/src/tests/trace.rs @@ -243,5 +243,6 @@ fn can_trace_block_and_uncle_reward() { // Test1. Check block filter let traces = client.block_traces(BlockId::Number(3)); + client.shutdown(); assert_eq!(traces.unwrap().len(), 3); } diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index 1ccbc1f4f..b5508a39e 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -1131,17 +1131,20 @@ impl ChainSync { } // Only ask for old blocks if the peer has an equal or higher difficulty - let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty); - + let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty); + // check queue fullness + let ancient_block_fullness = io.chain().ancient_block_queue_fullness(); if force || equal_or_higher_difficulty { let mut is_complete = false; if let Some(old_blocks) = self.old_blocks.as_mut() { - if let Some(request) = old_blocks.request_blocks(peer_id, io, num_active_peers) { - SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks); - return; + // check if ancient queue can take more request or not. + if ancient_block_fullness < 0.8 { + if let Some(request) = old_blocks.request_blocks(peer_id, io, num_active_peers) { + SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks); + return; + } + is_complete = old_blocks.is_complete(); } - is_complete = old_blocks.is_complete(); - } if is_complete { // if old_blocks is in complete state, set it to None. self.old_blocks = None;