queue: CLI for auto-scaling and num verifiers

This commit is contained in:
Robert Habermeier 2016-12-02 18:21:54 +01:00
parent 3837114eb2
commit cd5b6fdf59
6 changed files with 92 additions and 14 deletions

View File

@ -53,6 +53,8 @@ pub struct Config {
/// Maximum heap memory to use. /// Maximum heap memory to use.
/// When the limit is reached, is_full returns true. /// When the limit is reached, is_full returns true.
pub max_mem_use: usize, pub max_mem_use: usize,
/// Settings for the number of verifiers and adaptation strategy.
pub verifier_settings: VerifierSettings,
} }
impl Default for Config { impl Default for Config {
@ -60,6 +62,26 @@ impl Default for Config {
Config { Config {
max_queue_size: 30000, max_queue_size: 30000,
max_mem_use: 50 * 1024 * 1024, max_mem_use: 50 * 1024 * 1024,
verifier_settings: VerifierSettings::default(),
}
}
}
/// Verifier settings.
#[derive(Debug, PartialEq, Clone)]
pub struct VerifierSettings {
/// Whether to scale amount of verifiers according to load.
// Todo: replace w/ strategy enum?
pub scale_verifiers: bool,
/// Beginning amount of verifiers.
pub num_verifiers: usize,
}
impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
} }
} }
} }
@ -139,6 +161,7 @@ pub struct VerificationQueue<K: Kind> {
ticks_since_adjustment: AtomicUsize, ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
scale_verifiers: bool,
} }
struct QueueSignal { struct QueueSignal {
@ -221,12 +244,15 @@ impl<K: Kind> VerificationQueue<K> {
}); });
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let scale_verifiers = config.verifier_settings.scale_verifiers;
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); let num_cpus = ::num_cpus::get();
let default_amount = max(::num_cpus::get(), 3) - 2; let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let mut verifiers = Vec::with_capacity(max_verifiers); let mut verifiers = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
for i in 0..max_verifiers { for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i); debug!(target: "verification", "Adding verification thread #{}", i);
@ -273,6 +299,7 @@ impl<K: Kind> VerificationQueue<K> {
ticks_since_adjustment: AtomicUsize::new(0), ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
} }
} }
@ -598,6 +625,8 @@ impl<K: Kind> VerificationQueue<K> {
self.processing.write().shrink_to_fit(); self.processing.write().shrink_to_fit();
if !self.scale_verifiers { return }
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD { if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else { } else {
@ -693,10 +722,15 @@ mod tests {
use error::*; use error::*;
use views::*; use views::*;
fn get_test_queue() -> BlockQueue { // create a test block queue.
// auto_scaling enables verifier adjustment.
fn get_test_queue(auto_scale: bool) -> BlockQueue {
let spec = get_test_spec(); let spec = get_test_spec();
let engine = spec.engine; let engine = spec.engine;
BlockQueue::new(Config::default(), engine, IoChannel::disconnected(), true)
let mut config = Config::default();
config.verifier_settings.scale_verifiers = auto_scale;
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
} }
#[test] #[test]
@ -709,7 +743,7 @@ mod tests {
#[test] #[test]
fn can_import_blocks() { fn can_import_blocks() {
let queue = get_test_queue(); let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e); panic!("error importing block that is valid by definition({:?})", e);
} }
@ -717,7 +751,7 @@ mod tests {
#[test] #[test]
fn returns_error_for_duplicates() { fn returns_error_for_duplicates() {
let queue = get_test_queue(); let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e); panic!("error importing block that is valid by definition({:?})", e);
} }
@ -736,7 +770,7 @@ mod tests {
#[test] #[test]
fn returns_ok_for_drained_duplicates() { fn returns_ok_for_drained_duplicates() {
let queue = get_test_queue(); let queue = get_test_queue(false);
let block = get_good_dummy_block(); let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone(); let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import(Unverified::new(block)) { if let Err(e) = queue.import(Unverified::new(block)) {
@ -753,7 +787,7 @@ mod tests {
#[test] #[test]
fn returns_empty_once_finished() { fn returns_empty_once_finished() {
let queue = get_test_queue(); let queue = get_test_queue(false);
queue.import(Unverified::new(get_good_dummy_block())) queue.import(Unverified::new(get_good_dummy_block()))
.expect("error importing block that is valid by definition"); .expect("error importing block that is valid by definition");
queue.flush(); queue.flush();
@ -781,7 +815,7 @@ mod tests {
fn scaling_limits() { fn scaling_limits() {
use super::MAX_VERIFIERS; use super::MAX_VERIFIERS;
let queue = get_test_queue(); let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1); queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1); assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
@ -793,7 +827,7 @@ mod tests {
#[test] #[test]
fn readjust_verifiers() { fn readjust_verifiers() {
let queue = get_test_queue(); let queue = get_test_queue(true);
// put all the verifiers to sleep to ensure // put all the verifiers to sleep to ensure
// the test isn't timing sensitive. // the test isn't timing sensitive.
@ -806,13 +840,15 @@ mod tests {
verifiers.1 verifiers.1
}; };
queue.scale_verifiers(num_verifiers - 1);
for block in get_good_dummy_block_seq(5000) { for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed"); queue.import(Unverified::new(block)).expect("Block good by definition; qed");
} }
// almost all unverified == bump verifier count. // almost all unverified == bump verifier count.
queue.collect_garbage(); queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1); assert_eq!(queue.verifiers.lock().1, num_verifiers);
// wake them up again and verify everything. // wake them up again and verify everything.
{ {

View File

@ -28,6 +28,7 @@ use ethcore::service::ClientService;
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockImportError, BlockChainClient, BlockID}; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockImportError, BlockChainClient, BlockID};
use ethcore::error::ImportError; use ethcore::error::ImportError;
use ethcore::miner::Miner; use ethcore::miner::Miner;
use ethcore::verification::queue::VerifierSettings;
use cache::CacheConfig; use cache::CacheConfig;
use informant::{Informant, MillisecondDuration}; use informant::{Informant, MillisecondDuration};
use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool}; use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool};
@ -84,6 +85,7 @@ pub struct ImportBlockchain {
pub vm_type: VMType, pub vm_type: VMType,
pub check_seal: bool, pub check_seal: bool,
pub with_color: bool, pub with_color: bool,
pub verifier_settings: VerifierSettings,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -175,7 +177,21 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile(db_dirs.fork_path().as_path()))); try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile(db_dirs.fork_path().as_path())));
// prepare client config // prepare client config
let client_config = to_client_config(&cmd.cache_config, Mode::Active, tracing, fat_db, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), algorithm, cmd.pruning_history, cmd.check_seal); let mut client_config = to_client_config(
&cmd.cache_config,
Mode::Active,
tracing,
fat_db,
cmd.compaction,
cmd.wal,
cmd.vm_type,
"".into(),
algorithm,
cmd.pruning_history,
cmd.check_seal
);
client_config.queue.verifier_settings = cmd.verifier_settings;
// build client // build client
let service = try!(ClientService::start( let service = try!(ClientService::start(

View File

@ -242,6 +242,8 @@ usage! {
or |c: &Config| otry!(c.footprint).db_compaction.clone(), or |c: &Config| otry!(c.footprint).db_compaction.clone(),
flag_fat_db: String = "auto", flag_fat_db: String = "auto",
or |c: &Config| otry!(c.footprint).fat_db.clone(), or |c: &Config| otry!(c.footprint).fat_db.clone(),
flag_scale_verifiers: bool = false, or |_| None,
flag_num_verifiers: Option<usize> = None, or |_| None,
// -- Import/Export Options // -- Import/Export Options
flag_from: String = "1", or |_| None, flag_from: String = "1", or |_| None,

View File

@ -250,7 +250,7 @@ Footprint Options:
the state cache (default: {flag_cache_size_state}). the state cache (default: {flag_cache_size_state}).
--cache-size MB Set total amount of discretionary memory to use for --cache-size MB Set total amount of discretionary memory to use for
the entire system, overrides other cache and queue the entire system, overrides other cache and queue
options.a (default: {flag_cache_size:?}) options. (default: {flag_cache_size:?})
--fast-and-loose Disables DB WAL, which gives a significant speed up --fast-and-loose Disables DB WAL, which gives a significant speed up
but means an unclean exit is unrecoverable. (default: {flag_fast_and_loose}) but means an unclean exit is unrecoverable. (default: {flag_fast_and_loose})
--db-compaction TYPE Database compaction type. TYPE may be one of: --db-compaction TYPE Database compaction type. TYPE may be one of:
@ -261,6 +261,11 @@ Footprint Options:
of all accounts and storage keys. Doubles the size of all accounts and storage keys. Doubles the size
of the state database. BOOL may be one of on, off of the state database. BOOL may be one of on, off
or auto. (default: {flag_fat_db}) or auto. (default: {flag_fat_db})
--scale-verifiers Automatically scale amount of verifier threads based on
workload. Not guaranteed to be faster.
(default: {flag_scale_verifiers})
--num-verifiers INT Amount of verifier threads to use or to begin with, if verifier
auto-scaling is enabled. (default: {flag_num_verifiers:?})
Import/Export Options: Import/Export Options:
--from BLOCK Export from block BLOCK, which may be an index or --from BLOCK Export from block BLOCK, which may be an index or

View File

@ -25,6 +25,7 @@ use util::log::Colour;
use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP}; use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP};
use ethcore::client::VMType; use ethcore::client::VMType;
use ethcore::miner::{MinerOptions, Banning}; use ethcore::miner::{MinerOptions, Banning};
use ethcore::verification::queue::VerifierSettings;
use rpc::{IpcConfiguration, HttpConfiguration}; use rpc::{IpcConfiguration, HttpConfiguration};
use ethcore_rpc::NetworkSettings; use ethcore_rpc::NetworkSettings;
@ -158,6 +159,7 @@ impl Configuration {
vm_type: vm_type, vm_type: vm_type,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
with_color: logger_config.color, with_color: logger_config.color,
verifier_settings: self.verifier_settings(),
}; };
Cmd::Blockchain(BlockchainCmd::Import(import_cmd)) Cmd::Blockchain(BlockchainCmd::Import(import_cmd))
} else if self.args.cmd_export { } else if self.args.cmd_export {
@ -241,6 +243,8 @@ impl Configuration {
None None
}; };
let verifier_settings = self.verifier_settings();
let run_cmd = RunCmd { let run_cmd = RunCmd {
cache_config: cache_config, cache_config: cache_config,
dirs: dirs, dirs: dirs,
@ -275,6 +279,7 @@ impl Configuration {
no_periodic_snapshot: self.args.flag_no_periodic_snapshot, no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
download_old_blocks: !self.args.flag_no_ancient_blocks, download_old_blocks: !self.args.flag_no_ancient_blocks,
verifier_settings: verifier_settings,
}; };
Cmd::Run(run_cmd) Cmd::Run(run_cmd)
}; };
@ -702,6 +707,16 @@ impl Configuration {
!ui_disabled !ui_disabled
} }
fn verifier_settings(&self) -> VerifierSettings {
let mut settings = VerifierSettings::default();
settings.scale_verifiers = self.args.flag_scale_verifiers;
if let Some(num_verifiers) = self.args.flag_num_verifiers {
settings.num_verifiers = num_verifiers;
}
settings
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -28,6 +28,7 @@ use ethcore::service::ClientService;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
use ethcore::snapshot; use ethcore::snapshot;
use ethcore::verification::queue::VerifierSettings;
use ethsync::SyncConfig; use ethsync::SyncConfig;
use informant::Informant; use informant::Informant;
@ -92,6 +93,7 @@ pub struct RunCmd {
pub no_periodic_snapshot: bool, pub no_periodic_snapshot: bool,
pub check_seal: bool, pub check_seal: bool,
pub download_old_blocks: bool, pub download_old_blocks: bool,
pub verifier_settings: VerifierSettings,
} }
pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> { pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> {
@ -217,7 +219,7 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
miner.set_transactions_limit(cmd.miner_extras.transactions_limit); miner.set_transactions_limit(cmd.miner_extras.transactions_limit);
// create client config // create client config
let client_config = to_client_config( let mut client_config = to_client_config(
&cmd.cache_config, &cmd.cache_config,
mode.clone(), mode.clone(),
tracing, tracing,
@ -231,6 +233,8 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
cmd.check_seal, cmd.check_seal,
); );
client_config.queue.verifier_settings = cmd.verifier_settings;
// set up bootnodes // set up bootnodes
let mut net_conf = cmd.net_conf; let mut net_conf = cmd.net_conf;
if !cmd.custom_bootnodes { if !cmd.custom_bootnodes {