allocate verifiers up front, hibernate when not needed
This commit is contained in:
parent
44dcd6bc3b
commit
546cd00659
@ -35,6 +35,9 @@ pub mod kind;
|
|||||||
const MIN_MEM_LIMIT: usize = 16384;
|
const MIN_MEM_LIMIT: usize = 16384;
|
||||||
const MIN_QUEUE_LIMIT: usize = 512;
|
const MIN_QUEUE_LIMIT: usize = 512;
|
||||||
|
|
||||||
|
// maximum possible number of verification threads.
|
||||||
|
const MAX_VERIFIERS: usize = 8;
|
||||||
|
|
||||||
/// Type alias for block queue convenience.
|
/// Type alias for block queue convenience.
|
||||||
pub type BlockQueue = VerificationQueue<self::kind::Blocks>;
|
pub type BlockQueue = VerificationQueue<self::kind::Blocks>;
|
||||||
|
|
||||||
@ -63,13 +66,26 @@ impl Default for Config {
|
|||||||
|
|
||||||
struct VerifierHandle {
|
struct VerifierHandle {
|
||||||
deleting: Arc<AtomicBool>,
|
deleting: Arc<AtomicBool>,
|
||||||
|
sleep: Arc<(Mutex<bool>, Condvar)>,
|
||||||
thread: JoinHandle<()>,
|
thread: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VerifierHandle {
|
impl VerifierHandle {
|
||||||
|
// signal to the verifier thread that it should sleep.
|
||||||
|
fn sleep(&self) {
|
||||||
|
*self.sleep.0.lock() = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// signal to the verifier thread that it should wake up.
|
||||||
|
fn wake_up(&self) {
|
||||||
|
*self.sleep.0.lock() = false;
|
||||||
|
self.sleep.1.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
// signal to the verifier thread that it should conclude its
|
// signal to the verifier thread that it should conclude its
|
||||||
// operations.
|
// operations.
|
||||||
fn conclude(&self) {
|
fn conclude(&self) {
|
||||||
|
self.wake_up();
|
||||||
self.deleting.store(true, AtomicOrdering::Release);
|
self.deleting.store(true, AtomicOrdering::Release);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,7 +131,7 @@ pub struct VerificationQueue<K: Kind> {
|
|||||||
engine: Arc<Engine>,
|
engine: Arc<Engine>,
|
||||||
more_to_verify: Arc<SCondvar>,
|
more_to_verify: Arc<SCondvar>,
|
||||||
verification: Arc<Verification<K>>,
|
verification: Arc<Verification<K>>,
|
||||||
verifiers: Mutex<Vec<VerifierHandle>>,
|
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
|
||||||
deleting: Arc<AtomicBool>,
|
deleting: Arc<AtomicBool>,
|
||||||
ready_signal: Arc<QueueSignal>,
|
ready_signal: Arc<QueueSignal>,
|
||||||
empty: Arc<SCondvar>,
|
empty: Arc<SCondvar>,
|
||||||
@ -212,31 +228,83 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
let empty = Arc::new(SCondvar::new());
|
let empty = Arc::new(SCondvar::new());
|
||||||
let panic_handler = PanicHandler::new_in_arc();
|
let panic_handler = PanicHandler::new_in_arc();
|
||||||
|
|
||||||
let queue = VerificationQueue {
|
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
|
||||||
|
let default_amount = max(::num_cpus::get(), 3) - 2;
|
||||||
|
let mut verifiers = Vec::with_capacity(max_verifiers);
|
||||||
|
|
||||||
|
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
|
||||||
|
|
||||||
|
for i in 0..max_verifiers {
|
||||||
|
debug!(target: "verification", "Adding verification thread #{}", i);
|
||||||
|
|
||||||
|
let deleting = deleting.clone();
|
||||||
|
let panic_handler = panic_handler.clone();
|
||||||
|
let verification = verification.clone();
|
||||||
|
let engine = engine.clone();
|
||||||
|
let wait = more_to_verify.clone();
|
||||||
|
let ready = ready_signal.clone();
|
||||||
|
let empty = empty.clone();
|
||||||
|
|
||||||
|
// enable only the first few verifiers.
|
||||||
|
let sleep = if i < default_amount {
|
||||||
|
Arc::new((Mutex::new(false), Condvar::new()))
|
||||||
|
} else {
|
||||||
|
Arc::new((Mutex::new(true), Condvar::new()))
|
||||||
|
};
|
||||||
|
|
||||||
|
verifiers.push(VerifierHandle {
|
||||||
|
deleting: deleting.clone(),
|
||||||
|
sleep: sleep.clone(),
|
||||||
|
thread: thread::Builder::new()
|
||||||
|
.name(format!("Verifier #{}", i))
|
||||||
|
.spawn(move || {
|
||||||
|
panic_handler.catch_panic(move || {
|
||||||
|
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
|
||||||
|
}).unwrap()
|
||||||
|
})
|
||||||
|
.expect("Failed to create verifier thread.")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
VerificationQueue {
|
||||||
engine: engine,
|
engine: engine,
|
||||||
panic_handler: panic_handler,
|
panic_handler: panic_handler,
|
||||||
ready_signal: ready_signal,
|
ready_signal: ready_signal,
|
||||||
more_to_verify: more_to_verify,
|
more_to_verify: more_to_verify,
|
||||||
verification: verification,
|
verification: verification,
|
||||||
verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())),
|
verifiers: Mutex::new((verifiers, default_amount)),
|
||||||
deleting: deleting,
|
deleting: deleting,
|
||||||
processing: RwLock::new(HashSet::new()),
|
processing: RwLock::new(HashSet::new()),
|
||||||
empty: empty,
|
empty: empty,
|
||||||
ticks_since_adjustment: AtomicUsize::new(0),
|
ticks_since_adjustment: AtomicUsize::new(0),
|
||||||
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
||||||
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
||||||
};
|
|
||||||
|
|
||||||
let thread_count = max(::num_cpus::get(), 3) - 2;
|
|
||||||
for _ in 0..thread_count {
|
|
||||||
queue.add_verifier();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
queue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
|
fn verify(
|
||||||
|
verification: Arc<Verification<K>>,
|
||||||
|
engine: Arc<Engine>,
|
||||||
|
wait: Arc<SCondvar>,
|
||||||
|
ready: Arc<QueueSignal>,
|
||||||
|
deleting: Arc<AtomicBool>,
|
||||||
|
empty: Arc<SCondvar>,
|
||||||
|
sleep: Arc<(Mutex<bool>, Condvar)>,
|
||||||
|
) {
|
||||||
while !deleting.load(AtomicOrdering::Acquire) {
|
while !deleting.load(AtomicOrdering::Acquire) {
|
||||||
|
{
|
||||||
|
let mut should_sleep = sleep.0.lock();
|
||||||
|
while *should_sleep {
|
||||||
|
trace!(target: "verification", "Verifier sleeping");
|
||||||
|
sleep.1.wait(&mut should_sleep);
|
||||||
|
trace!(target: "verification", "Verifier waking up");
|
||||||
|
|
||||||
|
if deleting.load(AtomicOrdering::Acquire) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
|
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
|
||||||
|
|
||||||
@ -548,7 +616,7 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
|
|
||||||
// compute the average rate of verification per thread and determine
|
// compute the average rate of verification per thread and determine
|
||||||
// how many are necessary to match the rate of draining.
|
// how many are necessary to match the rate of draining.
|
||||||
let num_verifiers = self.verifiers.lock().len();
|
let num_verifiers = self.verifiers.lock().1;
|
||||||
let v_count_per = v_count as f64 / num_verifiers as f64;
|
let v_count_per = v_count as f64 / num_verifiers as f64;
|
||||||
let needed = if v_count < 20 {
|
let needed = if v_count < 20 {
|
||||||
1
|
1
|
||||||
@ -559,63 +627,34 @@ impl<K: Kind> VerificationQueue<K> {
|
|||||||
trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers",
|
trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers",
|
||||||
v_count_per, target, needed);
|
v_count_per, target, needed);
|
||||||
|
|
||||||
for _ in num_verifiers..needed {
|
self.scale_verifiers(needed);
|
||||||
self.add_verifier();
|
|
||||||
}
|
|
||||||
|
|
||||||
for _ in needed..num_verifiers {
|
|
||||||
self.remove_verifier();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// add a verifier thread if possible.
|
// wake up or sleep verifiers to get as close to the target as
|
||||||
fn add_verifier(&self) {
|
// possible, never going over the amount of initially allocated threads
|
||||||
|
// or below 1.
|
||||||
|
fn scale_verifiers(&self, target: usize) {
|
||||||
let mut verifiers = self.verifiers.lock();
|
let mut verifiers = self.verifiers.lock();
|
||||||
let len = verifiers.len();
|
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
|
||||||
if len == ::num_cpus::get() {
|
|
||||||
return;
|
let target = min(verifiers.capacity(), target);
|
||||||
|
let target = max(1, target);
|
||||||
|
|
||||||
|
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
|
||||||
|
|
||||||
|
// scaling up
|
||||||
|
for i in *verifier_count..target {
|
||||||
|
debug!(target: "verification", "Waking up verifier {}", i);
|
||||||
|
verifiers[i].wake_up();
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(target: "verification", "Adding verification thread #{}", len);
|
// scaling down.
|
||||||
|
for i in target..*verifier_count {
|
||||||
let deleting = Arc::new(AtomicBool::new(false));
|
debug!(target: "verification", "Putting verifier {} to sleep", i);
|
||||||
let panic_handler = self.panic_handler.clone();
|
verifiers[i].sleep();
|
||||||
let verification = self.verification.clone();
|
|
||||||
let engine = self.engine.clone();
|
|
||||||
let wait = self.more_to_verify.clone();
|
|
||||||
let ready = self.ready_signal.clone();
|
|
||||||
let empty = self.empty.clone();
|
|
||||||
|
|
||||||
verifiers.push(VerifierHandle {
|
|
||||||
deleting: deleting.clone(),
|
|
||||||
thread: thread::Builder::new()
|
|
||||||
.name(format!("Verifier #{}", len))
|
|
||||||
.spawn(move || {
|
|
||||||
panic_handler.catch_panic(move || {
|
|
||||||
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty)
|
|
||||||
}).unwrap()
|
|
||||||
})
|
|
||||||
.expect("Failed to create verifier thread.")
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove a verifier thread if possible.
|
|
||||||
fn remove_verifier(&self) {
|
|
||||||
let mut verifiers = self.verifiers.lock();
|
|
||||||
let len = verifiers.len();
|
|
||||||
|
|
||||||
// never remove the last thread.
|
|
||||||
if len == 1 {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(target: "verification", "Removing verification thread #{}", len - 1);
|
*verifier_count = target;
|
||||||
|
|
||||||
if let Some(handle) = verifiers.pop() {
|
|
||||||
handle.conclude();
|
|
||||||
self.more_to_verify.notify_all(); // to ensure it's joinable immediately.
|
|
||||||
handle.join();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -631,7 +670,8 @@ impl<K: Kind> Drop for VerificationQueue<K> {
|
|||||||
self.clear();
|
self.clear();
|
||||||
self.deleting.store(true, AtomicOrdering::Release);
|
self.deleting.store(true, AtomicOrdering::Release);
|
||||||
|
|
||||||
let mut verifiers = self.verifiers.lock();
|
let mut verifiers = self.verifiers.get_mut();
|
||||||
|
let mut verifiers = &mut verifiers.0;
|
||||||
|
|
||||||
// first pass to signal conclusion. must be done before
|
// first pass to signal conclusion. must be done before
|
||||||
// notify or deadlock possible.
|
// notify or deadlock possible.
|
||||||
|
Loading…
Reference in New Issue
Block a user