fix deadlock in queue drop (#4095)

Former-commit-id: 923fc16078834f0c63e245dcb01ee95210bfc7f7
This commit is contained in:
Robert Habermeier 2017-01-09 14:56:06 +01:00 committed by Arkadiy Paronyan
parent e152d012b4
commit 2dc9c23d07

View File

@ -97,7 +97,6 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>, engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
@ -105,6 +104,7 @@ pub struct VerificationQueue<K: Kind> {
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
total_difficulty: RwLock<U256>, total_difficulty: RwLock<U256>,
verifier_handles: Vec<JoinHandle<()>>,
} }
struct QueueSignal { struct QueueSignal {
@ -186,7 +186,7 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifier_handles: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count { for i in 0..thread_count {
let verification = verification.clone(); let verification = verification.clone();
@ -196,7 +196,7 @@ impl<K: Kind> VerificationQueue<K> {
let empty = empty.clone(); let empty = empty.clone();
let deleting = deleting.clone(); let deleting = deleting.clone();
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
verifiers.push( verifier_handles.push(
thread::Builder::new() thread::Builder::new()
.name(format!("Verifier #{}", i)) .name(format!("Verifier #{}", i))
.spawn(move || { .spawn(move || {
@ -213,13 +213,13 @@ impl<K: Kind> VerificationQueue<K> {
ready_signal: ready_signal.clone(), ready_signal: ready_signal.clone(),
more_to_verify: more_to_verify.clone(), more_to_verify: more_to_verify.clone(),
verification: verification.clone(), verification: verification.clone(),
verifiers: verifiers,
deleting: deleting.clone(), deleting: deleting.clone(),
processing: RwLock::new(HashMap::new()), processing: RwLock::new(HashMap::new()),
empty: empty.clone(), empty: empty.clone(),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
total_difficulty: RwLock::new(0.into()), total_difficulty: RwLock::new(0.into()),
verifier_handles: verifier_handles,
} }
} }
@ -531,8 +531,15 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing..."); trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
// acquire this lock to force threads to reach the waiting point
// if they're in-between the exit check and the more_to_verify wait.
{
let _more = self.verification.more_to_verify.lock().unwrap();
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) { }
for t in self.verifier_handles.drain(..) {
t.join().unwrap(); t.join().unwrap();
} }
trace!(target: "shutdown", "[VerificationQueue] Closed."); trace!(target: "shutdown", "[VerificationQueue] Closed.");