// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of OpenEthereum. // OpenEthereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // OpenEthereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with OpenEthereum. If not, see . use std::{ any::Any, str::FromStr, sync::{atomic, Arc, Weak}, thread, time::{Duration, Instant}, }; use crate::{ account_utils, cache::CacheConfig, db, helpers::{execute_upgrades, passwords_from_files, to_client_config}, informant::{FullNodeInformantData, Informant}, metrics::{start_prometheus_metrics, MetricsConfiguration}, miner::{external::ExternalMiner, work_notify::WorkPoster}, modules, params::{ fatdb_switch_to_bool, mode_switch_to_bool, tracing_switch_to_bool, AccountsConfig, GasPricerConfig, MinerExtras, Pruning, SpecType, Switch, }, rpc, rpc_apis, secretstore, signer, sync::{self, SyncConfig}, user_defaults::UserDefaults, }; use ansi_term::Colour; use dir::{DatabaseDirectories, Directories}; use ethcore::{ client::{BlockChainClient, BlockInfo, Client, DatabaseCompactionProfile, Mode, VMType}, miner::{self, stratum, Miner, MinerOptions, MinerService}, snapshot::{self, SnapshotConfiguration}, verification::queue::VerifierSettings, }; use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_service::ClientService; use ethereum_types::{H256, U64}; use journaldb::Algorithm; use jsonrpc_core; use node_filter::NodeFilter; use parity_rpc::{ informant, is_major_importing, FutureOutput, FutureResponse, FutureResult, Metadata, NetworkSettings, Origin, PubSubSession, }; use parity_runtime::Runtime; use parity_version::version; // How often we attempt to take a snapshot: only snapshot on blocknumbers that are multiples of this. const SNAPSHOT_PERIOD: u64 = 20000; // Start snapshoting from `tip`-`history, with this we want to bypass reorgs. Should be smaller than prunning history. const SNAPSHOT_HISTORY: u64 = 50; // Full client number of DNS threads const FETCH_FULL_NUM_DNS_THREADS: usize = 4; #[derive(Debug, PartialEq)] pub struct RunCmd { pub cache_config: CacheConfig, pub dirs: Directories, pub spec: SpecType, pub pruning: Pruning, pub pruning_history: u64, pub pruning_memory: usize, /// Some if execution should be daemonized. Contains pid_file path. pub daemon: Option, pub logger_config: LogConfig, pub miner_options: MinerOptions, pub gas_price_percentile: usize, pub poll_lifetime: u32, pub ws_conf: rpc::WsConfiguration, pub http_conf: rpc::HttpConfiguration, pub ipc_conf: rpc::IpcConfiguration, pub net_conf: sync::NetworkConfiguration, pub network_id: Option, pub warp_sync: bool, pub warp_barrier: Option, pub acc_conf: AccountsConfig, pub gas_pricer_conf: GasPricerConfig, pub miner_extras: MinerExtras, pub mode: Option, pub tracing: Switch, pub fat_db: Switch, pub compaction: DatabaseCompactionProfile, pub vm_type: VMType, pub experimental_rpcs: bool, pub net_settings: NetworkSettings, pub secretstore_conf: secretstore::Configuration, pub name: String, pub custom_bootnodes: bool, pub stratum: Option, pub snapshot_conf: SnapshotConfiguration, pub check_seal: bool, pub allow_missing_blocks: bool, pub download_old_blocks: bool, pub verifier_settings: VerifierSettings, pub no_persistent_txqueue: bool, pub max_round_blocks_to_import: usize, pub metrics_conf: MetricsConfiguration, } // node info fetcher for the local store. struct FullNodeInfo { miner: Option>, // TODO: only TXQ needed, just use that after decoupling. } impl crate::local_store::NodeInfo for FullNodeInfo { fn pending_transactions(&self) -> Vec { let miner = match self.miner.as_ref() { Some(m) => m, None => return Vec::new(), }; miner .local_transactions() .values() .filter_map(|status| match *status { crate::miner::pool::local_transactions::Status::Pending(ref tx) => { Some(tx.pending().clone()) } _ => None, }) .collect() } } /// Executes the given run command. /// /// On error, returns what to print on stderr. pub fn execute(cmd: RunCmd, logger: Arc) -> Result { // load spec let spec = cmd.spec.spec(&cmd.dirs.cache)?; // load genesis hash let genesis_hash = spec.genesis_header().hash(); // database paths let db_dirs = cmd.dirs.database( genesis_hash, cmd.spec.legacy_fork_name(), spec.data_dir.clone(), ); // user defaults path let user_defaults_path = db_dirs.user_defaults_path(); // load user defaults let mut user_defaults = UserDefaults::load(&user_defaults_path)?; // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&user_defaults); // check if tracing is on let tracing = tracing_switch_to_bool(cmd.tracing, &user_defaults)?; // check if fatdb is on let fat_db = fatdb_switch_to_bool(cmd.fat_db, &user_defaults, algorithm)?; // get the mode let mode = mode_switch_to_bool(cmd.mode, &user_defaults)?; trace!(target: "mode", "mode is {:?}", mode); let network_enabled = match mode { Mode::Dark(_) | Mode::Off => false, _ => true, }; // prepare client and snapshot paths. let client_path = db_dirs.client_path(algorithm); let snapshot_path = db_dirs.snapshot_path(); // execute upgrades execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, &cmd.compaction)?; // create dirs used by parity cmd.dirs.create_dirs( cmd.acc_conf.unlocked_accounts.len() == 0, cmd.secretstore_conf.enabled, )?; //print out running parity environment print_running_environment(&spec.data_dir, &cmd.dirs, &db_dirs); // display info about used pruning algorithm info!( "State DB configuration: {}{}{}", Colour::White.bold().paint(algorithm.as_str()), match fat_db { true => Colour::White.bold().paint(" +Fat").to_string(), false => "".to_owned(), }, match tracing { true => Colour::White.bold().paint(" +Trace").to_string(), false => "".to_owned(), } ); info!( "Operating mode: {}", Colour::White.bold().paint(format!("{}", mode)) ); // display warning about using experimental journaldb algorithm if !algorithm.is_stable() { warn!( "Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable") ); } // create sync config let mut sync_config = SyncConfig::default(); sync_config.network_id = match cmd.network_id { Some(id) => id, None => spec.network_id(), }; if spec.subprotocol_name().len() > 8 { warn!("Your chain specification's subprotocol length is more then 8. Ignoring."); } else { sync_config.subprotocol_name = U64::from(spec.subprotocol_name().as_bytes()) } sync_config.fork_block = spec.fork_block(); let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync; if warp_sync { // Logging is not initialized yet, so we print directly to stderr if fat_db { warn!("Warning: Warp Sync is disabled because Fat DB is turned on."); warp_sync = false; } else if tracing { warn!("Warning: Warp Sync is disabled because tracing is turned on."); warp_sync = false; } else if algorithm != Algorithm::OverlayRecent { warn!("Warning: Warp Sync is disabled because of non-default pruning mode."); warp_sync = false; } } sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) { (true, Some(block)) => sync::WarpSync::OnlyAndAfter(block), (true, _) => sync::WarpSync::Enabled, _ => sync::WarpSync::Disabled, }; sync_config.download_old_blocks = cmd.download_old_blocks; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; // prepare account provider let account_provider = Arc::new(account_utils::prepare_account_provider( &cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords, )?); // spin up event loop let runtime = Runtime::with_default_thread_count(); // fetch service let fetch = fetch::Client::new(FETCH_FULL_NUM_DNS_THREADS) .map_err(|e| format!("Error starting fetch client: {:?}", e))?; let txpool_size = cmd.miner_options.pool_limits.max_count; // create miner let miner = Arc::new(Miner::new( cmd.miner_options, cmd.gas_pricer_conf .to_gas_pricer(fetch.clone(), runtime.executor()), &spec, ( cmd.miner_extras.local_accounts, account_utils::miner_local_accounts(account_provider.clone()), ), )); miner.set_author(miner::Author::External(cmd.miner_extras.author)); miner.set_gas_range_target(cmd.miner_extras.gas_range_target); miner.set_extra_data(cmd.miner_extras.extra_data); if !cmd.miner_extras.work_notify.is_empty() { miner.add_work_listener(Box::new(WorkPoster::new( &cmd.miner_extras.work_notify, fetch.clone(), runtime.executor(), ))); } let engine_signer = cmd.miner_extras.engine_signer; if engine_signer != Default::default() { if let Some(author) = account_utils::miner_author( &cmd.spec, &cmd.dirs, &account_provider, engine_signer, &passwords, )? { miner.set_author(author); } } // create client config let mut client_config = to_client_config( &cmd.cache_config, spec.name.to_lowercase(), mode.clone(), tracing, fat_db, cmd.compaction, cmd.vm_type, cmd.name, algorithm, cmd.pruning_history, cmd.pruning_memory, cmd.check_seal, cmd.max_round_blocks_to_import, ); client_config.queue.verifier_settings = cmd.verifier_settings; client_config.queue.verifier_settings.bad_hashes = verification_bad_blocks(&cmd.spec); client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4); client_config.snapshot = cmd.snapshot_conf.clone(); // set up bootnodes let mut net_conf = cmd.net_conf; if !cmd.custom_bootnodes { net_conf.boot_nodes = spec.nodes.clone(); } // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); let client_db = restoration_db_handler .open(&client_path) .map_err(|e| format!("Failed to open database {:?}", e))?; // create client service. let service = ClientService::start( client_config, &spec, client_db, &snapshot_path, restoration_db_handler, &cmd.dirs.ipc_path(), miner.clone(), ) .map_err(|e| format!("Client service error: {:?}", e))?; let connection_filter_address = spec.params().node_permission_contract; // drop the spec to free up genesis state. let forks = spec.hard_forks.clone(); drop(spec); // take handle to client let client = service.client(); // Update miners block gas limit miner.update_transaction_queue_limits(*client.best_block_header().gas_limit()); let connection_filter = connection_filter_address.map(|a| { Arc::new(NodeFilter::new( Arc::downgrade(&client) as Weak, a, )) }); let snapshot_service = service.snapshot_service(); // initialize the local node information store. let store = { let db = service.db(); let node_info = FullNodeInfo { miner: match cmd.no_persistent_txqueue { true => None, false => Some(miner.clone()), }, }; let store = crate::local_store::create( db.key_value().clone(), ::ethcore_db::COL_NODE_INFO, node_info, ); if cmd.no_persistent_txqueue { info!("Running without a persistent transaction queue."); if let Err(e) = store.clear() { warn!("Error clearing persistent transaction queue: {}", e); } } // re-queue pending transactions. match store.pending_transactions() { Ok(pending) => { for pending_tx in pending { if let Err(e) = miner.import_own_transaction(&*client, pending_tx) { warn!("Error importing saved transaction: {}", e) } } } Err(e) => warn!("Error loading cached pending transactions from disk: {}", e), } Arc::new(store) }; // register it as an IO service to update periodically. service .register_io_handler(store) .map_err(|_| "Unable to register local store handler".to_owned())?; // create external miner let external_miner = Arc::new(ExternalMiner::default()); // start stratum if let Some(ref stratum_config) = cmd.stratum { stratum::Stratum::register(stratum_config, miner.clone(), Arc::downgrade(&client)) .map_err(|e| format!("Stratum start error: {:?}", e))?; } // create sync object let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, net_conf.clone().into(), client.clone(), forks, snapshot_service.clone(), &cmd.logger_config, connection_filter .clone() .map(|f| f as Arc), ) .map_err(|e| format!("Sync error: {}", e))?; service.add_notify(chain_notify.clone()); // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); let is_ready = Arc::new(atomic::AtomicBool::new(true)); miner.add_transactions_listener(Box::new(move |_hashes| { // we want to have only one PendingTransactions task in the queue. if is_ready .compare_exchange( true, false, atomic::Ordering::SeqCst, atomic::Ordering::SeqCst, ) .is_ok() { let task = crate::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); // we ignore error cause it means that we are closing let _ = tx.lock().send(task); } })); // start network if network_enabled { chain_notify.start(); } // set up dependencies for rpc servers let rpc_stats = Arc::new(informant::RpcStats::default()); let secret_store = account_provider.clone(); let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.logger_config)); let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies { signer_service: signer_service, snapshot: snapshot_service.clone(), client: client.clone(), sync: sync_provider.clone(), net: manage_network.clone(), accounts: secret_store, miner: miner.clone(), external_miner: external_miner.clone(), logger: logger.clone(), settings: Arc::new(cmd.net_settings.clone()), net_service: manage_network.clone(), experimental_rpcs: cmd.experimental_rpcs, ws_address: cmd.ws_conf.address(), fetch: fetch.clone(), executor: runtime.executor(), gas_price_percentile: cmd.gas_price_percentile, poll_lifetime: cmd.poll_lifetime, allow_missing_blocks: cmd.allow_missing_blocks, no_ancient_blocks: !cmd.download_old_blocks, }); let dependencies = rpc::Dependencies { apis: deps_for_rpc_apis.clone(), executor: runtime.executor(), stats: rpc_stats.clone(), }; // start rpc servers let rpc_direct = rpc::setup_apis(rpc_apis::ApiSet::All, &dependencies); let ws_server = rpc::new_ws(cmd.ws_conf.clone(), &dependencies)?; let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; // start the prometheus metrics server start_prometheus_metrics(&cmd.metrics_conf, &dependencies)?; let http_server = rpc::new_http( "HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, )?; // secret store key server let secretstore_deps = secretstore::Dependencies { client: client.clone(), sync: sync_provider.clone(), miner: miner.clone(), account_provider, accounts_passwords: &passwords, }; let secretstore_key_server = secretstore::start( cmd.secretstore_conf.clone(), secretstore_deps, runtime.executor(), )?; // the informant let informant = Arc::new(Informant::new( FullNodeInformantData { client: service.client(), sync: Some(sync_provider.clone()), net: Some(manage_network.clone()), }, Some(snapshot_service.clone()), Some(rpc_stats.clone()), cmd.logger_config.color, )); service.add_notify(informant.clone()); service .register_io_handler(informant.clone()) .map_err(|_| "Unable to register informant handler".to_owned())?; // save user defaults user_defaults.is_first_launch = false; user_defaults.pruning = algorithm; user_defaults.tracing = tracing; user_defaults.fat_db = fat_db; user_defaults.set_mode(mode); user_defaults.save(&user_defaults_path)?; // tell client how to save the default mode if it gets changed. client.on_user_defaults_change(move |mode: Option| { if let Some(mode) = mode { user_defaults.set_mode(mode); } let _ = user_defaults.save(&user_defaults_path); // discard failures - there's nothing we can do }); // the watcher must be kept alive. let watcher = match cmd.snapshot_conf.enable { false => None, true => { let sync = sync_provider.clone(); let client = client.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), move || is_major_importing(Some(sync.status().state), client.queue_info()), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY, )); service.add_notify(watcher.clone()); Some(watcher) } }; Ok(RunningClient { inner: RunningClientInner::Full { rpc: rpc_direct, informant, client, client_service: Arc::new(service), keep_alive: Box::new(( watcher, ws_server, http_server, ipc_server, secretstore_key_server, runtime, )), }, }) } /// Set bad blocks in VerificationQeueu. By omiting header we can omit particular fork of chain. fn verification_bad_blocks(spec: &SpecType) -> Vec { match *spec { SpecType::Ropsten => { vec![ H256::from_str("1eac3d16c642411f13c287e29144c6f58fda859407c8f24c38deb168e1040714") .expect("Valid hex string"), ] } _ => vec![], } } /// Parity client currently executing in background threads. /// /// Should be destroyed by calling `shutdown()`, otherwise execution will continue in the /// background. pub struct RunningClient { inner: RunningClientInner, } enum RunningClientInner { Full { rpc: jsonrpc_core::MetaIoHandler>, informant: Arc>, client: Arc, client_service: Arc, keep_alive: Box, }, } impl RunningClient { /// Performs an asynchronous RPC query. // FIXME: [tomaka] This API should be better, with for example a Future pub fn rpc_query( &self, request: &str, session: Option>, ) -> FutureResult { let metadata = Metadata { origin: Origin::CApi, session, }; match self.inner { RunningClientInner::Full { ref rpc, .. } => rpc.handle_request(request, metadata), } } /// Shuts down the client. pub fn shutdown(self) { match self.inner { RunningClientInner::Full { rpc, informant, client, client_service, keep_alive, } => { info!("Finishing work, please wait..."); // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); // Shutdown and drop the ClientService client_service.shutdown(); trace!(target: "shutdown", "ClientService shut down"); drop(client_service); trace!(target: "shutdown", "ClientService dropped"); // drop this stuff as soon as exit detected. drop(rpc); trace!(target: "shutdown", "RPC dropped"); drop(keep_alive); trace!(target: "shutdown", "KeepAlive dropped"); // to make sure timer does not spawn requests while shutdown is in progress informant.shutdown(); trace!(target: "shutdown", "Informant shut down"); // just Arc is dropping here, to allow other reference release in its default time drop(informant); trace!(target: "shutdown", "Informant dropped"); drop(client); trace!(target: "shutdown", "Client dropped"); // This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]` // trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); trace!(target: "shutdown", "Waiting for refs to Client to shutdown"); wait_for_drop(weak_client); } } } } fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &DatabaseDirectories) { info!("Starting {}", Colour::White.bold().paint(version())); info!( "Keys path {}", Colour::White .bold() .paint(dirs.keys_path(data_dir).to_string_lossy().into_owned()) ); info!( "DB path {}", Colour::White .bold() .paint(db_dirs.db_root_path().to_string_lossy().into_owned()) ); } fn wait_for_drop(w: Weak) { const SLEEP_DURATION: Duration = Duration::from_secs(1); const WARN_TIMEOUT: Duration = Duration::from_secs(60); const MAX_TIMEOUT: Duration = Duration::from_secs(300); let instant = Instant::now(); let mut warned = false; while instant.elapsed() < MAX_TIMEOUT { if w.upgrade().is_none() { return; } if !warned && instant.elapsed() > WARN_TIMEOUT { warned = true; warn!("Shutdown is taking longer than expected."); } thread::sleep(SLEEP_DURATION); // When debugging shutdown issues on a nightly build it can help to enable this with the // `#![feature(weak_counts)]` added to lib.rs (TODO: enable when // https://github.com/rust-lang/rust/issues/57977 is stable) // trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); trace!(target: "shutdown", "Waiting for client to drop"); } warn!("Shutdown timeout reached, exiting uncleanly."); }