Merge pull request #3709 from ethcore/opt-in-verifier-scaling

queue: CLI for auto-scaling and num verifiers
This commit is contained in:
Robert Habermeier 2016-12-07 13:40:42 +01:00 committed by GitHub
commit 89726356a3
8 changed files with 103 additions and 14 deletions

View File

@ -53,6 +53,8 @@ pub struct Config {
/// Maximum heap memory to use. /// Maximum heap memory to use.
/// When the limit is reached, is_full returns true. /// When the limit is reached, is_full returns true.
pub max_mem_use: usize, pub max_mem_use: usize,
/// Settings for the number of verifiers and adaptation strategy.
pub verifier_settings: VerifierSettings,
} }
impl Default for Config { impl Default for Config {
@ -60,6 +62,26 @@ impl Default for Config {
Config { Config {
max_queue_size: 30000, max_queue_size: 30000,
max_mem_use: 50 * 1024 * 1024, max_mem_use: 50 * 1024 * 1024,
verifier_settings: VerifierSettings::default(),
}
}
}
/// Verifier settings.
#[derive(Debug, PartialEq, Clone)]
pub struct VerifierSettings {
/// Whether to scale amount of verifiers according to load.
// Todo: replace w/ strategy enum?
pub scale_verifiers: bool,
/// Beginning amount of verifiers.
pub num_verifiers: usize,
}
impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
} }
} }
} }
@ -114,6 +136,7 @@ pub struct VerificationQueue<K: Kind> {
ticks_since_adjustment: AtomicUsize, ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>, verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>, state: Arc<(Mutex<State>, Condvar)>,
} }
@ -198,13 +221,16 @@ impl<K: Kind> VerificationQueue<K> {
}); });
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let scale_verifiers = config.verifier_settings.scale_verifiers;
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); let num_cpus = ::num_cpus::get();
let default_amount = max(::num_cpus::get(), 3) - 2; let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new())); let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers); let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
for i in 0..max_verifiers { for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i); debug!(target: "verification", "Adding verification thread #{}", i);
@ -248,6 +274,7 @@ impl<K: Kind> VerificationQueue<K> {
ticks_since_adjustment: AtomicUsize::new(0), ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
verifier_handles: verifier_handles, verifier_handles: verifier_handles,
state: state, state: state,
} }
@ -597,6 +624,8 @@ impl<K: Kind> VerificationQueue<K> {
self.processing.write().shrink_to_fit(); self.processing.write().shrink_to_fit();
if !self.scale_verifiers { return }
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD { if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else { } else {
@ -675,10 +704,15 @@ mod tests {
use error::*; use error::*;
use views::*; use views::*;
fn get_test_queue() -> BlockQueue { // create a test block queue.
// auto_scaling enables verifier adjustment.
fn get_test_queue(auto_scale: bool) -> BlockQueue {
let spec = get_test_spec(); let spec = get_test_spec();
let engine = spec.engine; let engine = spec.engine;
BlockQueue::new(Config::default(), engine, IoChannel::disconnected(), true)
let mut config = Config::default();
config.verifier_settings.scale_verifiers = auto_scale;
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
} }
#[test] #[test]
@ -691,7 +725,7 @@ mod tests {
#[test] #[test]
fn can_import_blocks() { fn can_import_blocks() {
let queue = get_test_queue(); let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e); panic!("error importing block that is valid by definition({:?})", e);
} }
@ -699,7 +733,7 @@ mod tests {
#[test] #[test]
fn returns_error_for_duplicates() { fn returns_error_for_duplicates() {
let queue = get_test_queue(); let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) { if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e); panic!("error importing block that is valid by definition({:?})", e);
} }
@ -718,7 +752,7 @@ mod tests {
#[test] #[test]
fn returns_ok_for_drained_duplicates() { fn returns_ok_for_drained_duplicates() {
let queue = get_test_queue(); let queue = get_test_queue(false);
let block = get_good_dummy_block(); let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone(); let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import(Unverified::new(block)) { if let Err(e) = queue.import(Unverified::new(block)) {
@ -735,7 +769,7 @@ mod tests {
#[test] #[test]
fn returns_empty_once_finished() { fn returns_empty_once_finished() {
let queue = get_test_queue(); let queue = get_test_queue(false);
queue.import(Unverified::new(get_good_dummy_block())) queue.import(Unverified::new(get_good_dummy_block()))
.expect("error importing block that is valid by definition"); .expect("error importing block that is valid by definition");
queue.flush(); queue.flush();
@ -763,7 +797,7 @@ mod tests {
fn scaling_limits() { fn scaling_limits() {
use super::MAX_VERIFIERS; use super::MAX_VERIFIERS;
let queue = get_test_queue(); let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1); queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1); assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
@ -775,7 +809,7 @@ mod tests {
#[test] #[test]
fn readjust_verifiers() { fn readjust_verifiers() {
let queue = get_test_queue(); let queue = get_test_queue(true);
// put all the verifiers to sleep to ensure // put all the verifiers to sleep to ensure
// the test isn't timing sensitive. // the test isn't timing sensitive.

View File

@ -28,6 +28,7 @@ use ethcore::service::ClientService;
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockImportError, BlockChainClient, BlockID}; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockImportError, BlockChainClient, BlockID};
use ethcore::error::ImportError; use ethcore::error::ImportError;
use ethcore::miner::Miner; use ethcore::miner::Miner;
use ethcore::verification::queue::VerifierSettings;
use cache::CacheConfig; use cache::CacheConfig;
use informant::{Informant, MillisecondDuration}; use informant::{Informant, MillisecondDuration};
use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool}; use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool};
@ -84,6 +85,7 @@ pub struct ImportBlockchain {
pub vm_type: VMType, pub vm_type: VMType,
pub check_seal: bool, pub check_seal: bool,
pub with_color: bool, pub with_color: bool,
pub verifier_settings: VerifierSettings,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -175,7 +177,21 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile(db_dirs.fork_path().as_path()))); try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile(db_dirs.fork_path().as_path())));
// prepare client config // prepare client config
let client_config = to_client_config(&cmd.cache_config, Mode::Active, tracing, fat_db, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), algorithm, cmd.pruning_history, cmd.check_seal); let mut client_config = to_client_config(
&cmd.cache_config,
Mode::Active,
tracing,
fat_db,
cmd.compaction,
cmd.wal,
cmd.vm_type,
"".into(),
algorithm,
cmd.pruning_history,
cmd.check_seal
);
client_config.queue.verifier_settings = cmd.verifier_settings;
// build client // build client
let service = try!(ClientService::start( let service = try!(ClientService::start(

View File

@ -94,6 +94,8 @@ cache_size = 128 # Overrides above caches with total size
fast_and_loose = false fast_and_loose = false
db_compaction = "ssd" db_compaction = "ssd"
fat_db = "auto" fat_db = "auto"
scale_verifiers = true
num_verifiers = 6
[snapshots] [snapshots]
disable_periodic = false disable_periodic = false

View File

@ -57,6 +57,7 @@ cache_size_queue = 100
cache_size_state = 25 cache_size_state = 25
db_compaction = "ssd" db_compaction = "ssd"
fat_db = "off" fat_db = "off"
scale_verifiers = false
[snapshots] [snapshots]
disable_periodic = true disable_periodic = true

View File

@ -242,6 +242,10 @@ usage! {
or |c: &Config| otry!(c.footprint).db_compaction.clone(), or |c: &Config| otry!(c.footprint).db_compaction.clone(),
flag_fat_db: String = "auto", flag_fat_db: String = "auto",
or |c: &Config| otry!(c.footprint).fat_db.clone(), or |c: &Config| otry!(c.footprint).fat_db.clone(),
flag_scale_verifiers: bool = false,
or |c: &Config| otry!(c.footprint).scale_verifiers.clone(),
flag_num_verifiers: Option<usize> = None,
or |c: &Config| otry!(c.footprint).num_verifiers.clone().map(Some),
// -- Import/Export Options // -- Import/Export Options
flag_from: String = "1", or |_| None, flag_from: String = "1", or |_| None,
@ -402,6 +406,8 @@ struct Footprint {
cache_size_state: Option<u32>, cache_size_state: Option<u32>,
db_compaction: Option<String>, db_compaction: Option<String>,
fat_db: Option<String>, fat_db: Option<String>,
scale_verifiers: Option<bool>,
num_verifiers: Option<usize>,
} }
#[derive(Default, Debug, PartialEq, RustcDecodable)] #[derive(Default, Debug, PartialEq, RustcDecodable)]
@ -602,6 +608,8 @@ mod tests {
flag_fast_and_loose: false, flag_fast_and_loose: false,
flag_db_compaction: "ssd".into(), flag_db_compaction: "ssd".into(),
flag_fat_db: "auto".into(), flag_fat_db: "auto".into(),
flag_scale_verifiers: true,
flag_num_verifiers: Some(6),
// -- Import/Export Options // -- Import/Export Options
flag_from: "1".into(), flag_from: "1".into(),
@ -771,6 +779,8 @@ mod tests {
cache_size_state: Some(25), cache_size_state: Some(25),
db_compaction: Some("ssd".into()), db_compaction: Some("ssd".into()),
fat_db: Some("off".into()), fat_db: Some("off".into()),
scale_verifiers: Some(false),
num_verifiers: None,
}), }),
snapshots: Some(Snapshots { snapshots: Some(Snapshots {
disable_periodic: Some(true), disable_periodic: Some(true),

View File

@ -250,7 +250,7 @@ Footprint Options:
the state cache (default: {flag_cache_size_state}). the state cache (default: {flag_cache_size_state}).
--cache-size MB Set total amount of discretionary memory to use for --cache-size MB Set total amount of discretionary memory to use for
the entire system, overrides other cache and queue the entire system, overrides other cache and queue
options.a (default: {flag_cache_size:?}) options. (default: {flag_cache_size:?})
--fast-and-loose Disables DB WAL, which gives a significant speed up --fast-and-loose Disables DB WAL, which gives a significant speed up
but means an unclean exit is unrecoverable. (default: {flag_fast_and_loose}) but means an unclean exit is unrecoverable. (default: {flag_fast_and_loose})
--db-compaction TYPE Database compaction type. TYPE may be one of: --db-compaction TYPE Database compaction type. TYPE may be one of:
@ -261,6 +261,11 @@ Footprint Options:
of all accounts and storage keys. Doubles the size of all accounts and storage keys. Doubles the size
of the state database. BOOL may be one of on, off of the state database. BOOL may be one of on, off
or auto. (default: {flag_fat_db}) or auto. (default: {flag_fat_db})
--scale-verifiers Automatically scale amount of verifier threads based on
workload. Not guaranteed to be faster.
(default: {flag_scale_verifiers})
--num-verifiers INT Amount of verifier threads to use or to begin with, if verifier
auto-scaling is enabled. (default: {flag_num_verifiers:?})
Import/Export Options: Import/Export Options:
--from BLOCK Export from block BLOCK, which may be an index or --from BLOCK Export from block BLOCK, which may be an index or

View File

@ -25,6 +25,7 @@ use util::log::Colour;
use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP}; use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP};
use ethcore::client::VMType; use ethcore::client::VMType;
use ethcore::miner::{MinerOptions, Banning}; use ethcore::miner::{MinerOptions, Banning};
use ethcore::verification::queue::VerifierSettings;
use rpc::{IpcConfiguration, HttpConfiguration}; use rpc::{IpcConfiguration, HttpConfiguration};
use ethcore_rpc::NetworkSettings; use ethcore_rpc::NetworkSettings;
@ -158,6 +159,7 @@ impl Configuration {
vm_type: vm_type, vm_type: vm_type,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
with_color: logger_config.color, with_color: logger_config.color,
verifier_settings: self.verifier_settings(),
}; };
Cmd::Blockchain(BlockchainCmd::Import(import_cmd)) Cmd::Blockchain(BlockchainCmd::Import(import_cmd))
} else if self.args.cmd_export { } else if self.args.cmd_export {
@ -241,6 +243,8 @@ impl Configuration {
None None
}; };
let verifier_settings = self.verifier_settings();
let run_cmd = RunCmd { let run_cmd = RunCmd {
cache_config: cache_config, cache_config: cache_config,
dirs: dirs, dirs: dirs,
@ -275,6 +279,7 @@ impl Configuration {
no_periodic_snapshot: self.args.flag_no_periodic_snapshot, no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
download_old_blocks: !self.args.flag_no_ancient_blocks, download_old_blocks: !self.args.flag_no_ancient_blocks,
verifier_settings: verifier_settings,
}; };
Cmd::Run(run_cmd) Cmd::Run(run_cmd)
}; };
@ -702,6 +707,16 @@ impl Configuration {
!ui_disabled !ui_disabled
} }
fn verifier_settings(&self) -> VerifierSettings {
let mut settings = VerifierSettings::default();
settings.scale_verifiers = self.args.flag_scale_verifiers;
if let Some(num_verifiers) = self.args.flag_num_verifiers {
settings.num_verifiers = num_verifiers;
}
settings
}
} }
#[cfg(test)] #[cfg(test)]
@ -798,6 +813,7 @@ mod tests {
vm_type: VMType::Interpreter, vm_type: VMType::Interpreter,
check_seal: true, check_seal: true,
with_color: !cfg!(windows), with_color: !cfg!(windows),
verifier_settings: Default::default(),
}))); })));
} }
@ -921,6 +937,7 @@ mod tests {
no_periodic_snapshot: false, no_periodic_snapshot: false,
check_seal: true, check_seal: true,
download_old_blocks: true, download_old_blocks: true,
verifier_settings: Default::default(),
})); }));
} }

