diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index ae0555dfa..ac0dbf592 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -18,7 +18,7 @@ //! Sorts them ready for blockchain insertion. use std::thread::{JoinHandle, self}; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use io::*; @@ -113,7 +113,7 @@ pub struct VerificationQueue { ready_signal: Arc, empty: Arc, processing: RwLock>, - rolling_sample: Mutex>, + ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, } @@ -152,6 +152,8 @@ struct Verification { bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, + verified_count: AtomicUsize, + drained: AtomicUsize, } impl VerificationQueue { @@ -164,7 +166,8 @@ impl VerificationQueue { bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - + verified_count: AtomicUsize::new(0), + drained: AtomicUsize::new(0), }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -186,7 +189,7 @@ impl VerificationQueue { deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, - rolling_sample: Mutex::new(VecDeque::new()), + ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), }; @@ -248,7 +251,7 @@ impl VerificationQueue { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count); ready.set(); } }, @@ -261,7 +264,7 @@ impl VerificationQueue { verifying.retain(|e| e.hash != hash); if verifying.front().map_or(false, |x| x.output.is_some()) { - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count); ready.set(); } } @@ -269,7 +272,13 @@ impl VerificationQueue { } } - fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet) { + fn drain_verifying( + verifying: &mut VecDeque>, + verified: &mut VecDeque, + bad: &mut HashSet, + v_count: &AtomicUsize + ) { + let start_len = verified.len(); while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { assert!(verifying.pop_front().is_some()); @@ -279,6 +288,8 @@ impl VerificationQueue { verified.push_back(output); } } + + v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel); } /// Clear the queue and stop verification activity. @@ -389,6 +400,8 @@ impl VerificationQueue { let count = min(max, verified.len()); let result = verified.drain(..count).collect::>(); + self.verification.drained.fetch_add(count, AtomicOrdering::AcqRel); + self.ready_signal.reset(); if !verified.is_empty() { self.ready_signal.set(); @@ -429,53 +442,49 @@ impl VerificationQueue { /// Optimise memory footprint of the heap fields, and adjust the number of threads /// to better suit the workload. pub fn collect_garbage(&self) { - // thresholds for adding and removing verifier threads - // these are unbalanced since having all blocks verified - // is the desirable position. - const ADD_THREAD_THRESHOLD: usize = 10; - const DEL_THREAD_THRESHOLD: usize = 20; - // number of ticks to average queue stats over // when deciding whether to change the number of verifiers. - const SAMPLE_SIZE: usize = 5; - - let (u_len, v_len) = { - let u_len = { - let mut v = self.verification.unverified.lock(); - v.shrink_to_fit(); - v.len() - }; + const READJUSTMENT_PERIOD: usize = 5; + { + self.verification.unverified.lock().shrink_to_fit(); self.verification.verifying.lock().shrink_to_fit(); + self.verification.verified.lock().shrink_to_fit(); + } - let v_len = { - let mut v = self.verification.verified.lock(); - v.shrink_to_fit(); - v.len() - }; - - (u_len, v_len) - }; self.processing.write().shrink_to_fit(); - let (u_len, v_len) = { - let mut sample = self.rolling_sample.lock(); - sample.push_back((u_len, v_len)); + if self.ticks_since_adjustment.load(AtomicOrdering::SeqCst) == READJUSTMENT_PERIOD { + self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); + } else { + self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst); + return; + } - if sample.len() > SAMPLE_SIZE { - let _ = sample.pop_front(); - } + let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); + let drained = self.verification.drained.load(AtomicOrdering::Acquire); - sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i)) + self.verification.verified_count.store(0, AtomicOrdering::Release); + self.verification.drained.store(0, AtomicOrdering::Release); + + // compute the average rate of verification per thread and determine + // how many are necessary to match the rate of draining. + let num_verifiers = self.verifiers.lock().len(); + let v_count_per = v_count as f64 / num_verifiers as f64; + let needed = if v_count < 20 { + 1 + } else { + (drained as f64 / v_count_per as f64).ceil() as usize }; - // more than 10x as many unverified as verified. - if v_len * ADD_THREAD_THRESHOLD < u_len { + trace!(target: "verification", "v_rate_per={}, drained={}, scaling to {} verifiers", + v_count_per, drained, needed); + + for _ in num_verifiers..needed { self.add_verifier(); } - // more than 20x as many verified as unverified. - if u_len * DEL_THREAD_THRESHOLD < v_len { + for _ in needed..num_verifiers { self.remove_verifier(); } }