verification-rate based thread scaling
This commit is contained in:
parent
2d28c703d6
commit
abbf3b3c58
@ -18,7 +18,7 @@
|
|||||||
//! Sorts them ready for blockchain insertion.
|
//! Sorts them ready for blockchain insertion.
|
||||||
|
|
||||||
use std::thread::{JoinHandle, self};
|
use std::thread::{JoinHandle, self};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
|
||||||
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||||
use util::*;
|
use util::*;
|
||||||
use io::*;
|
use io::*;
|
||||||
@ -113,7 +113,7 @@ pub struct VerificationQueue<K: Kind> {
|
|||||||
ready_signal: Arc<QueueSignal>,
|
ready_signal: Arc<QueueSignal>,
|
||||||
empty: Arc<SCondvar>,
|
empty: Arc<SCondvar>,
|
||||||
processing: RwLock<HashSet<H256>>,
|
processing: RwLock<HashSet<H256>>,
|
||||||
rolling_sample: Mutex<VecDeque<(usize, usize)>>,
|
ticks_since_adjustment: AtomicUsize,
|
||||||
max_queue_size: usize,
|
max_queue_size: usize,
|
||||||
max_mem_use: usize,
|
max_mem_use: usize,
|
||||||
}
|
}
|
||||||
@ -152,6 +152,8 @@ struct Verification<K: Kind> {
|
|||||||
bad: Mutex<HashSet<H256>>,
|
bad: Mutex<HashSet<H256>>,
|
||||||
more_to_verify: SMutex<()>,
|
more_to_verify: SMutex<()>,
|
||||||
empty: SMutex<()>,
|
empty: SMutex<()>,
|
||||||
|
verified_count: AtomicUsize,
|
||||||
|
drained: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K: Kind> VerificationQueue<K> {
|
impl<K: Kind> VerificationQueue<K> {
|
||||||
@ -164,7 +166,8 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
bad: Mutex::new(HashSet::new()),
|
bad: Mutex::new(HashSet::new()),
|
||||||
more_to_verify: SMutex::new(()),
|
more_to_verify: SMutex::new(()),
|
||||||
empty: SMutex::new(()),
|
empty: SMutex::new(()),
|
||||||
|
verified_count: AtomicUsize::new(0),
|
||||||
|
drained: AtomicUsize::new(0),
|
||||||
});
|
});
|
||||||
let more_to_verify = Arc::new(SCondvar::new());
|
let more_to_verify = Arc::new(SCondvar::new());
|
||||||
let deleting = Arc::new(AtomicBool::new(false));
|
let deleting = Arc::new(AtomicBool::new(false));
|
||||||
@ -186,7 +189,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
deleting: deleting,
|
deleting: deleting,
|
||||||
processing: RwLock::new(HashSet::new()),
|
processing: RwLock::new(HashSet::new()),
|
||||||
empty: empty,
|
empty: empty,
|
||||||
rolling_sample: Mutex::new(VecDeque::new()),
|
ticks_since_adjustment: AtomicUsize::new(0),
|
||||||
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
||||||
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
||||||
};
|
};
|
||||||
@ -248,7 +251,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
// we're next!
|
// we're next!
|
||||||
let mut verified = verification.verified.lock();
|
let mut verified = verification.verified.lock();
|
||||||
let mut bad = verification.bad.lock();
|
let mut bad = verification.bad.lock();
|
||||||
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
|
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count);
|
||||||
ready.set();
|
ready.set();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -261,7 +264,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
verifying.retain(|e| e.hash != hash);
|
verifying.retain(|e| e.hash != hash);
|
||||||
|
|
||||||
if verifying.front().map_or(false, |x| x.output.is_some()) {
|
if verifying.front().map_or(false, |x| x.output.is_some()) {
|
||||||
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
|
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count);
|
||||||
ready.set();
|
ready.set();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -269,7 +272,13 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>) {
|
fn drain_verifying(
|
||||||
|
verifying: &mut VecDeque<Verifying<K>>,
|
||||||
|
verified: &mut VecDeque<K::Verified>,
|
||||||
|
bad: &mut HashSet<H256>,
|
||||||
|
v_count: &AtomicUsize
|
||||||
|
) {
|
||||||
|
let start_len = verified.len();
|
||||||
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
|
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
|
||||||
assert!(verifying.pop_front().is_some());
|
assert!(verifying.pop_front().is_some());
|
||||||
|
|
||||||
@ -279,6 +288,8 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
verified.push_back(output);
|
verified.push_back(output);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear the queue and stop verification activity.
|
/// Clear the queue and stop verification activity.
|
||||||
@ -389,6 +400,8 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
let count = min(max, verified.len());
|
let count = min(max, verified.len());
|
||||||
let result = verified.drain(..count).collect::<Vec<_>>();
|
let result = verified.drain(..count).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
self.verification.drained.fetch_add(count, AtomicOrdering::AcqRel);
|
||||||
|
|
||||||
self.ready_signal.reset();
|
self.ready_signal.reset();
|
||||||
if !verified.is_empty() {
|
if !verified.is_empty() {
|
||||||
self.ready_signal.set();
|
self.ready_signal.set();
|
||||||
@ -429,53 +442,49 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
/// Optimise memory footprint of the heap fields, and adjust the number of threads
|
/// Optimise memory footprint of the heap fields, and adjust the number of threads
|
||||||
/// to better suit the workload.
|
/// to better suit the workload.
|
||||||
pub fn collect_garbage(&self) {
|
pub fn collect_garbage(&self) {
|
||||||
// thresholds for adding and removing verifier threads
|
|
||||||
// these are unbalanced since having all blocks verified
|
|
||||||
// is the desirable position.
|
|
||||||
const ADD_THREAD_THRESHOLD: usize = 10;
|
|
||||||
const DEL_THREAD_THRESHOLD: usize = 20;
|
|
||||||
|
|
||||||
// number of ticks to average queue stats over
|
// number of ticks to average queue stats over
|
||||||
// when deciding whether to change the number of verifiers.
|
// when deciding whether to change the number of verifiers.
|
||||||
const SAMPLE_SIZE: usize = 5;
|
const READJUSTMENT_PERIOD: usize = 5;
|
||||||
|
|
||||||
let (u_len, v_len) = {
|
|
||||||
let u_len = {
|
|
||||||
let mut v = self.verification.unverified.lock();
|
|
||||||
v.shrink_to_fit();
|
|
||||||
v.len()
|
|
||||||
};
|
|
||||||
|
|
||||||
|
{
|
||||||
|
self.verification.unverified.lock().shrink_to_fit();
|
||||||
self.verification.verifying.lock().shrink_to_fit();
|
self.verification.verifying.lock().shrink_to_fit();
|
||||||
|
self.verification.verified.lock().shrink_to_fit();
|
||||||
|
}
|
||||||
|
|
||||||
let v_len = {
|
|
||||||
let mut v = self.verification.verified.lock();
|
|
||||||
v.shrink_to_fit();
|
|
||||||
v.len()
|
|
||||||
};
|
|
||||||
|
|
||||||
(u_len, v_len)
|
|
||||||
};
|
|
||||||
self.processing.write().shrink_to_fit();
|
self.processing.write().shrink_to_fit();
|
||||||
|
|
||||||
let (u_len, v_len) = {
|
if self.ticks_since_adjustment.load(AtomicOrdering::SeqCst) == READJUSTMENT_PERIOD {
|
||||||
let mut sample = self.rolling_sample.lock();
|
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
|
||||||
sample.push_back((u_len, v_len));
|
} else {
|
||||||
|
self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if sample.len() > SAMPLE_SIZE {
|
let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire);
|
||||||
let _ = sample.pop_front();
|
let drained = self.verification.drained.load(AtomicOrdering::Acquire);
|
||||||
}
|
|
||||||
|
|
||||||
sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i))
|
self.verification.verified_count.store(0, AtomicOrdering::Release);
|
||||||
|
self.verification.drained.store(0, AtomicOrdering::Release);
|
||||||
|
|
||||||
|
// compute the average rate of verification per thread and determine
|
||||||
|
// how many are necessary to match the rate of draining.
|
||||||
|
let num_verifiers = self.verifiers.lock().len();
|
||||||
|
let v_count_per = v_count as f64 / num_verifiers as f64;
|
||||||
|
let needed = if v_count < 20 {
|
||||||
|
1
|
||||||
|
} else {
|
||||||
|
(drained as f64 / v_count_per as f64).ceil() as usize
|
||||||
};
|
};
|
||||||
|
|
||||||
// more than 10x as many unverified as verified.
|
trace!(target: "verification", "v_rate_per={}, drained={}, scaling to {} verifiers",
|
||||||
if v_len * ADD_THREAD_THRESHOLD < u_len {
|
v_count_per, drained, needed);
|
||||||
|
|
||||||
|
for _ in num_verifiers..needed {
|
||||||
self.add_verifier();
|
self.add_verifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
// more than 20x as many verified as unverified.
|
for _ in needed..num_verifiers {
|
||||||
if u_len * DEL_THREAD_THRESHOLD < v_len {
|
|
||||||
self.remove_verifier();
|
self.remove_verifier();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user