Merge branch 'master' into opt-in-verifier-scaling

This commit is contained in:
Robert Habermeier
2016-12-06 15:37:10 +01:00
208 changed files with 2157 additions and 2253 deletions

View File

@@ -17,7 +17,7 @@
//! A queue of blocks. Sits between network or other I/O and the `BlockChain`.
//! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self};
use std::thread::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*;
@@ -86,35 +86,11 @@ impl Default for VerifierSettings {
}
}
struct VerifierHandle {
deleting: Arc<AtomicBool>,
sleep: Arc<AtomicBool>,
thread: JoinHandle<()>,
}
impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
}
// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}
// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}
// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
// pool states
enum State {
// all threads with id < inner value are to work.
Work(usize),
Exit,
}
/// An item which is in the process of being verified.
@@ -153,7 +129,6 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>,
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>,
@@ -162,6 +137,8 @@ pub struct VerificationQueue<K: Kind> {
max_queue_size: usize,
max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
}
struct QueueSignal {
@@ -248,8 +225,9 @@ impl<K: Kind> VerificationQueue<K> {
let num_cpus = ::num_cpus::get();
let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let mut verifiers = Vec::with_capacity(max_verifiers);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
@@ -257,33 +235,31 @@ impl<K: Kind> VerificationQueue<K> {
for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i);
let deleting = deleting.clone();
let panic_handler = panic_handler.clone();
let verification = verification.clone();
let engine = engine.clone();
let wait = more_to_verify.clone();
let ready = ready_signal.clone();
let empty = empty.clone();
let state = state.clone();
// enable only the first few verifiers.
let sleep = if i < default_amount {
Arc::new(AtomicBool::new(false))
} else {
Arc::new(AtomicBool::new(true))
};
verifiers.push(VerifierHandle {
deleting: deleting.clone(),
sleep: sleep.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
}).unwrap()
})
.expect("Failed to create verifier thread.")
});
let handle = thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(
verification,
engine,
wait,
ready,
empty,
state,
i,
)
}).unwrap()
})
.expect("Failed to create verifier thread.");
verifier_handles.push(handle);
}
VerificationQueue {
@@ -292,7 +268,6 @@ impl<K: Kind> VerificationQueue<K> {
ready_signal: ready_signal,
more_to_verify: more_to_verify,
verification: verification,
verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting,
processing: RwLock::new(HashSet::new()),
empty: empty,
@@ -300,6 +275,8 @@ impl<K: Kind> VerificationQueue<K> {
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
verifier_handles: verifier_handles,
state: state,
}
}
@@ -308,23 +285,30 @@ impl<K: Kind> VerificationQueue<K> {
engine: Arc<Engine>,
wait: Arc<SCondvar>,
ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>,
state: Arc<(Mutex<State>, Condvar)>,
id: usize,
) {
while !deleting.load(AtomicOrdering::Acquire) {
loop {
// check current state.
{
while sleep.load(AtomicOrdering::SeqCst) {
trace!(target: "verification", "Verifier sleeping");
::std::thread::park();
trace!(target: "verification", "Verifier waking up");
let mut cur_state = state.0.lock();
while let State::Work(x) = *cur_state {
// sleep until this thread is required.
if id < x { break }
if deleting.load(AtomicOrdering::Acquire) {
return;
}
debug!(target: "verification", "verifier {} sleeping", id);
state.1.wait(&mut cur_state);
debug!(target: "verification", "verifier {} waking up", id);
}
if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id);
break;
}
}
// wait for work if empty.
{
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
@@ -332,15 +316,22 @@ impl<K: Kind> VerificationQueue<K> {
empty.notify_all();
}
while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
while verification.unverified.lock().is_empty() {
if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return;
}
more_to_verify = wait.wait(more_to_verify).unwrap();
}
if deleting.load(AtomicOrdering::Acquire) {
if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return;
}
}
// do work.
let item = {
// acquire these locks before getting the item to verify.
let mut unverified = verification.unverified.lock();
@@ -595,6 +586,14 @@ impl<K: Kind> VerificationQueue<K> {
}
}
/// Get the current number of working verifiers.
pub fn num_verifiers(&self) -> usize {
match *self.state.0.lock() {
State::Work(x) => x,
State::Exit => panic!("state only set to exit on drop; queue live now; qed"),
}
}
/// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) {
@@ -633,7 +632,7 @@ impl<K: Kind> VerificationQueue<K> {
return;
}
let current = self.verifiers.lock().1;
let current = self.num_verifiers();
let diff = (v_len - u_len).abs();
let total = v_len + u_len;
@@ -655,27 +654,14 @@ impl<K: Kind> VerificationQueue<K> {
// possible, never going over the amount of initially allocated threads
// or below 1.
fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
let target = min(verifiers.len(), target);
let current = self.num_verifiers();
let target = min(self.verifier_handles.len(), target);
let target = max(1, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", current, target);
// scaling up
for i in *verifier_count..target {
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}
// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}
*verifier_count = target;
*self.state.0.lock() = State::Work(target);
self.state.1.notify_all();
}
}
@@ -689,22 +675,18 @@ impl<K: Kind> Drop for VerificationQueue<K> {
fn drop(&mut self) {
trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear();
self.deleting.store(true, AtomicOrdering::Release);
self.deleting.store(true, AtomicOrdering::SeqCst);
let mut verifiers = self.verifiers.get_mut();
let mut verifiers = &mut verifiers.0;
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
}
// set exit state; should be done before `more_to_verify` notification.
*self.state.0.lock() = State::Exit;
self.state.1.notify_all();
// wake up all threads waiting for more work.
self.more_to_verify.notify_all();
// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
// wait for all verifier threads to join.
for thread in self.verifier_handles.drain(..) {
thread.join().expect("Propagating verifier thread panic on shutdown");
}
trace!(target: "shutdown", "[VerificationQueue] Closed.");
@@ -716,7 +698,7 @@ mod tests {
use util::*;
use io::*;
use spec::*;
use super::{BlockQueue, Config};
use super::{BlockQueue, Config, State};
use super::kind::blocks::Unverified;
use tests::helpers::*;
use error::*;
@@ -818,11 +800,11 @@ mod tests {
let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
queue.scale_verifiers(0);
assert!(queue.verifiers.lock().1 == 1);
assert!(queue.num_verifiers() == 1);
}
#[test]
@@ -831,16 +813,7 @@ mod tests {
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
let num_verifiers = {
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}
verifiers.1
};
queue.scale_verifiers(num_verifiers - 1);
*queue.state.0.lock() = State::Work(0);
for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
@@ -848,20 +821,12 @@ mod tests {
// almost all unverified == bump verifier count.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers);
// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}
assert_eq!(queue.num_verifiers(), 1);
queue.flush();
// nothing to verify == use minimum number of verifiers.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1);
assert_eq!(queue.num_verifiers(), 1);
}
}