diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 686a1d093..de4428f02 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -53,6 +53,8 @@ pub struct Config { /// Maximum heap memory to use. /// When the limit is reached, is_full returns true. pub max_mem_use: usize, + /// Settings for the number of verifiers and adaptation strategy. + pub verifier_settings: VerifierSettings, } impl Default for Config { @@ -60,6 +62,26 @@ impl Default for Config { Config { max_queue_size: 30000, max_mem_use: 50 * 1024 * 1024, + verifier_settings: VerifierSettings::default(), + } + } +} + +/// Verifier settings. +#[derive(Debug, PartialEq, Clone)] +pub struct VerifierSettings { + /// Whether to scale amount of verifiers according to load. + // Todo: replace w/ strategy enum? + pub scale_verifiers: bool, + /// Beginning amount of verifiers. + pub num_verifiers: usize, +} + +impl Default for VerifierSettings { + fn default() -> Self { + VerifierSettings { + scale_verifiers: false, + num_verifiers: MAX_VERIFIERS, } } } @@ -139,6 +161,7 @@ pub struct VerificationQueue { ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, + scale_verifiers: bool, } struct QueueSignal { @@ -221,12 +244,15 @@ impl VerificationQueue { }); let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); + let scale_verifiers = config.verifier_settings.scale_verifiers; - let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); - let default_amount = max(::num_cpus::get(), 3) - 2; + let num_cpus = ::num_cpus::get(); + let max_verifiers = min(num_cpus, MAX_VERIFIERS); + let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers)); let mut verifiers = Vec::with_capacity(max_verifiers); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); + debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" }); for i in 0..max_verifiers { debug!(target: "verification", "Adding verification thread #{}", i); @@ -273,6 +299,7 @@ impl VerificationQueue { ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), + scale_verifiers: scale_verifiers, } } @@ -598,6 +625,8 @@ impl VerificationQueue { self.processing.write().shrink_to_fit(); + if !self.scale_verifiers { return } + if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD { self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); } else { @@ -693,10 +722,15 @@ mod tests { use error::*; use views::*; - fn get_test_queue() -> BlockQueue { + // create a test block queue. + // auto_scaling enables verifier adjustment. + fn get_test_queue(auto_scale: bool) -> BlockQueue { let spec = get_test_spec(); let engine = spec.engine; - BlockQueue::new(Config::default(), engine, IoChannel::disconnected(), true) + + let mut config = Config::default(); + config.verifier_settings.scale_verifiers = auto_scale; + BlockQueue::new(config, engine, IoChannel::disconnected(), true) } #[test] @@ -709,7 +743,7 @@ mod tests { #[test] fn can_import_blocks() { - let queue = get_test_queue(); + let queue = get_test_queue(false); if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { panic!("error importing block that is valid by definition({:?})", e); } @@ -717,7 +751,7 @@ mod tests { #[test] fn returns_error_for_duplicates() { - let queue = get_test_queue(); + let queue = get_test_queue(false); if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { panic!("error importing block that is valid by definition({:?})", e); } @@ -736,7 +770,7 @@ mod tests { #[test] fn returns_ok_for_drained_duplicates() { - let queue = get_test_queue(); + let queue = get_test_queue(false); let block = get_good_dummy_block(); let hash = BlockView::new(&block).header().hash().clone(); if let Err(e) = queue.import(Unverified::new(block)) { @@ -753,7 +787,7 @@ mod tests { #[test] fn returns_empty_once_finished() { - let queue = get_test_queue(); + let queue = get_test_queue(false); queue.import(Unverified::new(get_good_dummy_block())) .expect("error importing block that is valid by definition"); queue.flush(); @@ -781,7 +815,7 @@ mod tests { fn scaling_limits() { use super::MAX_VERIFIERS; - let queue = get_test_queue(); + let queue = get_test_queue(true); queue.scale_verifiers(MAX_VERIFIERS + 1); assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1); @@ -793,7 +827,7 @@ mod tests { #[test] fn readjust_verifiers() { - let queue = get_test_queue(); + let queue = get_test_queue(true); // put all the verifiers to sleep to ensure // the test isn't timing sensitive. @@ -806,13 +840,15 @@ mod tests { verifiers.1 }; + queue.scale_verifiers(num_verifiers - 1); + for block in get_good_dummy_block_seq(5000) { queue.import(Unverified::new(block)).expect("Block good by definition; qed"); } // almost all unverified == bump verifier count. queue.collect_garbage(); - assert_eq!(queue.verifiers.lock().1, num_verifiers + 1); + assert_eq!(queue.verifiers.lock().1, num_verifiers); // wake them up again and verify everything. { diff --git a/parity/blockchain.rs b/parity/blockchain.rs index 02d3e39fb..0750d369d 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -28,6 +28,7 @@ use ethcore::service::ClientService; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockImportError, BlockChainClient, BlockID}; use ethcore::error::ImportError; use ethcore::miner::Miner; +use ethcore::verification::queue::VerifierSettings; use cache::CacheConfig; use informant::{Informant, MillisecondDuration}; use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool}; @@ -84,6 +85,7 @@ pub struct ImportBlockchain { pub vm_type: VMType, pub check_seal: bool, pub with_color: bool, + pub verifier_settings: VerifierSettings, } #[derive(Debug, PartialEq)] @@ -175,7 +177,21 @@ fn execute_import(cmd: ImportBlockchain) -> Result { try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile(db_dirs.fork_path().as_path()))); // prepare client config - let client_config = to_client_config(&cmd.cache_config, Mode::Active, tracing, fat_db, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), algorithm, cmd.pruning_history, cmd.check_seal); + let mut client_config = to_client_config( + &cmd.cache_config, + Mode::Active, + tracing, + fat_db, + cmd.compaction, + cmd.wal, + cmd.vm_type, + "".into(), + algorithm, + cmd.pruning_history, + cmd.check_seal + ); + + client_config.queue.verifier_settings = cmd.verifier_settings; // build client let service = try!(ClientService::start( diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index d33c58d9d..30b273b38 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -242,6 +242,8 @@ usage! { or |c: &Config| otry!(c.footprint).db_compaction.clone(), flag_fat_db: String = "auto", or |c: &Config| otry!(c.footprint).fat_db.clone(), + flag_scale_verifiers: bool = false, or |_| None, + flag_num_verifiers: Option = None, or |_| None, // -- Import/Export Options flag_from: String = "1", or |_| None, diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index b67af6110..02e7e00ec 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -250,7 +250,7 @@ Footprint Options: the state cache (default: {flag_cache_size_state}). --cache-size MB Set total amount of discretionary memory to use for the entire system, overrides other cache and queue - options.a (default: {flag_cache_size:?}) + options. (default: {flag_cache_size:?}) --fast-and-loose Disables DB WAL, which gives a significant speed up but means an unclean exit is unrecoverable. (default: {flag_fast_and_loose}) --db-compaction TYPE Database compaction type. TYPE may be one of: @@ -261,6 +261,11 @@ Footprint Options: of all accounts and storage keys. Doubles the size of the state database. BOOL may be one of on, off or auto. (default: {flag_fat_db}) + --scale-verifiers Automatically scale amount of verifier threads based on + workload. Not guaranteed to be faster. + (default: {flag_scale_verifiers}) + --num-verifiers INT Amount of verifier threads to use or to begin with, if verifier + auto-scaling is enabled. (default: {flag_num_verifiers:?}) Import/Export Options: --from BLOCK Export from block BLOCK, which may be an index or diff --git a/parity/configuration.rs b/parity/configuration.rs index c4a54f747..7b8100bbb 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -25,6 +25,7 @@ use util::log::Colour; use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP}; use ethcore::client::VMType; use ethcore::miner::{MinerOptions, Banning}; +use ethcore::verification::queue::VerifierSettings; use rpc::{IpcConfiguration, HttpConfiguration}; use ethcore_rpc::NetworkSettings; @@ -158,6 +159,7 @@ impl Configuration { vm_type: vm_type, check_seal: !self.args.flag_no_seal_check, with_color: logger_config.color, + verifier_settings: self.verifier_settings(), }; Cmd::Blockchain(BlockchainCmd::Import(import_cmd)) } else if self.args.cmd_export { @@ -241,6 +243,8 @@ impl Configuration { None }; + let verifier_settings = self.verifier_settings(); + let run_cmd = RunCmd { cache_config: cache_config, dirs: dirs, @@ -275,6 +279,7 @@ impl Configuration { no_periodic_snapshot: self.args.flag_no_periodic_snapshot, check_seal: !self.args.flag_no_seal_check, download_old_blocks: !self.args.flag_no_ancient_blocks, + verifier_settings: verifier_settings, }; Cmd::Run(run_cmd) }; @@ -702,6 +707,16 @@ impl Configuration { !ui_disabled } + + fn verifier_settings(&self) -> VerifierSettings { + let mut settings = VerifierSettings::default(); + settings.scale_verifiers = self.args.flag_scale_verifiers; + if let Some(num_verifiers) = self.args.flag_num_verifiers { + settings.num_verifiers = num_verifiers; + } + + settings + } } #[cfg(test)] diff --git a/parity/run.rs b/parity/run.rs index f977c450c..20a372424 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -28,6 +28,7 @@ use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::snapshot; +use ethcore::verification::queue::VerifierSettings; use ethsync::SyncConfig; use informant::Informant; @@ -92,6 +93,7 @@ pub struct RunCmd { pub no_periodic_snapshot: bool, pub check_seal: bool, pub download_old_blocks: bool, + pub verifier_settings: VerifierSettings, } pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> { @@ -217,7 +219,7 @@ pub fn execute(cmd: RunCmd, logger: Arc) -> Result<(), String> { miner.set_transactions_limit(cmd.miner_extras.transactions_limit); // create client config - let client_config = to_client_config( + let mut client_config = to_client_config( &cmd.cache_config, mode.clone(), tracing, @@ -231,6 +233,8 @@ pub fn execute(cmd: RunCmd, logger: Arc) -> Result<(), String> { cmd.check_seal, ); + client_config.queue.verifier_settings = cmd.verifier_settings; + // set up bootnodes let mut net_conf = cmd.net_conf; if !cmd.custom_bootnodes {