possible fix for queue drop deadlock (#3702)

* possible fix for #3686

* queue: simplify conclusion, don't block on joining

* queue: park verifiers with timeout to prevent race

* more robust verification loop

* queue: re-introduce wait for verifier joining
This commit is contained in:
Robert Habermeier 2016-12-05 18:18:56 +01:00 committed by Arkadiy Paronyan
parent a726472023
commit 1b6ebe1a6d

View File

@ -17,7 +17,7 @@
//! A queue of blocks. Sits between network or other I/O and the `BlockChain`. //! A queue of blocks. Sits between network or other I/O and the `BlockChain`.
//! Sorts them ready for blockchain insertion. //! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self}; use std::thread::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*; use util::*;
@ -64,35 +64,11 @@ impl Default for Config {
} }
} }
struct VerifierHandle { // pool states
deleting: Arc<AtomicBool>, enum State {
sleep: Arc<AtomicBool>, // all threads with id < inner value are to work.
thread: JoinHandle<()>, Work(usize),
} Exit,
impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
}
// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}
// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}
// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
} }
/// An item which is in the process of being verified. /// An item which is in the process of being verified.
@ -131,7 +107,6 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>, engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
@ -139,6 +114,8 @@ pub struct VerificationQueue<K: Kind> {
ticks_since_adjustment: AtomicUsize, ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
} }
struct QueueSignal { struct QueueSignal {
@ -224,40 +201,39 @@ impl<K: Kind> VerificationQueue<K> {
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let default_amount = max(::num_cpus::get(), 3) - 2; let default_amount = max(::num_cpus::get(), 3) - 2;
let mut verifiers = Vec::with_capacity(max_verifiers); let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
for i in 0..max_verifiers { for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i); debug!(target: "verification", "Adding verification thread #{}", i);
let deleting = deleting.clone();
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
let verification = verification.clone(); let verification = verification.clone();
let engine = engine.clone(); let engine = engine.clone();
let wait = more_to_verify.clone(); let wait = more_to_verify.clone();
let ready = ready_signal.clone(); let ready = ready_signal.clone();
let empty = empty.clone(); let empty = empty.clone();
let state = state.clone();
// enable only the first few verifiers. let handle = thread::Builder::new()
let sleep = if i < default_amount { .name(format!("Verifier #{}", i))
Arc::new(AtomicBool::new(false)) .spawn(move || {
} else { panic_handler.catch_panic(move || {
Arc::new(AtomicBool::new(true)) VerificationQueue::verify(
}; verification,
engine,
verifiers.push(VerifierHandle { wait,
deleting: deleting.clone(), ready,
sleep: sleep.clone(), empty,
thread: thread::Builder::new() state,
.name(format!("Verifier #{}", i)) i,
.spawn(move || { )
panic_handler.catch_panic(move || { }).unwrap()
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep) })
}).unwrap() .expect("Failed to create verifier thread.");
}) verifier_handles.push(handle);
.expect("Failed to create verifier thread.")
});
} }
VerificationQueue { VerificationQueue {
@ -266,13 +242,14 @@ impl<K: Kind> VerificationQueue<K> {
ready_signal: ready_signal, ready_signal: ready_signal,
more_to_verify: more_to_verify, more_to_verify: more_to_verify,
verification: verification, verification: verification,
verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting, deleting: deleting,
processing: RwLock::new(HashSet::new()), processing: RwLock::new(HashSet::new()),
empty: empty, empty: empty,
ticks_since_adjustment: AtomicUsize::new(0), ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
verifier_handles: verifier_handles,
state: state,
} }
} }
@ -281,23 +258,30 @@ impl<K: Kind> VerificationQueue<K> {
engine: Arc<Engine>, engine: Arc<Engine>,
wait: Arc<SCondvar>, wait: Arc<SCondvar>,
ready: Arc<QueueSignal>, ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>, state: Arc<(Mutex<State>, Condvar)>,
id: usize,
) { ) {
while !deleting.load(AtomicOrdering::Acquire) { loop {
// check current state.
{ {
while sleep.load(AtomicOrdering::SeqCst) { let mut cur_state = state.0.lock();
trace!(target: "verification", "Verifier sleeping"); while let State::Work(x) = *cur_state {
::std::thread::park(); // sleep until this thread is required.
trace!(target: "verification", "Verifier waking up"); if id < x { break }
if deleting.load(AtomicOrdering::Acquire) { debug!(target: "verification", "verifier {} sleeping", id);
return; state.1.wait(&mut cur_state);
} debug!(target: "verification", "verifier {} waking up", id);
}
if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id);
break;
} }
} }
// wait for work if empty.
{ {
let mut more_to_verify = verification.more_to_verify.lock().unwrap(); let mut more_to_verify = verification.more_to_verify.lock().unwrap();
@ -305,15 +289,22 @@ impl<K: Kind> VerificationQueue<K> {
empty.notify_all(); empty.notify_all();
} }
while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) { while verification.unverified.lock().is_empty() {
if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return;
}
more_to_verify = wait.wait(more_to_verify).unwrap(); more_to_verify = wait.wait(more_to_verify).unwrap();
} }
if deleting.load(AtomicOrdering::Acquire) { if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return; return;
} }
} }
// do work.
let item = { let item = {
// acquire these locks before getting the item to verify. // acquire these locks before getting the item to verify.
let mut unverified = verification.unverified.lock(); let mut unverified = verification.unverified.lock();
@ -568,6 +559,14 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
/// Get the current number of working verifiers.
pub fn num_verifiers(&self) -> usize {
match *self.state.0.lock() {
State::Work(x) => x,
State::Exit => panic!("state only set to exit on drop; queue live now; qed"),
}
}
/// Optimise memory footprint of the heap fields, and adjust the number of threads /// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload. /// to better suit the workload.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
@ -604,7 +603,7 @@ impl<K: Kind> VerificationQueue<K> {
return; return;
} }
let current = self.verifiers.lock().1; let current = self.num_verifiers();
let diff = (v_len - u_len).abs(); let diff = (v_len - u_len).abs();
let total = v_len + u_len; let total = v_len + u_len;
@ -626,27 +625,14 @@ impl<K: Kind> VerificationQueue<K> {
// possible, never going over the amount of initially allocated threads // possible, never going over the amount of initially allocated threads
// or below 1. // or below 1.
fn scale_verifiers(&self, target: usize) { fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock(); let current = self.num_verifiers();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers; let target = min(self.verifier_handles.len(), target);
let target = min(verifiers.len(), target);
let target = max(1, target); let target = max(1, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target); debug!(target: "verification", "Scaling from {} to {} verifiers", current, target);
// scaling up *self.state.0.lock() = State::Work(target);
for i in *verifier_count..target { self.state.1.notify_all();
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}
// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}
*verifier_count = target;
} }
} }
@ -660,22 +646,18 @@ impl<K: Kind> Drop for VerificationQueue<K> {
fn drop(&mut self) { fn drop(&mut self) {
trace!(target: "shutdown", "[VerificationQueue] Closing..."); trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::SeqCst);
let mut verifiers = self.verifiers.get_mut(); // set exit state; should be done before `more_to_verify` notification.
let mut verifiers = &mut verifiers.0; *self.state.0.lock() = State::Exit;
self.state.1.notify_all();
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
}
// wake up all threads waiting for more work.
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
// second pass to join. // wait for all verifier threads to join.
for handle in verifiers.drain(..) { for thread in self.verifier_handles.drain(..) {
handle.join(); thread.join().expect("Propagating verifier thread panic on shutdown");
} }
trace!(target: "shutdown", "[VerificationQueue] Closed."); trace!(target: "shutdown", "[VerificationQueue] Closed.");
@ -687,7 +669,7 @@ mod tests {
use util::*; use util::*;
use io::*; use io::*;
use spec::*; use spec::*;
use super::{BlockQueue, Config}; use super::{BlockQueue, Config, State};
use super::kind::blocks::Unverified; use super::kind::blocks::Unverified;
use tests::helpers::*; use tests::helpers::*;
use error::*; use error::*;
@ -784,11 +766,11 @@ mod tests {
let queue = get_test_queue(); let queue = get_test_queue();
queue.scale_verifiers(MAX_VERIFIERS + 1); queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1); assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
queue.scale_verifiers(0); queue.scale_verifiers(0);
assert!(queue.verifiers.lock().1 == 1); assert!(queue.num_verifiers() == 1);
} }
#[test] #[test]
@ -797,14 +779,7 @@ mod tests {
// put all the verifiers to sleep to ensure // put all the verifiers to sleep to ensure
// the test isn't timing sensitive. // the test isn't timing sensitive.
let num_verifiers = { *queue.state.0.lock() = State::Work(0);
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}
verifiers.1
};
for block in get_good_dummy_block_seq(5000) { for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed"); queue.import(Unverified::new(block)).expect("Block good by definition; qed");
@ -812,20 +787,12 @@ mod tests {
// almost all unverified == bump verifier count. // almost all unverified == bump verifier count.
queue.collect_garbage(); queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1); assert_eq!(queue.num_verifiers(), 1);
// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}
queue.flush(); queue.flush();
// nothing to verify == use minimum number of verifiers. // nothing to verify == use minimum number of verifiers.
queue.collect_garbage(); queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1); assert_eq!(queue.num_verifiers(), 1);
} }
} }