reuse add_verifier instrumentation, rolling sample of 5 ticks
This commit is contained in:
parent
5e382602dd
commit
2d28c703d6
@ -113,6 +113,7 @@ pub struct VerificationQueue<K: Kind> {
|
|||||||
ready_signal: Arc<QueueSignal>,
|
ready_signal: Arc<QueueSignal>,
|
||||||
empty: Arc<SCondvar>,
|
empty: Arc<SCondvar>,
|
||||||
processing: RwLock<HashSet<H256>>,
|
processing: RwLock<HashSet<H256>>,
|
||||||
|
rolling_sample: Mutex<VecDeque<(usize, usize)>>,
|
||||||
max_queue_size: usize,
|
max_queue_size: usize,
|
||||||
max_mem_use: usize,
|
max_mem_use: usize,
|
||||||
}
|
}
|
||||||
@ -175,41 +176,27 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
let empty = Arc::new(SCondvar::new());
|
let empty = Arc::new(SCondvar::new());
|
||||||
let panic_handler = PanicHandler::new_in_arc();
|
let panic_handler = PanicHandler::new_in_arc();
|
||||||
|
|
||||||
let mut verifiers: Vec<VerifierHandle> = Vec::new();
|
let queue = VerificationQueue {
|
||||||
let thread_count = max(::num_cpus::get(), 3) - 2;
|
|
||||||
for i in 0..thread_count {
|
|
||||||
let verification = verification.clone();
|
|
||||||
let engine = engine.clone();
|
|
||||||
let more_to_verify = more_to_verify.clone();
|
|
||||||
let ready_signal = ready_signal.clone();
|
|
||||||
let empty = empty.clone();
|
|
||||||
let deleting = Arc::new(AtomicBool::new(false));
|
|
||||||
let panic_handler = panic_handler.clone();
|
|
||||||
verifiers.push(VerifierHandle {
|
|
||||||
deleting: deleting.clone(),
|
|
||||||
thread: thread::Builder::new()
|
|
||||||
.name(format!("Verifier #{}", i))
|
|
||||||
.spawn(move || {
|
|
||||||
panic_handler.catch_panic(move || {
|
|
||||||
VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)
|
|
||||||
}).unwrap()
|
|
||||||
})
|
|
||||||
.expect("Error starting block verification thread")
|
|
||||||
});
|
|
||||||
}
|
|
||||||
VerificationQueue {
|
|
||||||
engine: engine,
|
engine: engine,
|
||||||
panic_handler: panic_handler,
|
panic_handler: panic_handler,
|
||||||
ready_signal: ready_signal,
|
ready_signal: ready_signal,
|
||||||
more_to_verify: more_to_verify,
|
more_to_verify: more_to_verify,
|
||||||
verification: verification,
|
verification: verification,
|
||||||
verifiers: Mutex::new(verifiers),
|
verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())),
|
||||||
deleting: deleting,
|
deleting: deleting,
|
||||||
processing: RwLock::new(HashSet::new()),
|
processing: RwLock::new(HashSet::new()),
|
||||||
empty: empty,
|
empty: empty,
|
||||||
|
rolling_sample: Mutex::new(VecDeque::new()),
|
||||||
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
||||||
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
||||||
|
};
|
||||||
|
|
||||||
|
let thread_count = max(::num_cpus::get(), 3) - 2;
|
||||||
|
for _ in 0..thread_count {
|
||||||
|
queue.add_verifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queue
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
|
fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
|
||||||
@ -448,7 +435,10 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
const ADD_THREAD_THRESHOLD: usize = 10;
|
const ADD_THREAD_THRESHOLD: usize = 10;
|
||||||
const DEL_THREAD_THRESHOLD: usize = 20;
|
const DEL_THREAD_THRESHOLD: usize = 20;
|
||||||
|
|
||||||
// TODO: sample over 5 or so ticks.
|
// number of ticks to average queue stats over
|
||||||
|
// when deciding whether to change the number of verifiers.
|
||||||
|
const SAMPLE_SIZE: usize = 5;
|
||||||
|
|
||||||
let (u_len, v_len) = {
|
let (u_len, v_len) = {
|
||||||
let u_len = {
|
let u_len = {
|
||||||
let mut v = self.verification.unverified.lock();
|
let mut v = self.verification.unverified.lock();
|
||||||
@ -468,6 +458,17 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
};
|
};
|
||||||
self.processing.write().shrink_to_fit();
|
self.processing.write().shrink_to_fit();
|
||||||
|
|
||||||
|
let (u_len, v_len) = {
|
||||||
|
let mut sample = self.rolling_sample.lock();
|
||||||
|
sample.push_back((u_len, v_len));
|
||||||
|
|
||||||
|
if sample.len() > SAMPLE_SIZE {
|
||||||
|
let _ = sample.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i))
|
||||||
|
};
|
||||||
|
|
||||||
// more than 10x as many unverified as verified.
|
// more than 10x as many unverified as verified.
|
||||||
if v_len * ADD_THREAD_THRESHOLD < u_len {
|
if v_len * ADD_THREAD_THRESHOLD < u_len {
|
||||||
self.add_verifier();
|
self.add_verifier();
|
||||||
@ -520,7 +521,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(target: "verification", "Removing verification thread #{}", len + 1);
|
debug!(target: "verification", "Removing verification thread #{}", len - 1);
|
||||||
|
|
||||||
if let Some(handle) = verifiers.pop() {
|
if let Some(handle) = verifiers.pop() {
|
||||||
handle.conclude();
|
handle.conclude();
|
||||||
|
Loading…
Reference in New Issue
Block a user