balance rates of draining and importing

This commit is contained in:
Robert Habermeier 2016-10-05 12:10:28 +02:00
parent abbf3b3c58
commit 8a5576d133

View File

@ -154,6 +154,7 @@ struct Verification<K: Kind> {
empty: SMutex<()>, empty: SMutex<()>,
verified_count: AtomicUsize, verified_count: AtomicUsize,
drained: AtomicUsize, drained: AtomicUsize,
imported: AtomicUsize,
} }
impl<K: Kind> VerificationQueue<K> { impl<K: Kind> VerificationQueue<K> {
@ -168,6 +169,7 @@ impl<K: Kind> VerificationQueue<K> {
empty: SMutex::new(()), empty: SMutex::new(()),
verified_count: AtomicUsize::new(0), verified_count: AtomicUsize::new(0),
drained: AtomicUsize::new(0), drained: AtomicUsize::new(0),
imported: AtomicUsize::new(0),
}); });
let more_to_verify = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
@ -345,6 +347,7 @@ impl<K: Kind> VerificationQueue<K> {
Ok(item) => { Ok(item) => {
self.processing.write().insert(h.clone()); self.processing.write().insert(h.clone());
self.verification.unverified.lock().push_back(item); self.verification.unverified.lock().push_back(item);
self.verification.imported.fetch_add(1, AtomicOrdering::AcqRel);
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
Ok(h) Ok(h)
}, },
@ -463,9 +466,14 @@ impl<K: Kind> VerificationQueue<K> {
let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire);
let drained = self.verification.drained.load(AtomicOrdering::Acquire); let drained = self.verification.drained.load(AtomicOrdering::Acquire);
let imported = self.verification.imported.load(AtomicOrdering::Acquire);
self.verification.verified_count.store(0, AtomicOrdering::Release); self.verification.verified_count.store(0, AtomicOrdering::Release);
self.verification.drained.store(0, AtomicOrdering::Release); self.verification.drained.store(0, AtomicOrdering::Release);
self.verification.imported.store(0, AtomicOrdering::Release);
// select which side of the queue is the bottleneck.
let target = min(drained, imported);
// compute the average rate of verification per thread and determine // compute the average rate of verification per thread and determine
// how many are necessary to match the rate of draining. // how many are necessary to match the rate of draining.
@ -474,11 +482,11 @@ impl<K: Kind> VerificationQueue<K> {
let needed = if v_count < 20 { let needed = if v_count < 20 {
1 1
} else { } else {
(drained as f64 / v_count_per as f64).ceil() as usize (target as f64 / v_count_per as f64).ceil() as usize
}; };
trace!(target: "verification", "v_rate_per={}, drained={}, scaling to {} verifiers", trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers",
v_count_per, drained, needed); v_count_per, target, needed);
for _ in num_verifiers..needed { for _ in num_verifiers..needed {
self.add_verifier(); self.add_verifier();