Merge pull request #2445 from ethcore/adaptive_queue_threads

Use an adaptive number of threads in the verification queue
This commit is contained in:
Gav Wood 2016-11-22 17:39:11 +01:00 committed by GitHub
commit 03d3e585af

View File

@ -35,6 +35,9 @@ pub mod kind;
const MIN_MEM_LIMIT: usize = 16384; const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512; const MIN_QUEUE_LIMIT: usize = 512;
// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;
/// Type alias for block queue convenience. /// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<self::kind::Blocks>; pub type BlockQueue = VerificationQueue<self::kind::Blocks>;
@ -61,6 +64,37 @@ impl Default for Config {
} }
} }
struct VerifierHandle {
deleting: Arc<AtomicBool>,
sleep: Arc<AtomicBool>,
thread: JoinHandle<()>,
}
impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
}
// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}
// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}
// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
}
/// An item which is in the process of being verified. /// An item which is in the process of being verified.
pub struct Verifying<K: Kind> { pub struct Verifying<K: Kind> {
hash: H256, hash: H256,
@ -97,11 +131,12 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>, engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>, verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>, processing: RwLock<HashSet<H256>>,
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
} }
@ -157,6 +192,7 @@ struct Verification<K: Kind> {
more_to_verify: SMutex<()>, more_to_verify: SMutex<()>,
empty: SMutex<()>, empty: SMutex<()>,
sizes: Sizes, sizes: Sizes,
check_seal: bool,
} }
impl<K: Kind> VerificationQueue<K> { impl<K: Kind> VerificationQueue<K> {
@ -173,7 +209,8 @@ impl<K: Kind> VerificationQueue<K> {
unverified: AtomicUsize::new(0), unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0), verifying: AtomicUsize::new(0),
verified: AtomicUsize::new(0), verified: AtomicUsize::new(0),
} },
check_seal: check_seal,
}); });
let more_to_verify = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
@ -185,44 +222,82 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let thread_count = max(::num_cpus::get(), 3) - 2; let default_amount = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count { let mut verifiers = Vec::with_capacity(max_verifiers);
let verification = verification.clone();
let engine = engine.clone(); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone(); for i in 0..max_verifiers {
let empty = empty.clone(); debug!(target: "verification", "Adding verification thread #{}", i);
let deleting = deleting.clone(); let deleting = deleting.clone();
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
verifiers.push( let verification = verification.clone();
thread::Builder::new() let engine = engine.clone();
.name(format!("Verifier #{}", i)) let wait = more_to_verify.clone();
.spawn(move || { let ready = ready_signal.clone();
panic_handler.catch_panic(move || { let empty = empty.clone();
VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty, check_seal)
}).unwrap() // enable only the first few verifiers.
}) let sleep = if i < default_amount {
.expect("Error starting block verification thread") Arc::new(AtomicBool::new(false))
); } else {
Arc::new(AtomicBool::new(true))
};
verifiers.push(VerifierHandle {
deleting: deleting.clone(),
sleep: sleep.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
}).unwrap()
})
.expect("Failed to create verifier thread.")
});
} }
VerificationQueue { VerificationQueue {
engine: engine, engine: engine,
panic_handler: panic_handler, panic_handler: panic_handler,
ready_signal: ready_signal.clone(), ready_signal: ready_signal,
more_to_verify: more_to_verify.clone(), more_to_verify: more_to_verify,
verification: verification.clone(), verification: verification,
verifiers: verifiers, verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting.clone(), deleting: deleting,
processing: RwLock::new(HashSet::new()), processing: RwLock::new(HashSet::new()),
empty: empty.clone(), empty: empty,
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
} }
} }
fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>, check_seal: bool) { fn verify(
verification: Arc<Verification<K>>,
engine: Arc<Engine>,
wait: Arc<SCondvar>,
ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>,
) {
while !deleting.load(AtomicOrdering::Acquire) { while !deleting.load(AtomicOrdering::Acquire) {
{
while sleep.load(AtomicOrdering::SeqCst) {
trace!(target: "verification", "Verifier sleeping");
::std::thread::park();
trace!(target: "verification", "Verifier waking up");
if deleting.load(AtomicOrdering::Acquire) {
return;
}
}
}
{ {
let mut more_to_verify = verification.more_to_verify.lock().unwrap(); let mut more_to_verify = verification.more_to_verify.lock().unwrap();
@ -255,7 +330,7 @@ impl<K: Kind> VerificationQueue<K> {
}; };
let hash = item.hash(); let hash = item.hash();
let is_ready = match K::verify(item, &*engine, check_seal) { let is_ready = match K::verify(item, &*engine, verification.check_seal) {
Ok(verified) => { Ok(verified) => {
let mut verifying = verification.verifying.lock(); let mut verifying = verification.verifying.lock();
let mut idx = None; let mut idx = None;
@ -302,9 +377,15 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>, sizes: &Sizes) { fn drain_verifying(
verifying: &mut VecDeque<Verifying<K>>,
verified: &mut VecDeque<K::Verified>,
bad: &mut HashSet<H256>,
sizes: &Sizes,
) {
let mut removed_size = 0; let mut removed_size = 0;
let mut inserted_size = 0; let mut inserted_size = 0;
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
assert!(verifying.pop_front().is_some()); assert!(verifying.pop_front().is_some());
let size = output.heap_size_of_children(); let size = output.heap_size_of_children();
@ -487,14 +568,85 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
/// Optimise memory footprint of the heap fields. /// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
{ // number of ticks to average queue stats over
self.verification.unverified.lock().shrink_to_fit(); // when deciding whether to change the number of verifiers.
#[cfg(not(test))]
const READJUSTMENT_PERIOD: usize = 12;
#[cfg(test)]
const READJUSTMENT_PERIOD: usize = 1;
let (u_len, v_len) = {
let u_len = {
let mut q = self.verification.unverified.lock();
q.shrink_to_fit();
q.len()
};
self.verification.verifying.lock().shrink_to_fit(); self.verification.verifying.lock().shrink_to_fit();
self.verification.verified.lock().shrink_to_fit();
} let v_len = {
let mut q = self.verification.verified.lock();
q.shrink_to_fit();
q.len()
};
(u_len as isize, v_len as isize)
};
self.processing.write().shrink_to_fit(); self.processing.write().shrink_to_fit();
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else {
return;
}
let current = self.verifiers.lock().1;
let diff = (v_len - u_len).abs();
let total = v_len + u_len;
self.scale_verifiers(
if u_len < 20 {
1
} else if diff <= total / 10 {
current
} else if v_len > u_len {
current - 1
} else {
current + 1
}
);
}
// wake up or sleep verifiers to get as close to the target as
// possible, never going over the amount of initially allocated threads
// or below 1.
fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
let target = min(verifiers.len(), target);
let target = max(1, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
// scaling up
for i in *verifier_count..target {
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}
// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}
*verifier_count = target;
} }
} }
@ -509,10 +661,23 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing..."); trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) { let mut verifiers = self.verifiers.get_mut();
t.join().unwrap(); let mut verifiers = &mut verifiers.0;
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
} }
self.more_to_verify.notify_all();
// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
}
trace!(target: "shutdown", "[VerificationQueue] Closed."); trace!(target: "shutdown", "[VerificationQueue] Closed.");
} }
} }
@ -611,4 +776,56 @@ mod tests {
} }
assert!(queue.queue_info().is_full()); assert!(queue.queue_info().is_full());
} }
#[test]
fn scaling_limits() {
use super::MAX_VERIFIERS;
let queue = get_test_queue();
queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
queue.scale_verifiers(0);
assert!(queue.verifiers.lock().1 == 1);
}
#[test]
fn readjust_verifiers() {
let queue = get_test_queue();
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
let num_verifiers = {
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}
verifiers.1
};
for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
}
// almost all unverified == bump verifier count.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1);
// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}
queue.flush();
// nothing to verify == use minimum number of verifiers.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1);
}
} }