fix deadlock in queue drop

This commit is contained in:
Robert Habermeier 2016-12-19 17:15:54 +01:00
parent 14a9942d14
commit ae8f77bc7c

View File

@ -137,7 +137,7 @@ pub struct VerificationQueue<K: Kind> {
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
scale_verifiers: bool, scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>, verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>, state: Arc<(Mutex<State>, Condvar)>,
} }
@ -225,8 +225,8 @@ impl<K: Kind> VerificationQueue<K> {
let num_cpus = ::num_cpus::get(); let num_cpus = ::num_cpus::get();
let max_verifiers = min(num_cpus, MAX_VERIFIERS); let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers)); let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new())); let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers); let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
@ -248,11 +248,11 @@ impl<K: Kind> VerificationQueue<K> {
.spawn(move || { .spawn(move || {
panic_handler.catch_panic(move || { panic_handler.catch_panic(move || {
VerificationQueue::verify( VerificationQueue::verify(
verification, verification,
engine, engine,
wait, wait,
ready, ready,
empty, empty,
state, state,
i, i,
) )
@ -299,11 +299,11 @@ impl<K: Kind> VerificationQueue<K> {
debug!(target: "verification", "verifier {} sleeping", id); debug!(target: "verification", "verifier {} sleeping", id);
state.1.wait(&mut cur_state); state.1.wait(&mut cur_state);
debug!(target: "verification", "verifier {} waking up", id); debug!(target: "verification", "verifier {} waking up", id);
} }
if let State::Exit = *cur_state { if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id); debug!(target: "verification", "verifier {} exiting", id);
break; break;
} }
} }
@ -326,7 +326,7 @@ impl<K: Kind> VerificationQueue<K> {
} }
if let State::Exit = *state.0.lock() { if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id); debug!(target: "verification", "verifier {} exiting", id);
return; return;
} }
} }
@ -681,8 +681,12 @@ impl<K: Kind> Drop for VerificationQueue<K> {
*self.state.0.lock() = State::Exit; *self.state.0.lock() = State::Exit;
self.state.1.notify_all(); self.state.1.notify_all();
// wake up all threads waiting for more work. // acquire this lock to force threads to reach the waiting point
self.more_to_verify.notify_all(); // if they're in-between the exit check and the more_to_verify wait.
{
let _more = self.verification.more_to_verify.lock().unwrap();
self.more_to_verify.notify_all();
}
// wait for all verifier threads to join. // wait for all verifier threads to join.
for thread in self.verifier_handles.drain(..) { for thread in self.verifier_handles.drain(..) {
@ -811,7 +815,7 @@ mod tests {
fn readjust_verifiers() { fn readjust_verifiers() {
let queue = get_test_queue(true); let queue = get_test_queue(true);
// put all the verifiers to sleep to ensure // put all the verifiers to sleep to ensure
// the test isn't timing sensitive. // the test isn't timing sensitive.
*queue.state.0.lock() = State::Work(0); *queue.state.0.lock() = State::Work(0);