diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 993ecdda2..5cfe8fce8 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,7 @@ use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque}; use std::fmt; use std::str::FromStr; -use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; @@ -210,6 +210,8 @@ pub struct Client { queue_transactions: IoChannelQueue, /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, + /// Hashes of pending ancient block wainting to be included + pending_ancient_blocks: RwLock>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -433,6 +435,7 @@ impl Importer { let hash = header.hash(); let _import_lock = self.import_lock.lock(); + trace!(target: "client", "Trying to import old block #{}", header.number()); { trace_time!("import_old_block"); // verify the block, passing the chain for updating the epoch verifier. @@ -761,6 +764,7 @@ impl Client { notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), + pending_ancient_blocks: RwLock::new(HashSet::new()), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -2008,7 +2012,7 @@ impl BlockChainClient for Client { impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { + self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -2032,23 +2036,32 @@ impl IoClient for Client { { // check block order - if self.chain.read().is_known(&header.hash()) { + if self.chain.read().is_known(&hash) { bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); } - let status = self.block_status(BlockId::Hash(*header.parent_hash())); - if status == BlockStatus::Unknown || status == BlockStatus::Pending { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); + + let parent_hash = *header.parent_hash(); + let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash); + let status = self.block_status(BlockId::Hash(parent_hash)); + if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash))); } } - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { - client.importer.import_old_block( + self.pending_ancient_blocks.write().insert(hash); + + trace!(target: "client", "Queuing old block #{}", header.number()); + match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| { + let result = client.importer.import_old_block( &header, &block_bytes, &receipts_bytes, &**client.db.read(), &*client.chain.read() - ).map(|_| ()).unwrap_or_else(|e| { + ); + + client.pending_ancient_blocks.write().remove(&hash); + result.map(|_| ()).unwrap_or_else(|e| { error!(target: "client", "Error importing ancient block: {}", e); }); }) { @@ -2058,7 +2071,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2471,35 +2484,38 @@ impl fmt::Display for QueueError { /// Queue some items to be processed by IO client. struct IoChannelQueue { - currently_queued: Arc, + queue: Arc>>>, limit: usize, } impl IoChannelQueue { pub fn new(limit: usize) -> Self { IoChannelQueue { - currently_queued: Default::default(), + queue: Default::default(), limit, } } - pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where - F: Fn(&Client) + Send + Sync + 'static, + pub fn queue(&self, channel: &mut IoChannel, fun: F) -> Result<(), QueueError> + where F: Fn(&Client) + Send + Sync + 'static { - let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); - ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + { + let mut queue = self.queue.lock(); + let queue_size = queue.len(); + ensure!(queue_size < self.limit, QueueError::Full(self.limit)); - let currently_queued = self.currently_queued.clone(); + queue.push_back(Box::new(fun)); + } + + let queue = self.queue.clone(); let result = channel.send(ClientIoMessage::execute(move |client| { - currently_queued.fetch_sub(count, AtomicOrdering::SeqCst); - fun(client); + while let Some(fun) = queue.lock().pop_front() { + fun(client); + } })); match result { - Ok(_) => { - self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); - Ok(()) - }, + Ok(_) => Ok(()), Err(e) => Err(QueueError::Channel(e)), } } diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 321c783b4..283f4ed61 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -266,7 +266,7 @@ impl BlockCollection { } } - /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. + /// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain. pub fn drain(&mut self) -> Vec { if self.blocks.is_empty() || self.head.is_none() { return Vec::new();