diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 99e09784d..686a1d093 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -35,6 +35,9 @@ pub mod kind; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; +// maximum possible number of verification threads. +const MAX_VERIFIERS: usize = 8; + /// Type alias for block queue convenience. pub type BlockQueue = VerificationQueue; @@ -61,6 +64,37 @@ impl Default for Config { } } +struct VerifierHandle { + deleting: Arc, + sleep: Arc, + thread: JoinHandle<()>, +} + +impl VerifierHandle { + // signal to the verifier thread that it should sleep. + fn sleep(&self) { + self.sleep.store(true, AtomicOrdering::SeqCst); + } + + // signal to the verifier thread that it should wake up. + fn wake_up(&self) { + self.sleep.store(false, AtomicOrdering::SeqCst); + self.thread.thread().unpark(); + } + + // signal to the verifier thread that it should conclude its + // operations. + fn conclude(&self) { + self.wake_up(); + self.deleting.store(true, AtomicOrdering::Release); + } + + // join the verifier thread. + fn join(self) { + self.thread.join().expect("Verifier thread panicked"); + } +} + /// An item which is in the process of being verified. pub struct Verifying { hash: H256, @@ -97,11 +131,12 @@ pub struct VerificationQueue { engine: Arc, more_to_verify: Arc, verification: Arc>, - verifiers: Vec>, + verifiers: Mutex<(Vec, usize)>, deleting: Arc, ready_signal: Arc, empty: Arc, processing: RwLock>, + ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, } @@ -157,6 +192,7 @@ struct Verification { more_to_verify: SMutex<()>, empty: SMutex<()>, sizes: Sizes, + check_seal: bool, } impl VerificationQueue { @@ -173,7 +209,8 @@ impl VerificationQueue { unverified: AtomicUsize::new(0), verifying: AtomicUsize::new(0), verified: AtomicUsize::new(0), - } + }, + check_seal: check_seal, }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -185,44 +222,82 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let mut verifiers: Vec> = Vec::new(); - let thread_count = max(::num_cpus::get(), 3) - 2; - for i in 0..thread_count { - let verification = verification.clone(); - let engine = engine.clone(); - let more_to_verify = more_to_verify.clone(); - let ready_signal = ready_signal.clone(); - let empty = empty.clone(); + let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); + let default_amount = max(::num_cpus::get(), 3) - 2; + let mut verifiers = Vec::with_capacity(max_verifiers); + + debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); + + for i in 0..max_verifiers { + debug!(target: "verification", "Adding verification thread #{}", i); + let deleting = deleting.clone(); let panic_handler = panic_handler.clone(); - verifiers.push( - thread::Builder::new() - .name(format!("Verifier #{}", i)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty, check_seal) - }).unwrap() - }) - .expect("Error starting block verification thread") - ); + let verification = verification.clone(); + let engine = engine.clone(); + let wait = more_to_verify.clone(); + let ready = ready_signal.clone(); + let empty = empty.clone(); + + // enable only the first few verifiers. + let sleep = if i < default_amount { + Arc::new(AtomicBool::new(false)) + } else { + Arc::new(AtomicBool::new(true)) + }; + + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + sleep: sleep.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep) + }).unwrap() + }) + .expect("Failed to create verifier thread.") + }); } + VerificationQueue { engine: engine, panic_handler: panic_handler, - ready_signal: ready_signal.clone(), - more_to_verify: more_to_verify.clone(), - verification: verification.clone(), - verifiers: verifiers, - deleting: deleting.clone(), + ready_signal: ready_signal, + more_to_verify: more_to_verify, + verification: verification, + verifiers: Mutex::new((verifiers, default_amount)), + deleting: deleting, processing: RwLock::new(HashSet::new()), - empty: empty.clone(), + empty: empty, + ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), } } - fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc, check_seal: bool) { + fn verify( + verification: Arc>, + engine: Arc, + wait: Arc, + ready: Arc, + deleting: Arc, + empty: Arc, + sleep: Arc, + ) { while !deleting.load(AtomicOrdering::Acquire) { + { + while sleep.load(AtomicOrdering::SeqCst) { + trace!(target: "verification", "Verifier sleeping"); + ::std::thread::park(); + trace!(target: "verification", "Verifier waking up"); + + if deleting.load(AtomicOrdering::Acquire) { + return; + } + } + } + { let mut more_to_verify = verification.more_to_verify.lock().unwrap(); @@ -255,7 +330,7 @@ impl VerificationQueue { }; let hash = item.hash(); - let is_ready = match K::verify(item, &*engine, check_seal) { + let is_ready = match K::verify(item, &*engine, verification.check_seal) { Ok(verified) => { let mut verifying = verification.verifying.lock(); let mut idx = None; @@ -302,9 +377,15 @@ impl VerificationQueue { } } - fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet, sizes: &Sizes) { + fn drain_verifying( + verifying: &mut VecDeque>, + verified: &mut VecDeque, + bad: &mut HashSet, + sizes: &Sizes, + ) { let mut removed_size = 0; let mut inserted_size = 0; + while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { assert!(verifying.pop_front().is_some()); let size = output.heap_size_of_children(); @@ -487,14 +568,85 @@ impl VerificationQueue { } } - /// Optimise memory footprint of the heap fields. + /// Optimise memory footprint of the heap fields, and adjust the number of threads + /// to better suit the workload. pub fn collect_garbage(&self) { - { - self.verification.unverified.lock().shrink_to_fit(); + // number of ticks to average queue stats over + // when deciding whether to change the number of verifiers. + #[cfg(not(test))] + const READJUSTMENT_PERIOD: usize = 12; + + #[cfg(test)] + const READJUSTMENT_PERIOD: usize = 1; + + let (u_len, v_len) = { + let u_len = { + let mut q = self.verification.unverified.lock(); + q.shrink_to_fit(); + q.len() + }; self.verification.verifying.lock().shrink_to_fit(); - self.verification.verified.lock().shrink_to_fit(); - } + + let v_len = { + let mut q = self.verification.verified.lock(); + q.shrink_to_fit(); + q.len() + }; + + (u_len as isize, v_len as isize) + }; + self.processing.write().shrink_to_fit(); + + if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD { + self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); + } else { + return; + } + + let current = self.verifiers.lock().1; + + let diff = (v_len - u_len).abs(); + let total = v_len + u_len; + + self.scale_verifiers( + if u_len < 20 { + 1 + } else if diff <= total / 10 { + current + } else if v_len > u_len { + current - 1 + } else { + current + 1 + } + ); + } + + // wake up or sleep verifiers to get as close to the target as + // possible, never going over the amount of initially allocated threads + // or below 1. + fn scale_verifiers(&self, target: usize) { + let mut verifiers = self.verifiers.lock(); + let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers; + + let target = min(verifiers.len(), target); + let target = max(1, target); + + debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target); + + // scaling up + for i in *verifier_count..target { + debug!(target: "verification", "Waking up verifier {}", i); + verifiers[i].wake_up(); + } + + // scaling down. + for i in target..*verifier_count { + debug!(target: "verification", "Putting verifier {} to sleep", i); + verifiers[i].sleep(); + } + + *verifier_count = target; } } @@ -509,10 +661,23 @@ impl Drop for VerificationQueue { trace!(target: "shutdown", "[VerificationQueue] Closing..."); self.clear(); self.deleting.store(true, AtomicOrdering::Release); - self.more_to_verify.notify_all(); - for t in self.verifiers.drain(..) { - t.join().unwrap(); + + let mut verifiers = self.verifiers.get_mut(); + let mut verifiers = &mut verifiers.0; + + // first pass to signal conclusion. must be done before + // notify or deadlock possible. + for handle in verifiers.iter() { + handle.conclude(); } + + self.more_to_verify.notify_all(); + + // second pass to join. + for handle in verifiers.drain(..) { + handle.join(); + } + trace!(target: "shutdown", "[VerificationQueue] Closed."); } } @@ -611,4 +776,56 @@ mod tests { } assert!(queue.queue_info().is_full()); } + + #[test] + fn scaling_limits() { + use super::MAX_VERIFIERS; + + let queue = get_test_queue(); + queue.scale_verifiers(MAX_VERIFIERS + 1); + + assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1); + + queue.scale_verifiers(0); + + assert!(queue.verifiers.lock().1 == 1); + } + + #[test] + fn readjust_verifiers() { + let queue = get_test_queue(); + + // put all the verifiers to sleep to ensure + // the test isn't timing sensitive. + let num_verifiers = { + let verifiers = queue.verifiers.lock(); + for i in 0..verifiers.1 { + verifiers.0[i].sleep(); + } + + verifiers.1 + }; + + for block in get_good_dummy_block_seq(5000) { + queue.import(Unverified::new(block)).expect("Block good by definition; qed"); + } + + // almost all unverified == bump verifier count. + queue.collect_garbage(); + assert_eq!(queue.verifiers.lock().1, num_verifiers + 1); + + // wake them up again and verify everything. + { + let verifiers = queue.verifiers.lock(); + for i in 0..verifiers.1 { + verifiers.0[i].wake_up(); + } + } + + queue.flush(); + + // nothing to verify == use minimum number of verifiers. + queue.collect_garbage(); + assert_eq!(queue.verifiers.lock().1, 1); + } }