Prevent deadlocks

This commit is contained in:
arkpar 2016-02-16 17:53:31 +01:00
parent 33e8d749d2
commit d95e971030
2 changed files with 15 additions and 8 deletions

View File

@ -153,7 +153,7 @@ impl BlockQueue {
} }
fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) { fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
while !deleting.load(AtomicOrdering::Relaxed) { while !deleting.load(AtomicOrdering::Acquire) {
{ {
let mut lock = verification.lock().unwrap(); let mut lock = verification.lock().unwrap();
@ -161,11 +161,11 @@ impl BlockQueue {
empty.notify_all(); empty.notify_all();
} }
while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
lock = wait.wait(lock).unwrap(); lock = wait.wait(lock).unwrap();
} }
if deleting.load(AtomicOrdering::Relaxed) { if deleting.load(AtomicOrdering::Acquire) {
return; return;
} }
} }
@ -347,7 +347,7 @@ impl MayPanic for BlockQueue {
impl Drop for BlockQueue { impl Drop for BlockQueue {
fn drop(&mut self) { fn drop(&mut self) {
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Relaxed); self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) { for t in self.verifiers.drain(..) {
t.join().unwrap(); t.join().unwrap();

View File

@ -44,6 +44,7 @@ pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
wait_mutex: Arc<Mutex<()>>,
} }
impl Worker { impl Worker {
@ -61,6 +62,7 @@ impl Worker {
thread: None, thread: None,
wait: wait.clone(), wait: wait.clone(),
deleting: deleting.clone(), deleting: deleting.clone(),
wait_mutex: wait_mutex.clone(),
}; };
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || { move || {
@ -77,13 +79,17 @@ impl Worker {
wait_mutex: Arc<Mutex<()>>, wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>) deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
while !deleting.load(AtomicOrdering::Relaxed) { loop {
{ {
let lock = wait_mutex.lock().unwrap(); let lock = wait_mutex.lock().unwrap();
let _ = wait.wait(lock).unwrap(); if deleting.load(AtomicOrdering::Acquire) {
if deleting.load(AtomicOrdering::Relaxed) {
return; return;
} }
let _ = wait.wait(lock).unwrap();
}
if deleting.load(AtomicOrdering::Acquire) {
return;
} }
while let chase_lev::Steal::Data(work) = stealer.steal() { while let chase_lev::Steal::Data(work) = stealer.steal() {
Worker::do_work(work, channel.clone()); Worker::do_work(work, channel.clone());
@ -114,7 +120,8 @@ impl Worker {
impl Drop for Worker { impl Drop for Worker {
fn drop(&mut self) { fn drop(&mut self) {
self.deleting.store(true, AtomicOrdering::Relaxed); let _ = self.wait_mutex.lock();
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all(); self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap(); let thread = mem::replace(&mut self.thread, None).unwrap();
thread.join().ok(); thread.join().ok();