ethcore/VerificationQueue don't spawn up extra `worker-threads` when explictly specified not to (#9620)

* VerificationQueue don't spawn up extra threads

In the verification queue we spawn up worker threads to do the work.
However, if `num-verifiers` is specified we still spawn the maximum
number of threads which consume extra memory.

There is one catch though when `--scale-verifiers` is specified then
we can't do it because all threads are created upon initilization AFAIK.

In my opinion, is doesn't to use both `num-verifiers` and
`scale-verifiers` they are kind of contradictory!

* Fix nits in logic and add tests for verification

* refactor(verification queue) - rm hardcoded const

* Address grumbles in new tests
* Remove hardcoded `MAX_VERIFIERS` constant and replace it by relying
entirely on `num_cpu` crate instead inorder to support CPUs that have
more cores/logical cores
This commit is contained in:
Niklas Adolfsson 2018-09-26 15:11:50 +01:00 committed by Afri Schoedon
parent cc963d42a0
commit 3216b143c2
1 changed files with 71 additions and 14 deletions

View File

@ -39,9 +39,6 @@ pub mod kind;
const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512;
// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;
/// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<self::kind::Blocks>;
@ -85,7 +82,7 @@ impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
num_verifiers: ::num_cpus::get(),
}
}
}
@ -231,16 +228,24 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(Condvar::new());
let scale_verifiers = config.verifier_settings.scale_verifiers;
let num_cpus = ::num_cpus::get();
let max_verifiers = cmp::min(num_cpus, MAX_VERIFIERS);
let max_verifiers = ::num_cpus::get();
let default_amount = cmp::max(1, cmp::min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
// if `auto-scaling` is enabled spawn up extra threads as they might be needed
// otherwise just spawn the number of threads specified by the config
let number_of_threads = if scale_verifiers {
max_verifiers
} else {
cmp::min(default_amount, max_verifiers)
};
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(number_of_threads);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", number_of_threads, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
for i in 0..max_verifiers {
for i in 0..number_of_threads {
debug!(target: "verification", "Adding verification thread #{}", i);
let verification = verification.clone();
@ -743,6 +748,13 @@ mod tests {
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
}
fn get_test_config(num_verifiers: usize, is_auto_scale: bool) -> Config {
let mut config = Config::default();
config.verifier_settings.num_verifiers = num_verifiers;
config.verifier_settings.scale_verifiers = is_auto_scale;
config
}
fn new_unverified(bytes: Bytes) -> Unverified {
Unverified::from_rlp(bytes).expect("Should be valid rlp")
}
@ -843,12 +855,11 @@ mod tests {
#[test]
fn scaling_limits() {
use super::MAX_VERIFIERS;
let max_verifiers = ::num_cpus::get();
let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1);
queue.scale_verifiers(max_verifiers + 1);
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < max_verifiers + 1);
queue.scale_verifiers(0);
@ -877,4 +888,50 @@ mod tests {
queue.collect_garbage();
assert_eq!(queue.num_verifiers(), 1);
}
#[test]
fn worker_threads_honor_specified_number_without_scaling() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(1, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
assert_eq!(queue.num_verifiers(), 1);
}
#[test]
fn worker_threads_specified_to_zero_should_set_to_one() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(0, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
assert_eq!(queue.num_verifiers(), 1);
}
#[test]
fn worker_threads_should_only_accept_max_number_cpus() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(10_000, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
let num_cpus = ::num_cpus::get();
assert_eq!(queue.num_verifiers(), num_cpus);
}
#[test]
fn worker_threads_scaling_with_specifed_num_of_workers() {
let num_cpus = ::num_cpus::get();
// only run the test with at least 2 CPUs
if num_cpus > 1 {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(num_cpus - 1, true);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
queue.scale_verifiers(num_cpus);
assert_eq!(queue.num_verifiers(), num_cpus);
}
}
}