diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 9e2cfeff4..f3a3dda10 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,7 @@ use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque}; use std::fmt; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; @@ -90,6 +90,8 @@ use_contract!(registry, "Registry", "res/contracts/registrar.json"); const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; +// Max number of blocks imported at once. +const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MIN_HISTORY_SIZE: u64 = 8; @@ -210,8 +212,12 @@ pub struct Client { queue_transactions: IoChannelQueue, /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, - /// Hashes of pending ancient block wainting to be included - pending_ancient_blocks: RwLock>, + /// Queued ancient blocks, make sure they are imported in order. + queued_ancient_blocks: Arc, + VecDeque<(Header, Bytes, Bytes)> + )>>, + ancient_blocks_import_lock: Arc>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -434,7 +440,6 @@ impl Importer { let hash = header.hash(); let _import_lock = self.import_lock.lock(); - trace!(target: "client", "Trying to import old block #{}", header.number()); { trace_time!("import_old_block"); // verify the block, passing the chain for updating the epoch verifier. @@ -763,7 +768,8 @@ impl Client { notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), - pending_ancient_blocks: RwLock::new(HashSet::new()), + queued_ancient_blocks: Default::default(), + ancient_blocks_import_lock: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -2008,8 +2014,9 @@ impl BlockChainClient for Client { impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { + trace_time!("queue_transactions"); let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| { + self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -2028,6 +2035,7 @@ impl IoClient for Client { } fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result { + trace_time!("queue_ancient_block"); let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?; let hash = header.hash(); @@ -2036,31 +2044,51 @@ impl IoClient for Client { if self.chain.read().is_known(&hash) { bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); } - - let parent_hash = *header.parent_hash(); - let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash); - let status = self.block_status(BlockId::Hash(parent_hash)); - if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash))); + let parent_hash = header.parent_hash(); + // NOTE To prevent race condition with import, make sure to check queued blocks first + // (and attempt to acquire lock) + let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash); + if !is_parent_pending { + let status = self.block_status(BlockId::Hash(*parent_hash)); + if status == BlockStatus::Unknown || status == BlockStatus::Pending { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash))); + } } } - self.pending_ancient_blocks.write().insert(hash); + // we queue blocks here and trigger an IO message. + { + let mut queued = self.queued_ancient_blocks.write(); + queued.0.insert(hash); + queued.1.push_back((header, block_bytes, receipts_bytes)); + } - trace!(target: "client", "Queuing old block #{}", header.number()); - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| { - let result = client.importer.import_old_block( - &header, - &block_bytes, - &receipts_bytes, - &**client.db.read(), - &*client.chain.read() - ); - - client.pending_ancient_blocks.write().remove(&hash); - result.map(|_| ()).unwrap_or_else(|e| { - error!(target: "client", "Error importing ancient block: {}", e); - }); + let queued = self.queued_ancient_blocks.clone(); + let lock = self.ancient_blocks_import_lock.clone(); + match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + trace_time!("import_ancient_block"); + // Make sure to hold the lock here to prevent importing out of order. + // We use separate lock, cause we don't want to block queueing. + let _lock = lock.lock(); + for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT { + let first = queued.write().1.pop_front(); + if let Some((header, block_bytes, receipts_bytes)) = first { + let hash = header.hash(); + client.importer.import_old_block( + &header, + &block_bytes, + &receipts_bytes, + &**client.db.read(), + &*client.chain.read() + ).ok().map_or((), |e| { + error!(target: "client", "Error importing ancient block: {}", e); + }); + // remove from pending + queued.write().0.remove(&hash); + } else { + break; + } + } }) { Ok(_) => Ok(hash), Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), @@ -2068,7 +2096,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| { + match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2480,38 +2508,35 @@ impl fmt::Display for QueueError { /// Queue some items to be processed by IO client. struct IoChannelQueue { - queue: Arc>>>, + currently_queued: Arc, limit: usize, } impl IoChannelQueue { pub fn new(limit: usize) -> Self { IoChannelQueue { - queue: Default::default(), + currently_queued: Default::default(), limit, } } - pub fn queue(&self, channel: &mut IoChannel, fun: F) -> Result<(), QueueError> - where F: Fn(&Client) + Send + Sync + 'static + pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where + F: Fn(&Client) + Send + Sync + 'static, { - { - let mut queue = self.queue.lock(); - let queue_size = queue.len(); - ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); + ensure!(queue_size < self.limit, QueueError::Full(self.limit)); - queue.push_back(Box::new(fun)); - } - - let queue = self.queue.clone(); + let currently_queued = self.currently_queued.clone(); let result = channel.send(ClientIoMessage::execute(move |client| { - while let Some(fun) = queue.lock().pop_front() { - fun(client); - } + currently_queued.fetch_sub(count, AtomicOrdering::SeqCst); + fun(client); })); match result { - Ok(_) => Ok(()), + Ok(_) => { + self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); + Ok(()) + }, Err(e) => Err(QueueError::Channel(e)), } } diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index bd5a98edc..4ebdf9e3f 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -174,7 +174,7 @@ impl TransactionQueue { transactions: Vec, ) -> Vec> { // Run verification - let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport"); + let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import"); let options = self.options.read().clone(); let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());