auto-adjust number of verification threads

This commit is contained in:
Robert Habermeier 2016-10-03 19:41:00 +02:00
parent e1d3b3fff8
commit 2d907f3322

View File

@ -61,6 +61,24 @@ impl Default for Config {
} }
} }
// Bookkeeping for one spawned verifier thread: the stop flag shared with the
// thread, plus the handle needed to wait for it to finish.
struct VerifierHandle {
    // set to `true` to ask the thread to stop.
    deleting: Arc<AtomicBool>,
    // join handle of the spawned thread.
    thread: JoinHandle<()>,
}

impl VerifierHandle {
    // Ask the verifier thread to wind down by raising its stop flag
    // (stored with `Release` ordering). Does not block and does not wake
    // the thread; the caller is expected to do that separately.
    fn conclude(&self) {
        let stop_flag: &AtomicBool = &self.deleting;
        stop_flag.store(true, AtomicOrdering::Release);
    }

    // Consume the handle and block until the verifier thread has exited.
    // Panics if the thread itself terminated with a panic.
    fn join(self) {
        let VerifierHandle { thread, .. } = self;
        thread.join().unwrap();
    }
}
/// An item which is in the process of being verified. /// An item which is in the process of being verified.
pub struct Verifying<K: Kind> { pub struct Verifying<K: Kind> {
hash: H256, hash: H256,
@ -90,7 +108,7 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>, engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>, verifiers: Mutex<Vec<VerifierHandle>>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
@ -157,7 +175,7 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<VerifierHandle> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count { for i in 0..thread_count {
let verification = verification.clone(); let verification = verification.clone();
@ -165,10 +183,11 @@ impl<K: Kind> VerificationQueue<K> {
let more_to_verify = more_to_verify.clone(); let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone(); let ready_signal = ready_signal.clone();
let empty = empty.clone(); let empty = empty.clone();
let deleting = deleting.clone(); let deleting = Arc::new(AtomicBool::new(false));
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
verifiers.push( verifiers.push(VerifierHandle {
thread::Builder::new() deleting: deleting.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i)) .name(format!("Verifier #{}", i))
.spawn(move || { .spawn(move || {
panic_handler.catch_panic(move || { panic_handler.catch_panic(move || {
@ -176,18 +195,18 @@ impl<K: Kind> VerificationQueue<K> {
}).unwrap() }).unwrap()
}) })
.expect("Error starting block verification thread") .expect("Error starting block verification thread")
); });
} }
VerificationQueue { VerificationQueue {
engine: engine, engine: engine,
panic_handler: panic_handler, panic_handler: panic_handler,
ready_signal: ready_signal.clone(), ready_signal: ready_signal,
more_to_verify: more_to_verify.clone(), more_to_verify: more_to_verify,
verification: verification.clone(), verification: verification,
verifiers: verifiers, verifiers: Mutex::new(verifiers),
deleting: deleting.clone(), deleting: deleting,
processing: RwLock::new(HashSet::new()), processing: RwLock::new(HashSet::new()),
empty: empty.clone(), empty: empty,
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
} }
@ -420,14 +439,93 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
/// Optimise memory footprint of the heap fields. /// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
{ // thresholds for adding and removing verifier threads
self.verification.unverified.lock().shrink_to_fit(); // these are unbalanced since having all blocks verified
// is the desirable position.
const ADD_THREAD_THRESHOLD: usize = 10;
const DEL_THREAD_THRESHOLD: usize = 20;
// TODO: sample over 5 or so ticks.
let (u_len, v_len) = {
let u_len = {
let mut v = self.verification.unverified.lock();
v.shrink_to_fit();
v.len()
};
self.verification.verifying.lock().shrink_to_fit(); self.verification.verifying.lock().shrink_to_fit();
self.verification.verified.lock().shrink_to_fit();
} let v_len = {
let mut v = self.verification.verified.lock();
v.shrink_to_fit();
v.len()
};
(u_len, v_len)
};
self.processing.write().shrink_to_fit(); self.processing.write().shrink_to_fit();
// more than 10x as many unverified as verified.
if v_len * ADD_THREAD_THRESHOLD < u_len {
self.add_verifier();
}
// more than 20x as many verified as unverified.
if u_len * DEL_THREAD_THRESHOLD < v_len {
self.remove_verifier();
}
}
// add a verifier thread if possible.
//
// The pool is capped at the number of CPUs reported by `num_cpus::get()`.
// The new thread gets its own stop flag, so it can later be concluded
// individually without affecting the other verifiers.
//
// Panics if the OS refuses to spawn the thread.
fn add_verifier(&self) {
    let mut verifiers = self.verifiers.lock();
    let len = verifiers.len();

    // `>=` rather than `==`: the CPU count is queried afresh on every call.
    // If it ever reports fewer CPUs than there are verifiers, an equality
    // test would never match again and the pool would keep growing.
    if len >= ::num_cpus::get() {
        return;
    }

    debug!(target: "verification", "Adding verification thread #{}", len);

    // per-thread stop flag; one clone is kept in the handle, the other
    // moves into the thread.
    let deleting = Arc::new(AtomicBool::new(false));
    let panic_handler = self.panic_handler.clone();
    let verification = self.verification.clone();
    let engine = self.engine.clone();
    let wait = self.more_to_verify.clone();
    let ready = self.ready_signal.clone();
    let empty = self.empty.clone();

    verifiers.push(VerifierHandle {
        deleting: deleting.clone(),
        thread: thread::Builder::new()
            // threads are numbered by their index in `verifiers`.
            .name(format!("Verifier #{}", len))
            .spawn(move || {
                panic_handler.catch_panic(move || {
                    VerificationQueue::verify(verification, engine, wait, ready, deleting, empty)
                }).unwrap()
            })
            .expect("Failed to create verifier thread.")
    });
}
// remove a verifier thread if possible.
fn remove_verifier(&self) {
let mut verifiers = self.verifiers.lock();
let len = verifiers.len();
if len == 1 {
return;
}
debug!(target: "verification", "Removing verification thread #{}", len);
if let Some(handle) = verifiers.pop() {
handle.conclude();
self.more_to_verify.notify_all(); // to ensure it's joinable immediately.
handle.join();
}
} }
} }
@ -442,10 +540,22 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing..."); trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) { let mut verifiers = self.verifiers.lock();
t.join().unwrap();
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
} }
self.more_to_verify.notify_all();
// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
}
trace!(target: "shutdown", "[VerificationQueue] Closed."); trace!(target: "shutdown", "[VerificationQueue] Closed.");
} }
} }