rewrite scaling logic
This commit is contained in:
parent
546cd00659
commit
8c0e511ebe
@ -191,9 +191,6 @@ struct Verification<K: Kind> {
|
|||||||
bad: Mutex<HashSet<H256>>,
|
bad: Mutex<HashSet<H256>>,
|
||||||
more_to_verify: SMutex<()>,
|
more_to_verify: SMutex<()>,
|
||||||
empty: SMutex<()>,
|
empty: SMutex<()>,
|
||||||
verified_count: AtomicUsize,
|
|
||||||
drained: AtomicUsize,
|
|
||||||
imported: AtomicUsize,
|
|
||||||
sizes: Sizes,
|
sizes: Sizes,
|
||||||
check_seal: bool,
|
check_seal: bool,
|
||||||
}
|
}
|
||||||
@ -208,9 +205,6 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
bad: Mutex::new(HashSet::new()),
|
bad: Mutex::new(HashSet::new()),
|
||||||
more_to_verify: SMutex::new(()),
|
more_to_verify: SMutex::new(()),
|
||||||
empty: SMutex::new(()),
|
empty: SMutex::new(()),
|
||||||
verified_count: AtomicUsize::new(0),
|
|
||||||
drained: AtomicUsize::new(0),
|
|
||||||
imported: AtomicUsize::new(0),
|
|
||||||
sizes: Sizes {
|
sizes: Sizes {
|
||||||
unverified: AtomicUsize::new(0),
|
unverified: AtomicUsize::new(0),
|
||||||
verifying: AtomicUsize::new(0),
|
verifying: AtomicUsize::new(0),
|
||||||
@ -355,7 +349,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
// we're next!
|
// we're next!
|
||||||
let mut verified = verification.verified.lock();
|
let mut verified = verification.verified.lock();
|
||||||
let mut bad = verification.bad.lock();
|
let mut bad = verification.bad.lock();
|
||||||
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes);
|
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
@ -370,7 +364,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
verifying.retain(|e| e.hash != hash);
|
verifying.retain(|e| e.hash != hash);
|
||||||
|
|
||||||
if verifying.front().map_or(false, |x| x.output.is_some()) {
|
if verifying.front().map_or(false, |x| x.output.is_some()) {
|
||||||
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes);
|
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
@ -388,10 +382,8 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
verifying: &mut VecDeque<Verifying<K>>,
|
verifying: &mut VecDeque<Verifying<K>>,
|
||||||
verified: &mut VecDeque<K::Verified>,
|
verified: &mut VecDeque<K::Verified>,
|
||||||
bad: &mut HashSet<H256>,
|
bad: &mut HashSet<H256>,
|
||||||
v_count: &AtomicUsize,
|
|
||||||
sizes: &Sizes,
|
sizes: &Sizes,
|
||||||
) {
|
) {
|
||||||
let start_len = verified.len();
|
|
||||||
let mut removed_size = 0;
|
let mut removed_size = 0;
|
||||||
let mut inserted_size = 0;
|
let mut inserted_size = 0;
|
||||||
|
|
||||||
@ -408,7 +400,6 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel);
|
|
||||||
sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst);
|
sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst);
|
||||||
sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst);
|
sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst);
|
||||||
}
|
}
|
||||||
@ -474,7 +465,6 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
|
|
||||||
self.processing.write().insert(h.clone());
|
self.processing.write().insert(h.clone());
|
||||||
self.verification.unverified.lock().push_back(item);
|
self.verification.unverified.lock().push_back(item);
|
||||||
self.verification.imported.fetch_add(1, AtomicOrdering::AcqRel);
|
|
||||||
self.more_to_verify.notify_all();
|
self.more_to_verify.notify_all();
|
||||||
Ok(h)
|
Ok(h)
|
||||||
},
|
},
|
||||||
@ -536,8 +526,6 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
let count = min(max, verified.len());
|
let count = min(max, verified.len());
|
||||||
let result = verified.drain(..count).collect::<Vec<_>>();
|
let result = verified.drain(..count).collect::<Vec<_>>();
|
||||||
|
|
||||||
self.verification.drained.fetch_add(result.len(), AtomicOrdering::AcqRel);
|
|
||||||
|
|
||||||
let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c);
|
let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c);
|
||||||
self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst);
|
self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst);
|
||||||
|
|
||||||
@ -588,11 +576,22 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
// when deciding whether to change the number of verifiers.
|
// when deciding whether to change the number of verifiers.
|
||||||
const READJUSTMENT_PERIOD: usize = 12;
|
const READJUSTMENT_PERIOD: usize = 12;
|
||||||
|
|
||||||
{
|
let (u_len, v_len) = {
|
||||||
self.verification.unverified.lock().shrink_to_fit();
|
let u_len = {
|
||||||
|
let mut q = self.verification.unverified.lock();
|
||||||
|
q.shrink_to_fit();
|
||||||
|
q.len()
|
||||||
|
};
|
||||||
self.verification.verifying.lock().shrink_to_fit();
|
self.verification.verifying.lock().shrink_to_fit();
|
||||||
self.verification.verified.lock().shrink_to_fit();
|
|
||||||
}
|
let v_len = {
|
||||||
|
let mut q = self.verification.verified.lock();
|
||||||
|
q.shrink_to_fit();
|
||||||
|
q.len()
|
||||||
|
};
|
||||||
|
|
||||||
|
(u_len as isize, v_len as isize)
|
||||||
|
};
|
||||||
|
|
||||||
self.processing.write().shrink_to_fit();
|
self.processing.write().shrink_to_fit();
|
||||||
|
|
||||||
@ -603,31 +602,22 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire);
|
let current = self.verifiers.lock().1;
|
||||||
let drained = self.verification.drained.load(AtomicOrdering::Acquire);
|
|
||||||
let imported = self.verification.imported.load(AtomicOrdering::Acquire);
|
|
||||||
|
|
||||||
self.verification.verified_count.store(0, AtomicOrdering::Release);
|
let diff = (v_len - u_len).abs();
|
||||||
self.verification.drained.store(0, AtomicOrdering::Release);
|
let total = v_len + u_len;
|
||||||
self.verification.imported.store(0, AtomicOrdering::Release);
|
|
||||||
|
|
||||||
// select which side of the queue is the bottleneck.
|
self.scale_verifiers(
|
||||||
let target = min(drained, imported);
|
if u_len < 20 {
|
||||||
|
1
|
||||||
// compute the average rate of verification per thread and determine
|
} else if diff <= total / 10 {
|
||||||
// how many are necessary to match the rate of draining.
|
current
|
||||||
let num_verifiers = self.verifiers.lock().1;
|
} else if v_len > u_len {
|
||||||
let v_count_per = v_count as f64 / num_verifiers as f64;
|
current - 1
|
||||||
let needed = if v_count < 20 {
|
} else {
|
||||||
1
|
current + 1
|
||||||
} else {
|
}
|
||||||
(target as f64 / v_count_per as f64).ceil() as usize
|
);
|
||||||
};
|
|
||||||
|
|
||||||
trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers",
|
|
||||||
v_count_per, target, needed);
|
|
||||||
|
|
||||||
self.scale_verifiers(needed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wake up or sleep verifiers to get as close to the target as
|
// wake up or sleep verifiers to get as close to the target as
|
||||||
|
Loading…
Reference in New Issue
Block a user