diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 3f81d53ce..b35b95e2b 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -61,6 +61,24 @@ impl Default for Config { } } +struct VerifierHandle { + deleting: Arc, + thread: JoinHandle<()>, +} + +impl VerifierHandle { + // signal to the verifier thread that it should conclude its + // operations. + fn conclude(&self) { + self.deleting.store(true, AtomicOrdering::Release); + } + + // join the verifier thread. + fn join(self) { + self.thread.join().unwrap(); + } +} + /// An item which is in the process of being verified. pub struct Verifying { hash: H256, @@ -90,7 +108,7 @@ pub struct VerificationQueue { engine: Arc, more_to_verify: Arc, verification: Arc>, - verifiers: Vec>, + verifiers: Mutex>, deleting: Arc, ready_signal: Arc, empty: Arc, @@ -157,7 +175,7 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let mut verifiers: Vec> = Vec::new(); + let mut verifiers: Vec = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; for i in 0..thread_count { let verification = verification.clone(); @@ -165,29 +183,30 @@ impl VerificationQueue { let more_to_verify = more_to_verify.clone(); let ready_signal = ready_signal.clone(); let empty = empty.clone(); - let deleting = deleting.clone(); + let deleting = Arc::new(AtomicBool::new(false)); let panic_handler = panic_handler.clone(); - verifiers.push( - thread::Builder::new() - .name(format!("Verifier #{}", i)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) - }).unwrap() - }) - .expect("Error starting block verification thread") - ); + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + }).unwrap() + }) + .expect("Error starting block verification thread") + }); } VerificationQueue { engine: engine, panic_handler: panic_handler, - ready_signal: ready_signal.clone(), - more_to_verify: more_to_verify.clone(), - verification: verification.clone(), - verifiers: verifiers, - deleting: deleting.clone(), + ready_signal: ready_signal, + more_to_verify: more_to_verify, + verification: verification, + verifiers: Mutex::new(verifiers), + deleting: deleting, processing: RwLock::new(HashSet::new()), - empty: empty.clone(), + empty: empty, max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), } @@ -420,14 +439,93 @@ impl VerificationQueue { } } - /// Optimise memory footprint of the heap fields. + /// Optimise memory footprint of the heap fields, and adjust the number of threads + /// to better suit the workload. pub fn collect_garbage(&self) { - { - self.verification.unverified.lock().shrink_to_fit(); + // thresholds for adding and removing verifier threads + // these are unbalanced since having all blocks verified + // is the desirable position. + const ADD_THREAD_THRESHOLD: usize = 10; + const DEL_THREAD_THRESHOLD: usize = 20; + + // TODO: sample over 5 or so ticks. + let (u_len, v_len) = { + let u_len = { + let mut v = self.verification.unverified.lock(); + v.shrink_to_fit(); + v.len() + }; + self.verification.verifying.lock().shrink_to_fit(); - self.verification.verified.lock().shrink_to_fit(); - } + + let v_len = { + let mut v = self.verification.verified.lock(); + v.shrink_to_fit(); + v.len() + }; + + (u_len, v_len) + }; self.processing.write().shrink_to_fit(); + + // more than 10x as many unverified as verified. + if v_len * ADD_THREAD_THRESHOLD < u_len { + self.add_verifier(); + } + + // more than 20x as many verified as unverified. + if u_len * DEL_THREAD_THRESHOLD < v_len { + self.remove_verifier(); + } + } + + // add a verifier thread if possible. + fn add_verifier(&self) { + let mut verifiers = self.verifiers.lock(); + let len = verifiers.len(); + if len == ::num_cpus::get() { + return; + } + + debug!(target: "verification", "Adding verification thread #{}", len); + + let deleting = Arc::new(AtomicBool::new(false)); + let panic_handler = self.panic_handler.clone(); + let verification = self.verification.clone(); + let engine = self.engine.clone(); + let wait = self.more_to_verify.clone(); + let ready = self.ready_signal.clone(); + let empty = self.empty.clone(); + + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", len)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, wait, ready, deleting, empty) + }).unwrap() + }) + .expect("Failed to create verifier thread.") + }); + } + + // remove a verifier thread if possible. + fn remove_verifier(&self) { + let mut verifiers = self.verifiers.lock(); + let len = verifiers.len(); + + if len == 1 { + return; + } + + debug!(target: "verification", "Removing verification thread #{}", len); + + if let Some(handle) = verifiers.pop() { + handle.conclude(); + self.more_to_verify.notify_all(); // to ensure it's joinable immediately. + handle.join(); + } } } @@ -442,10 +540,22 @@ impl Drop for VerificationQueue { trace!(target: "shutdown", "[VerificationQueue] Closing..."); self.clear(); self.deleting.store(true, AtomicOrdering::Release); - self.more_to_verify.notify_all(); - for t in self.verifiers.drain(..) { - t.join().unwrap(); + + let mut verifiers = self.verifiers.lock(); + + // first pass to signal conclusion. must be done before + // notify or deadlock possible. + for handle in verifiers.iter() { + handle.conclude(); } + + self.more_to_verify.notify_all(); + + // second pass to join. + for handle in verifiers.drain(..) { + handle.join(); + } + trace!(target: "shutdown", "[VerificationQueue] Closed."); } }