View File

@ -28,6 +28,7 @@ use ethcore::service::ClientService;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
use ethcore::snapshot; use ethcore::snapshot;
use ethcore::verification::queue::VerifierSettings;
use ethsync::SyncConfig; use ethsync::SyncConfig;
use informant::Informant; use informant::Informant;
@ -92,6 +93,7 @@ pub struct RunCmd {
pub no_periodic_snapshot: bool, pub no_periodic_snapshot: bool,
pub check_seal: bool, pub check_seal: bool,
pub download_old_blocks: bool, pub download_old_blocks: bool,
pub verifier_settings: VerifierSettings,
} }
pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> { pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> {
@ -217,7 +219,7 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
miner.set_transactions_limit(cmd.miner_extras.transactions_limit); miner.set_transactions_limit(cmd.miner_extras.transactions_limit);
// create client config // create client config
let client_config = to_client_config( let mut client_config = to_client_config(
&cmd.cache_config, &cmd.cache_config,
mode.clone(), mode.clone(),
tracing, tracing,
@ -231,6 +233,8 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
cmd.check_seal, cmd.check_seal,
); );
client_config.queue.verifier_settings = cmd.verifier_settings;
// set up bootnodes // set up bootnodes
let mut net_conf = cmd.net_conf; let mut net_conf = cmd.net_conf;
if !cmd.custom_bootnodes { if !cmd.custom_bootnodes {