Fix ancient blocks queue deadlock (#8751)

* Revert "Fix not downloading old blocks (#8642)"

This reverts commit d1934363e7.

* Make sure only one thread actually imports old blocks.

* Add some trace timers.

* Bring back pending hashes set.

* Separate locks so that queue can happen while we are importing.

* Address grumbles.
This commit is contained in:
Tomasz Drwięga 2018-06-05 19:49:46 +02:00 committed by Afri Schoedon
parent 123b6ae62e
commit 6771539a90
2 changed files with 70 additions and 45 deletions

View File

@ -17,7 +17,7 @@
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque}; use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
@ -90,6 +90,8 @@ use_contract!(registry, "Registry", "res/contracts/registrar.json");
const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8; const MIN_HISTORY_SIZE: u64 = 8;
@ -210,8 +212,12 @@ pub struct Client {
queue_transactions: IoChannelQueue, queue_transactions: IoChannelQueue,
/// Ancient blocks import queue /// Ancient blocks import queue
queue_ancient_blocks: IoChannelQueue, queue_ancient_blocks: IoChannelQueue,
/// Hashes of pending ancient block wainting to be included /// Queued ancient blocks, make sure they are imported in order.
pending_ancient_blocks: RwLock<HashSet<H256>>, queued_ancient_blocks: Arc<RwLock<(
HashSet<H256>,
VecDeque<(Header, Bytes, Bytes)>
)>>,
ancient_blocks_import_lock: Arc<Mutex<()>>,
/// Consensus messages import queue /// Consensus messages import queue
queue_consensus_message: IoChannelQueue, queue_consensus_message: IoChannelQueue,
@ -434,7 +440,6 @@ impl Importer {
let hash = header.hash(); let hash = header.hash();
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
trace!(target: "client", "Trying to import old block #{}", header.number());
{ {
trace_time!("import_old_block"); trace_time!("import_old_block");
// verify the block, passing the chain for updating the epoch verifier. // verify the block, passing the chain for updating the epoch verifier.
@ -763,7 +768,8 @@ impl Client {
notify: RwLock::new(Vec::new()), notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
pending_ancient_blocks: RwLock::new(HashSet::new()), queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
queue_consensus_message: IoChannelQueue::new(usize::max_value()), queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()), last_hashes: RwLock::new(VecDeque::new()),
factories: factories, factories: factories,
@ -2008,8 +2014,9 @@ impl BlockChainClient for Client {
impl IoClient for Client { impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) { fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len(); let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| { self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
trace_time!("import_queued_transactions"); trace_time!("import_queued_transactions");
let txs: Vec<UnverifiedTransaction> = transactions let txs: Vec<UnverifiedTransaction> = transactions
@ -2028,6 +2035,7 @@ impl IoClient for Client {
} }
fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> { fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
trace_time!("queue_ancient_block");
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?; let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let hash = header.hash(); let hash = header.hash();
@ -2036,31 +2044,51 @@ impl IoClient for Client {
if self.chain.read().is_known(&hash) { if self.chain.read().is_known(&hash) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
} }
let parent_hash = header.parent_hash();
let parent_hash = *header.parent_hash(); // NOTE To prevent race condition with import, make sure to check queued blocks first
let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash); // (and attempt to acquire lock)
let status = self.block_status(BlockId::Hash(parent_hash)); let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash);
if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) { if !is_parent_pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash))); let status = self.block_status(BlockId::Hash(*parent_hash));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash)));
}
} }
} }
self.pending_ancient_blocks.write().insert(hash); // we queue blocks here and trigger an IO message.
{
let mut queued = self.queued_ancient_blocks.write();
queued.0.insert(hash);
queued.1.push_back((header, block_bytes, receipts_bytes));
}
trace!(target: "client", "Queuing old block #{}", header.number()); let queued = self.queued_ancient_blocks.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| { let lock = self.ancient_blocks_import_lock.clone();
let result = client.importer.import_old_block( match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
let _lock = lock.lock();
for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
let first = queued.write().1.pop_front();
if let Some((header, block_bytes, receipts_bytes)) = first {
let hash = header.hash();
client.importer.import_old_block(
&header, &header,
&block_bytes, &block_bytes,
&receipts_bytes, &receipts_bytes,
&**client.db.read(), &**client.db.read(),
&*client.chain.read() &*client.chain.read()
); ).ok().map_or((), |e| {
client.pending_ancient_blocks.write().remove(&hash);
result.map(|_| ()).unwrap_or_else(|e| {
error!(target: "client", "Error importing ancient block: {}", e); error!(target: "client", "Error importing ancient block: {}", e);
}); });
// remove from pending
queued.write().0.remove(&hash);
} else {
break;
}
}
}) { }) {
Ok(_) => Ok(hash), Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
@ -2068,7 +2096,7 @@ impl IoClient for Client {
} }
fn queue_consensus_message(&self, message: Bytes) { fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| { match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) { if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e); debug!(target: "poa", "Invalid message received: {}", e);
} }
@ -2480,38 +2508,35 @@ impl fmt::Display for QueueError {
/// Queue some items to be processed by IO client. /// Queue some items to be processed by IO client.
struct IoChannelQueue { struct IoChannelQueue {
queue: Arc<Mutex<VecDeque<Box<Fn(&Client) + Send>>>>, currently_queued: Arc<AtomicUsize>,
limit: usize, limit: usize,
} }
impl IoChannelQueue { impl IoChannelQueue {
pub fn new(limit: usize) -> Self { pub fn new(limit: usize) -> Self {
IoChannelQueue { IoChannelQueue {
queue: Default::default(), currently_queued: Default::default(),
limit, limit,
} }
} }
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, fun: F) -> Result<(), QueueError> pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
where F: Fn(&Client) + Send + Sync + 'static F: Fn(&Client) + Send + Sync + 'static,
{ {
{ let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
let mut queue = self.queue.lock();
let queue_size = queue.len();
ensure!(queue_size < self.limit, QueueError::Full(self.limit)); ensure!(queue_size < self.limit, QueueError::Full(self.limit));
queue.push_back(Box::new(fun)); let currently_queued = self.currently_queued.clone();
}
let queue = self.queue.clone();
let result = channel.send(ClientIoMessage::execute(move |client| { let result = channel.send(ClientIoMessage::execute(move |client| {
while let Some(fun) = queue.lock().pop_front() { currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client); fun(client);
}
})); }));
match result { match result {
Ok(_) => Ok(()), Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)), Err(e) => Err(QueueError::Channel(e)),
} }
} }

View File

@ -174,7 +174,7 @@ impl TransactionQueue {
transactions: Vec<verifier::Transaction>, transactions: Vec<verifier::Transaction>,
) -> Vec<Result<(), transaction::Error>> { ) -> Vec<Result<(), transaction::Error>> {
// Run verification // Run verification
let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport"); let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
let options = self.options.read().clone(); let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());