From 2d28c703d6a98586df5744e52a1de77aa59501c7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 3 Oct 2016 20:36:49 +0200 Subject: [PATCH] reuse add_verifier instrumentation, rolling sample of 5 ticks --- ethcore/src/verification/queue/mod.rs | 53 ++++++++++++++------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 3db7135a6..ae0555dfa 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -113,6 +113,7 @@ pub struct VerificationQueue { ready_signal: Arc, empty: Arc, processing: RwLock>, + rolling_sample: Mutex>, max_queue_size: usize, max_mem_use: usize, } @@ -175,41 +176,27 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let mut verifiers: Vec = Vec::new(); - let thread_count = max(::num_cpus::get(), 3) - 2; - for i in 0..thread_count { - let verification = verification.clone(); - let engine = engine.clone(); - let more_to_verify = more_to_verify.clone(); - let ready_signal = ready_signal.clone(); - let empty = empty.clone(); - let deleting = Arc::new(AtomicBool::new(false)); - let panic_handler = panic_handler.clone(); - verifiers.push(VerifierHandle { - deleting: deleting.clone(), - thread: thread::Builder::new() - .name(format!("Verifier #{}", i)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) - }).unwrap() - }) - .expect("Error starting block verification thread") - }); - } - VerificationQueue { + let queue = VerificationQueue { engine: engine, panic_handler: panic_handler, ready_signal: ready_signal, more_to_verify: more_to_verify, verification: verification, - verifiers: Mutex::new(verifiers), + verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())), deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, + rolling_sample: Mutex::new(VecDeque::new()), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), + }; + + let thread_count = max(::num_cpus::get(), 3) - 2; + for _ in 0..thread_count { + queue.add_verifier(); } + + queue } fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { @@ -448,7 +435,10 @@ impl VerificationQueue { const ADD_THREAD_THRESHOLD: usize = 10; const DEL_THREAD_THRESHOLD: usize = 20; - // TODO: sample over 5 or so ticks. + // number of ticks to average queue stats over + // when deciding whether to change the number of verifiers. + const SAMPLE_SIZE: usize = 5; + let (u_len, v_len) = { let u_len = { let mut v = self.verification.unverified.lock(); @@ -468,6 +458,17 @@ impl VerificationQueue { }; self.processing.write().shrink_to_fit(); + let (u_len, v_len) = { + let mut sample = self.rolling_sample.lock(); + sample.push_back((u_len, v_len)); + + if sample.len() > SAMPLE_SIZE { + let _ = sample.pop_front(); + } + + sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i)) + }; + // more than 10x as many unverified as verified. if v_len * ADD_THREAD_THRESHOLD < u_len { self.add_verifier(); @@ -520,7 +521,7 @@ impl VerificationQueue { return; } - debug!(target: "verification", "Removing verification thread #{}", len + 1); + debug!(target: "verification", "Removing verification thread #{}", len - 1); if let Some(handle) = verifiers.pop() { handle.conclude();