diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index f4a9287d6..b4f2ab5a2 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -191,9 +191,6 @@ struct Verification { bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, - verified_count: AtomicUsize, - drained: AtomicUsize, - imported: AtomicUsize, sizes: Sizes, check_seal: bool, } @@ -208,9 +205,6 @@ impl VerificationQueue { bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - verified_count: AtomicUsize::new(0), - drained: AtomicUsize::new(0), - imported: AtomicUsize::new(0), sizes: Sizes { unverified: AtomicUsize::new(0), verifying: AtomicUsize::new(0), @@ -355,7 +349,7 @@ impl VerificationQueue { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -370,7 +364,7 @@ impl VerificationQueue { verifying.retain(|e| e.hash != hash); if verifying.front().map_or(false, |x| x.output.is_some()) { - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -388,10 +382,8 @@ impl VerificationQueue { verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet, - v_count: &AtomicUsize, sizes: &Sizes, ) { - let start_len = verified.len(); let mut removed_size = 0; let mut inserted_size = 0; @@ -408,7 +400,6 @@ impl VerificationQueue { } } - v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel); sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst); sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst); } @@ -474,7 +465,6 @@ impl VerificationQueue { self.processing.write().insert(h.clone()); self.verification.unverified.lock().push_back(item); - self.verification.imported.fetch_add(1, AtomicOrdering::AcqRel); self.more_to_verify.notify_all(); Ok(h) }, @@ -536,8 +526,6 @@ impl VerificationQueue { let count = min(max, verified.len()); let result = verified.drain(..count).collect::>(); - self.verification.drained.fetch_add(result.len(), AtomicOrdering::AcqRel); - let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c); self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst); @@ -588,11 +576,22 @@ impl VerificationQueue { // when deciding whether to change the number of verifiers. const READJUSTMENT_PERIOD: usize = 12; - { - self.verification.unverified.lock().shrink_to_fit(); + let (u_len, v_len) = { + let u_len = { + let mut q = self.verification.unverified.lock(); + q.shrink_to_fit(); + q.len() + }; self.verification.verifying.lock().shrink_to_fit(); - self.verification.verified.lock().shrink_to_fit(); - } + + let v_len = { + let mut q = self.verification.verified.lock(); + q.shrink_to_fit(); + q.len() + }; + + (u_len as isize, v_len as isize) + }; self.processing.write().shrink_to_fit(); @@ -603,31 +602,22 @@ impl VerificationQueue { return; } - let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); - let drained = self.verification.drained.load(AtomicOrdering::Acquire); - let imported = self.verification.imported.load(AtomicOrdering::Acquire); + let current = self.verifiers.lock().1; - self.verification.verified_count.store(0, AtomicOrdering::Release); - self.verification.drained.store(0, AtomicOrdering::Release); - self.verification.imported.store(0, AtomicOrdering::Release); + let diff = (v_len - u_len).abs(); + let total = v_len + u_len; - // select which side of the queue is the bottleneck. - let target = min(drained, imported); - - // compute the average rate of verification per thread and determine - // how many are necessary to match the rate of draining. - let num_verifiers = self.verifiers.lock().1; - let v_count_per = v_count as f64 / num_verifiers as f64; - let needed = if v_count < 20 { - 1 - } else { - (target as f64 / v_count_per as f64).ceil() as usize - }; - - trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers", - v_count_per, target, needed); - - self.scale_verifiers(needed); + self.scale_verifiers( + if u_len < 20 { + 1 + } else if diff <= total / 10 { + current + } else if v_len > u_len { + current - 1 + } else { + current + 1 + } + ); } // wake up or sleep verifiers to get as close to the target as