From 546cd0065925279c38d82220d1d6a2de0dc21214 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 17 Nov 2016 16:00:23 +0100 Subject: [PATCH] allocate verifiers up front, hibernate when not needed --- ethcore/src/verification/queue/mod.rs | 166 ++++++++++++++++---------- 1 file changed, 103 insertions(+), 63 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 23f82b730..f4a9287d6 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -35,6 +35,9 @@ pub mod kind; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; +// maximum possible number of verification threads. +const MAX_VERIFIERS: usize = 8; + /// Type alias for block queue convenience. pub type BlockQueue = VerificationQueue; @@ -63,13 +66,26 @@ impl Default for Config { struct VerifierHandle { deleting: Arc, + sleep: Arc<(Mutex, Condvar)>, thread: JoinHandle<()>, } impl VerifierHandle { + // signal to the verifier thread that it should sleep. + fn sleep(&self) { + *self.sleep.0.lock() = true; + } + + // signal to the verifier thread that it should wake up. + fn wake_up(&self) { + *self.sleep.0.lock() = false; + self.sleep.1.notify_all(); + } + // signal to the verifier thread that it should conclude its // operations. fn conclude(&self) { + self.wake_up(); self.deleting.store(true, AtomicOrdering::Release); } @@ -115,7 +131,7 @@ pub struct VerificationQueue { engine: Arc, more_to_verify: Arc, verification: Arc>, - verifiers: Mutex>, + verifiers: Mutex<(Vec, usize)>, deleting: Arc, ready_signal: Arc, empty: Arc, @@ -212,31 +228,83 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let queue = VerificationQueue { + let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); + let default_amount = max(::num_cpus::get(), 3) - 2; + let mut verifiers = Vec::with_capacity(max_verifiers); + + debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); + + for i in 0..max_verifiers { + debug!(target: "verification", "Adding verification thread #{}", i); + + let deleting = deleting.clone(); + let panic_handler = panic_handler.clone(); + let verification = verification.clone(); + let engine = engine.clone(); + let wait = more_to_verify.clone(); + let ready = ready_signal.clone(); + let empty = empty.clone(); + + // enable only the first few verifiers. + let sleep = if i < default_amount { + Arc::new((Mutex::new(false), Condvar::new())) + } else { + Arc::new((Mutex::new(true), Condvar::new())) + }; + + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + sleep: sleep.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep) + }).unwrap() + }) + .expect("Failed to create verifier thread.") + }); + } + + VerificationQueue { engine: engine, panic_handler: panic_handler, ready_signal: ready_signal, more_to_verify: more_to_verify, verification: verification, - verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())), + verifiers: Mutex::new((verifiers, default_amount)), deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), - }; - - let thread_count = max(::num_cpus::get(), 3) - 2; - for _ in 0..thread_count { - queue.add_verifier(); } - - queue } - fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify( + verification: Arc>, + engine: Arc, + wait: Arc, + ready: Arc, + deleting: Arc, + empty: Arc, + sleep: Arc<(Mutex, Condvar)>, + ) { while !deleting.load(AtomicOrdering::Acquire) { + { + let mut should_sleep = sleep.0.lock(); + while *should_sleep { + trace!(target: "verification", "Verifier sleeping"); + sleep.1.wait(&mut should_sleep); + trace!(target: "verification", "Verifier waking up"); + + if deleting.load(AtomicOrdering::Acquire) { + return; + } + } + } + { let mut more_to_verify = verification.more_to_verify.lock().unwrap(); @@ -548,7 +616,7 @@ impl VerificationQueue { // compute the average rate of verification per thread and determine // how many are necessary to match the rate of draining. - let num_verifiers = self.verifiers.lock().len(); + let num_verifiers = self.verifiers.lock().1; let v_count_per = v_count as f64 / num_verifiers as f64; let needed = if v_count < 20 { 1 @@ -559,63 +627,34 @@ impl VerificationQueue { trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers", v_count_per, target, needed); - for _ in num_verifiers..needed { - self.add_verifier(); - } - - for _ in needed..num_verifiers { - self.remove_verifier(); - } + self.scale_verifiers(needed); } - // add a verifier thread if possible. - fn add_verifier(&self) { + // wake up or sleep verifiers to get as close to the target as + // possible, never going over the amount of initially allocated threads + // or below 1. + fn scale_verifiers(&self, target: usize) { let mut verifiers = self.verifiers.lock(); - let len = verifiers.len(); - if len == ::num_cpus::get() { - return; + let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers; + + let target = min(verifiers.capacity(), target); + let target = max(1, target); + + debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target); + + // scaling up + for i in *verifier_count..target { + debug!(target: "verification", "Waking up verifier {}", i); + verifiers[i].wake_up(); } - debug!(target: "verification", "Adding verification thread #{}", len); - - let deleting = Arc::new(AtomicBool::new(false)); - let panic_handler = self.panic_handler.clone(); - let verification = self.verification.clone(); - let engine = self.engine.clone(); - let wait = self.more_to_verify.clone(); - let ready = self.ready_signal.clone(); - let empty = self.empty.clone(); - - verifiers.push(VerifierHandle { - deleting: deleting.clone(), - thread: thread::Builder::new() - .name(format!("Verifier #{}", len)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, wait, ready, deleting, empty) - }).unwrap() - }) - .expect("Failed to create verifier thread.") - }); - } - - // remove a verifier thread if possible. - fn remove_verifier(&self) { - let mut verifiers = self.verifiers.lock(); - let len = verifiers.len(); - - // never remove the last thread. - if len == 1 { - return; + // scaling down. + for i in target..*verifier_count { + debug!(target: "verification", "Putting verifier {} to sleep", i); + verifiers[i].sleep(); } - debug!(target: "verification", "Removing verification thread #{}", len - 1); - - if let Some(handle) = verifiers.pop() { - handle.conclude(); - self.more_to_verify.notify_all(); // to ensure it's joinable immediately. - handle.join(); - } + *verifier_count = target; } } @@ -631,7 +670,8 @@ impl Drop for VerificationQueue { self.clear(); self.deleting.store(true, AtomicOrdering::Release); - let mut verifiers = self.verifiers.lock(); + let mut verifiers = self.verifiers.get_mut(); + let mut verifiers = &mut verifiers.0; // first pass to signal conclusion. must be done before // notify or deadlock possible.