From d95e9710306e95552a2ba3ad4df66067c77bea96 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 16 Feb 2016 17:53:31 +0100 Subject: [PATCH] Prevent deadlocks --- ethcore/src/block_queue.rs | 8 ++++---- util/src/io/worker.rs | 15 +++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 1a1dee48e..f11519067 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -153,7 +153,7 @@ impl BlockQueue { } fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { - while !deleting.load(AtomicOrdering::Relaxed) { + while !deleting.load(AtomicOrdering::Acquire) { { let mut lock = verification.lock().unwrap(); @@ -161,11 +161,11 @@ impl BlockQueue { empty.notify_all(); } - while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { + while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { lock = wait.wait(lock).unwrap(); } - if deleting.load(AtomicOrdering::Relaxed) { + if deleting.load(AtomicOrdering::Acquire) { return; } } @@ -347,7 +347,7 @@ impl MayPanic for BlockQueue { impl Drop for BlockQueue { fn drop(&mut self) { self.clear(); - self.deleting.store(true, AtomicOrdering::Relaxed); + self.deleting.store(true, AtomicOrdering::Release); self.more_to_verify.notify_all(); for t in self.verifiers.drain(..) { t.join().unwrap(); diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 1ba0318bc..b874ea0a4 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -44,6 +44,7 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, + wait_mutex: Arc>, } impl Worker { @@ -61,6 +62,7 @@ impl Worker { thread: None, wait: wait.clone(), deleting: deleting.clone(), + wait_mutex: wait_mutex.clone(), }; worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( move || { @@ -77,13 +79,17 @@ impl Worker { wait_mutex: Arc>, deleting: Arc) where Message: Send + Sync + Clone + 'static { - while !deleting.load(AtomicOrdering::Relaxed) { + loop { { let lock = wait_mutex.lock().unwrap(); - let _ = wait.wait(lock).unwrap(); - if deleting.load(AtomicOrdering::Relaxed) { + if deleting.load(AtomicOrdering::Acquire) { return; } + let _ = wait.wait(lock).unwrap(); + } + + if deleting.load(AtomicOrdering::Acquire) { + return; } while let chase_lev::Steal::Data(work) = stealer.steal() { Worker::do_work(work, channel.clone()); @@ -114,7 +120,8 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { - self.deleting.store(true, AtomicOrdering::Relaxed); + let _ = self.wait_mutex.lock(); + self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); let thread = mem::replace(&mut self.thread, None).unwrap(); thread.join().ok();