// Copyright 2015-2019 Parity Technologies (UK) Ltd. // This file is part of Parity Ethereum. // Parity Ethereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Ethereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . use std::any::Any; use std::sync::{Arc, Weak, atomic}; use std::time::{Duration, Instant}; use std::thread; use ansi_term::Colour; use bytes::Bytes; use call_contract::CallContract; use client_traits::{BlockInfo, BlockChainClient}; use ethcore::client::{Client, DatabaseCompactionProfile, VMType}; use ethcore::miner::{self, stratum, Miner, MinerService, MinerOptions}; use ethcore::snapshot::{self, SnapshotConfiguration}; use ethcore::spec::{SpecParams, OptimizeFor}; use ethcore::verification::queue::VerifierSettings; use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_service::ClientService; use ethereum_types::Address; use futures::{IntoFuture, Stream}; use hash_fetch::{self, fetch}; use informant::{Informant, LightNodeInformantData, FullNodeInformantData}; use journaldb::Algorithm; use light::Cache as LightDataCache; use miner::external::ExternalMiner; use miner::work_notify::WorkPoster; use node_filter::NodeFilter; use parity_runtime::Runtime; use sync::{self, SyncConfig, PrivateTxHandler}; use types::{ client_types::Mode, ids::BlockId, }; use parity_rpc::{ Origin, Metadata, NetworkSettings, informant, PubSubSession, FutureResult, FutureResponse, FutureOutput }; use updater::{UpdatePolicy, Updater}; use parity_version::version; use ethcore_private_tx::{ProviderConfig, EncryptorConfig, SecretStoreEncryptor}; use params::{ SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch, tracing_switch_to_bool, fatdb_switch_to_bool, mode_switch_to_bool }; use account_utils; use helpers::{to_client_config, execute_upgrades, passwords_from_files}; use dir::{Directories, DatabaseDirectories}; use cache::CacheConfig; use user_defaults::UserDefaults; use ipfs; use jsonrpc_core; use modules; use registrar::{RegistrarClient, Asynchronous}; use rpc; use rpc_apis; use secretstore; use signer; use db; // how often to take periodic snapshots. const SNAPSHOT_PERIOD: u64 = 5000; // how many blocks to wait before starting a periodic snapshot. const SNAPSHOT_HISTORY: u64 = 100; // Number of minutes before a given gas price corpus should expire. // Light client only. const GAS_CORPUS_EXPIRATION_MINUTES: u64 = 60 * 6; // Full client number of DNS threads const FETCH_FULL_NUM_DNS_THREADS: usize = 4; // Light client number of DNS threads const FETCH_LIGHT_NUM_DNS_THREADS: usize = 1; #[derive(Debug, PartialEq)] pub struct RunCmd { pub cache_config: CacheConfig, pub dirs: Directories, pub spec: SpecType, pub pruning: Pruning, pub pruning_history: u64, pub pruning_memory: usize, /// Some if execution should be daemonized. Contains pid_file path. pub daemon: Option, pub logger_config: LogConfig, pub miner_options: MinerOptions, pub gas_price_percentile: usize, pub poll_lifetime: u32, pub ws_conf: rpc::WsConfiguration, pub http_conf: rpc::HttpConfiguration, pub ipc_conf: rpc::IpcConfiguration, pub net_conf: sync::NetworkConfiguration, pub network_id: Option, pub warp_sync: bool, pub warp_barrier: Option, pub acc_conf: AccountsConfig, pub gas_pricer_conf: GasPricerConfig, pub miner_extras: MinerExtras, pub update_policy: UpdatePolicy, pub mode: Option, pub tracing: Switch, pub fat_db: Switch, pub compaction: DatabaseCompactionProfile, pub vm_type: VMType, pub geth_compatibility: bool, pub experimental_rpcs: bool, pub net_settings: NetworkSettings, pub ipfs_conf: ipfs::Configuration, pub secretstore_conf: secretstore::Configuration, pub private_provider_conf: ProviderConfig, pub private_encryptor_conf: EncryptorConfig, pub private_tx_enabled: bool, pub name: String, pub custom_bootnodes: bool, pub stratum: Option, pub snapshot_conf: SnapshotConfiguration, pub check_seal: bool, pub allow_missing_blocks: bool, pub download_old_blocks: bool, pub verifier_settings: VerifierSettings, pub serve_light: bool, pub light: bool, pub no_persistent_txqueue: bool, pub no_hardcoded_sync: bool, pub max_round_blocks_to_import: usize, pub on_demand_response_time_window: Option, pub on_demand_request_backoff_start: Option, pub on_demand_request_backoff_max: Option, pub on_demand_request_backoff_rounds_max: Option, pub on_demand_request_consecutive_failures: Option, } // node info fetcher for the local store. struct FullNodeInfo { miner: Option>, // TODO: only TXQ needed, just use that after decoupling. } impl ::local_store::NodeInfo for FullNodeInfo { fn pending_transactions(&self) -> Vec<::types::transaction::PendingTransaction> { let miner = match self.miner.as_ref() { Some(m) => m, None => return Vec::new(), }; miner.local_transactions() .values() .filter_map(|status| match *status { ::miner::pool::local_transactions::Status::Pending(ref tx) => Some(tx.pending().clone()), _ => None, }) .collect() } } type LightClient = ::light::client::Client<::light_helpers::EpochFetch>; // helper for light execution. fn execute_light_impl(cmd: RunCmd, logger: Arc, on_client_rq: Cr) -> Result where Cr: Fn(String) + 'static + Send { use light::client as light_client; use sync::{LightSyncParams, LightSync, ManageNetwork}; use parking_lot::{Mutex, RwLock}; // load spec let spec = cmd.spec.spec(SpecParams::new(cmd.dirs.cache.as_ref(), OptimizeFor::Memory))?; // load genesis hash let genesis_hash = spec.genesis_header().hash(); // database paths let db_dirs = cmd.dirs.database(genesis_hash, cmd.spec.legacy_fork_name(), spec.data_dir.clone()); // user defaults path let user_defaults_path = db_dirs.user_defaults_path(); // load user defaults let user_defaults = UserDefaults::load(&user_defaults_path)?; // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&user_defaults); // execute upgrades execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, &cmd.compaction)?; // create dirs used by parity cmd.dirs.create_dirs(cmd.acc_conf.unlocked_accounts.len() == 0, cmd.secretstore_conf.enabled)?; //print out running parity environment print_running_environment(&spec.data_dir, &cmd.dirs, &db_dirs); info!("Running in experimental {} mode.", Colour::Blue.bold().paint("Light Client")); // TODO: configurable cache size. let cache = LightDataCache::new(Default::default(), Duration::from_secs(60 * GAS_CORPUS_EXPIRATION_MINUTES)); let cache = Arc::new(Mutex::new(cache)); // start client and create transaction queue. let mut config = light_client::Config { queue: Default::default(), chain_column: ::ethcore_db::COL_LIGHT_CHAIN, verify_full: true, check_seal: cmd.check_seal, no_hardcoded_sync: cmd.no_hardcoded_sync, }; config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; config.queue.verifier_settings = cmd.verifier_settings; // start on_demand service. let response_time_window = cmd.on_demand_response_time_window.map_or( ::light::on_demand::DEFAULT_RESPONSE_TIME_TO_LIVE, |s| Duration::from_secs(s) ); let request_backoff_start = cmd.on_demand_request_backoff_start.map_or( ::light::on_demand::DEFAULT_REQUEST_MIN_BACKOFF_DURATION, |s| Duration::from_secs(s) ); let request_backoff_max = cmd.on_demand_request_backoff_max.map_or( ::light::on_demand::DEFAULT_REQUEST_MAX_BACKOFF_DURATION, |s| Duration::from_secs(s) ); let on_demand = Arc::new({ ::light::on_demand::OnDemand::new( cache.clone(), response_time_window, request_backoff_start, request_backoff_max, cmd.on_demand_request_backoff_rounds_max.unwrap_or(::light::on_demand::DEFAULT_MAX_REQUEST_BACKOFF_ROUNDS), cmd.on_demand_request_consecutive_failures.unwrap_or(::light::on_demand::DEFAULT_NUM_CONSECUTIVE_FAILED_REQUESTS) ) }); let sync_handle = Arc::new(RwLock::new(Weak::new())); let fetch = ::light_helpers::EpochFetch { on_demand: on_demand.clone(), sync: sync_handle.clone(), }; // initialize database. let db = db::open_db(&db_dirs.client_path(algorithm).to_str().expect("DB path could not be converted to string."), &cmd.cache_config, &cmd.compaction).map_err(|e| format!("Failed to open database {:?}", e))?; let service = light_client::Service::start(config, &spec, fetch, db, cache.clone()) .map_err(|e| format!("Error starting light client: {}", e))?; let client = service.client().clone(); let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default())); let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone()); // start network. // set up bootnodes let mut net_conf = cmd.net_conf; if !cmd.custom_bootnodes { net_conf.boot_nodes = spec.nodes.clone(); } // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); let sync_params = LightSyncParams { network_config: net_conf.into_basic().map_err(|e| format!("Failed to produce network config: {}", e))?, client: Arc::new(provider), network_id: cmd.network_id.unwrap_or(spec.network_id()), subprotocol_name: sync::LIGHT_PROTOCOL, handlers: vec![on_demand.clone()], }; let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?; let light_sync = Arc::new(light_sync); *sync_handle.write() = Arc::downgrade(&light_sync); // spin up event loop let runtime = Runtime::with_default_thread_count(); // start the network. light_sync.start_network(); // fetch service let fetch = fetch::Client::new(FETCH_LIGHT_NUM_DNS_THREADS).map_err(|e| format!("Error starting fetch client: {:?}", e))?; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; // prepare account provider let account_provider = Arc::new(account_utils::prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?); let rpc_stats = Arc::new(informant::RpcStats::default()); // the dapps server let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.logger_config)); // start RPCs let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies { signer_service: signer_service, client: client.clone(), sync: light_sync.clone(), net: light_sync.clone(), accounts: account_provider, logger: logger, settings: Arc::new(cmd.net_settings), on_demand: on_demand, cache: cache.clone(), transaction_queue: txq, ws_address: cmd.ws_conf.address(), fetch: fetch, geth_compatibility: cmd.geth_compatibility, experimental_rpcs: cmd.experimental_rpcs, executor: runtime.executor(), private_tx_service: None, //TODO: add this to client. gas_price_percentile: cmd.gas_price_percentile, poll_lifetime: cmd.poll_lifetime }); let dependencies = rpc::Dependencies { apis: deps_for_rpc_apis.clone(), executor: runtime.executor(), stats: rpc_stats.clone(), }; // start rpc servers let rpc_direct = rpc::setup_apis(rpc_apis::ApiSet::All, &dependencies); let ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?; let http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies)?; let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; // the informant let informant = Arc::new(Informant::new( LightNodeInformantData { client: client.clone(), sync: light_sync.clone(), cache: cache, }, None, Some(rpc_stats), cmd.logger_config.color, )); service.add_notify(informant.clone()); service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?; client.set_exit_handler(on_client_rq); Ok(RunningClient { inner: RunningClientInner::Light { rpc: rpc_direct, informant, client, keep_alive: Box::new((service, ws_server, http_server, ipc_server, runtime)), } }) } fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: Cr, on_updater_rq: Rr) -> Result where Cr: Fn(String) + 'static + Send, Rr: Fn() + 'static + Send { // load spec let spec = cmd.spec.spec(&cmd.dirs.cache)?; // load genesis hash let genesis_hash = spec.genesis_header().hash(); // database paths let db_dirs = cmd.dirs.database(genesis_hash, cmd.spec.legacy_fork_name(), spec.data_dir.clone()); // user defaults path let user_defaults_path = db_dirs.user_defaults_path(); // load user defaults let mut user_defaults = UserDefaults::load(&user_defaults_path)?; // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&user_defaults); // check if tracing is on let tracing = tracing_switch_to_bool(cmd.tracing, &user_defaults)?; // check if fatdb is on let fat_db = fatdb_switch_to_bool(cmd.fat_db, &user_defaults, algorithm)?; // get the mode let mode = mode_switch_to_bool(cmd.mode, &user_defaults)?; trace!(target: "mode", "mode is {:?}", mode); let network_enabled = match mode { Mode::Dark(_) | Mode::Off => false, _ => true, }; // get the update policy let update_policy = cmd.update_policy; // prepare client and snapshot paths. let client_path = db_dirs.client_path(algorithm); let snapshot_path = db_dirs.snapshot_path(); // execute upgrades execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, &cmd.compaction)?; // create dirs used by parity cmd.dirs.create_dirs(cmd.acc_conf.unlocked_accounts.len() == 0, cmd.secretstore_conf.enabled)?; //print out running parity environment print_running_environment(&spec.data_dir, &cmd.dirs, &db_dirs); // display info about used pruning algorithm info!("State DB configuration: {}{}{}", Colour::White.bold().paint(algorithm.as_str()), match fat_db { true => Colour::White.bold().paint(" +Fat").to_string(), false => "".to_owned(), }, match tracing { true => Colour::White.bold().paint(" +Trace").to_string(), false => "".to_owned(), } ); info!("Operating mode: {}", Colour::White.bold().paint(format!("{}", mode))); // display warning about using experimental journaldb algorithm if !algorithm.is_stable() { warn!("Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable")); } // create sync config let mut sync_config = SyncConfig::default(); sync_config.network_id = match cmd.network_id { Some(id) => id, None => spec.network_id(), }; if spec.subprotocol_name().len() != 3 { warn!("Your chain specification's subprotocol length is not 3. Ignoring."); } else { sync_config.subprotocol_name.clone_from_slice(spec.subprotocol_name().as_bytes()); } sync_config.fork_block = spec.fork_block(); let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync; if warp_sync { // Logging is not initialized yet, so we print directly to stderr if fat_db { warn!("Warning: Warp Sync is disabled because Fat DB is turned on."); warp_sync = false; } else if tracing { warn!("Warning: Warp Sync is disabled because tracing is turned on."); warp_sync = false; } else if algorithm != Algorithm::OverlayRecent { warn!("Warning: Warp Sync is disabled because of non-default pruning mode."); warp_sync = false; } } sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) { (true, Some(block)) => sync::WarpSync::OnlyAndAfter(block), (true, _) => sync::WarpSync::Enabled, _ => sync::WarpSync::Disabled, }; sync_config.download_old_blocks = cmd.download_old_blocks; sync_config.serve_light = cmd.serve_light; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; // prepare account provider let account_provider = Arc::new(account_utils::prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?); // spin up event loop let runtime = Runtime::with_default_thread_count(); // fetch service let fetch = fetch::Client::new(FETCH_FULL_NUM_DNS_THREADS).map_err(|e| format!("Error starting fetch client: {:?}", e))?; let txpool_size = cmd.miner_options.pool_limits.max_count; // create miner let miner = Arc::new(Miner::new( cmd.miner_options, cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), runtime.executor()), &spec, ( cmd.miner_extras.local_accounts, account_utils::miner_local_accounts(account_provider.clone()), ) )); miner.set_author(miner::Author::External(cmd.miner_extras.author)); miner.set_gas_range_target(cmd.miner_extras.gas_range_target); miner.set_extra_data(cmd.miner_extras.extra_data); if !cmd.miner_extras.work_notify.is_empty() { miner.add_work_listener(Box::new( WorkPoster::new(&cmd.miner_extras.work_notify, fetch.clone(), runtime.executor()) )); } let engine_signer = cmd.miner_extras.engine_signer; if engine_signer != Default::default() { if let Some(author) = account_utils::miner_author(&cmd.spec, &cmd.dirs, &account_provider, engine_signer, &passwords)? { miner.set_author(author); } } // display warning if using --no-hardcoded-sync if cmd.no_hardcoded_sync { warn!("The --no-hardcoded-sync flag has no effect if you don't use --light"); } // create client config let mut client_config = to_client_config( &cmd.cache_config, spec.name.to_lowercase(), mode.clone(), tracing, fat_db, cmd.compaction, cmd.vm_type, cmd.name, algorithm, cmd.pruning_history, cmd.pruning_memory, cmd.check_seal, cmd.max_round_blocks_to_import, ); client_config.queue.verifier_settings = cmd.verifier_settings; client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4); client_config.snapshot = cmd.snapshot_conf.clone(); // set up bootnodes let mut net_conf = cmd.net_conf; if !cmd.custom_bootnodes { net_conf.boot_nodes = spec.nodes.clone(); } // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); let client_db = restoration_db_handler.open(&client_path) .map_err(|e| format!("Failed to open database {:?}", e))?; let private_tx_signer = account_utils::private_tx_signer(account_provider.clone(), &passwords)?; // create client service. let service = ClientService::start( client_config, &spec, client_db, &snapshot_path, restoration_db_handler, &cmd.dirs.ipc_path(), miner.clone(), private_tx_signer.clone(), Box::new(SecretStoreEncryptor::new(cmd.private_encryptor_conf.clone(), fetch.clone(), private_tx_signer).map_err(|e| e.to_string())?), cmd.private_provider_conf, cmd.private_encryptor_conf, ).map_err(|e| format!("Client service error: {:?}", e))?; let connection_filter_address = spec.params().node_permission_contract; // drop the spec to free up genesis state. drop(spec); // take handle to client let client = service.client(); // Update miners block gas limit miner.update_transaction_queue_limits(*client.best_block_header().gas_limit()); // take handle to private transactions service let private_tx_service = service.private_tx_service(); let private_tx_provider = private_tx_service.provider(); let connection_filter = connection_filter_address.map(|a| Arc::new(NodeFilter::new(Arc::downgrade(&client) as Weak, a))); let snapshot_service = service.snapshot_service(); if let Some(filter) = connection_filter.clone() { service.add_notify(filter.clone()); } // initialize the local node information store. let store = { let db = service.db(); let node_info = FullNodeInfo { miner: match cmd.no_persistent_txqueue { true => None, false => Some(miner.clone()), } }; let store = ::local_store::create(db.key_value().clone(), ::ethcore_db::COL_NODE_INFO, node_info); if cmd.no_persistent_txqueue { info!("Running without a persistent transaction queue."); if let Err(e) = store.clear() { warn!("Error clearing persistent transaction queue: {}", e); } } // re-queue pending transactions. match store.pending_transactions() { Ok(pending) => { for pending_tx in pending { if let Err(e) = miner.import_own_transaction(&*client, pending_tx) { warn!("Error importing saved transaction: {}", e) } } } Err(e) => warn!("Error loading cached pending transactions from disk: {}", e), } Arc::new(store) }; // register it as an IO service to update periodically. service.register_io_handler(store).map_err(|_| "Unable to register local store handler".to_owned())?; // create external miner let external_miner = Arc::new(ExternalMiner::default()); // start stratum if let Some(ref stratum_config) = cmd.stratum { stratum::Stratum::register(stratum_config, miner.clone(), Arc::downgrade(&client)) .map_err(|e| format!("Stratum start error: {:?}", e))?; } let (private_tx_sync, private_state) = match cmd.private_tx_enabled { true => (Some(private_tx_service.clone() as Arc), Some(private_tx_provider.private_state_db())), false => (None, None), }; // create sync object let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, runtime.executor(), net_conf.clone().into(), client.clone(), snapshot_service.clone(), private_tx_sync, private_state, client.clone(), &cmd.logger_config, connection_filter.clone().map(|f| f as Arc<::sync::ConnectionFilter + 'static>), ).map_err(|e| format!("Sync error: {}", e))?; service.add_notify(chain_notify.clone()); // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); let is_ready = Arc::new(atomic::AtomicBool::new(true)); let executor = runtime.executor(); let pool_receiver = miner.pending_transactions_receiver(); executor.spawn( pool_receiver.for_each(move |_hashes| { // we want to have only one PendingTransactions task in the queue. if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) { let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); // we ignore error cause it means that we are closing let _ = tx.lock().send(task); } Ok(()) }) ); // provider not added to a notification center is effectively disabled // TODO [debris] refactor it later on if cmd.private_tx_enabled { service.add_notify(private_tx_provider.clone()); // TODO [ToDr] PrivateTX should use separate notifications // re-using ChainNotify for this is a bit abusive. private_tx_provider.add_notify(chain_notify.clone()); } // start network if network_enabled { chain_notify.start(); } let contract_client = { struct FullRegistrar { client: Arc } impl RegistrarClient for FullRegistrar { type Call = Asynchronous; fn registrar_address(&self) -> Result { self.client.registrar_address() .ok_or_else(|| "Registrar not defined.".into()) } fn call_contract(&self, address: Address, data: Bytes) -> Self::Call { Box::new(self.client.call_contract(BlockId::Latest, address, data).into_future()) } } Arc::new(FullRegistrar { client: client.clone() }) }; // the updater service let updater_fetch = fetch.clone(); let updater = Updater::new( &Arc::downgrade(&(service.client() as Arc)), &Arc::downgrade(&sync_provider), update_policy, hash_fetch::Client::with_fetch(contract_client.clone(), updater_fetch, runtime.executor()) ); service.add_notify(updater.clone()); // set up dependencies for rpc servers let rpc_stats = Arc::new(informant::RpcStats::default()); let secret_store = account_provider.clone(); let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.logger_config)); let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies { signer_service: signer_service, snapshot: snapshot_service.clone(), client: client.clone(), sync: sync_provider.clone(), net: manage_network.clone(), accounts: secret_store, miner: miner.clone(), external_miner: external_miner.clone(), logger: logger.clone(), settings: Arc::new(cmd.net_settings.clone()), net_service: manage_network.clone(), updater: updater.clone(), geth_compatibility: cmd.geth_compatibility, experimental_rpcs: cmd.experimental_rpcs, ws_address: cmd.ws_conf.address(), fetch: fetch.clone(), executor: runtime.executor(), private_tx_service: Some(private_tx_service.clone()), gas_price_percentile: cmd.gas_price_percentile, poll_lifetime: cmd.poll_lifetime, allow_missing_blocks: cmd.allow_missing_blocks, no_ancient_blocks: !cmd.download_old_blocks, }); let dependencies = rpc::Dependencies { apis: deps_for_rpc_apis.clone(), executor: runtime.executor(), stats: rpc_stats.clone(), }; // start rpc servers let rpc_direct = rpc::setup_apis(rpc_apis::ApiSet::All, &dependencies); let ws_server = rpc::new_ws(cmd.ws_conf.clone(), &dependencies)?; let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; let http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies)?; // secret store key server let secretstore_deps = secretstore::Dependencies { client: client.clone(), sync: sync_provider.clone(), miner: miner.clone(), account_provider, accounts_passwords: &passwords, }; let secretstore_key_server = secretstore::start(cmd.secretstore_conf.clone(), secretstore_deps, runtime.executor())?; // the ipfs server let ipfs_server = ipfs::start_server(cmd.ipfs_conf.clone(), client.clone())?; // the informant let informant = Arc::new(Informant::new( FullNodeInformantData { client: service.client(), sync: Some(sync_provider.clone()), net: Some(manage_network.clone()), }, Some(snapshot_service.clone()), Some(rpc_stats.clone()), cmd.logger_config.color, )); service.add_notify(informant.clone()); service.register_io_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?; // save user defaults user_defaults.is_first_launch = false; user_defaults.pruning = algorithm; user_defaults.tracing = tracing; user_defaults.fat_db = fat_db; user_defaults.set_mode(mode); user_defaults.save(&user_defaults_path)?; // tell client how to save the default mode if it gets changed. client.on_user_defaults_change(move |mode: Option| { if let Some(mode) = mode { user_defaults.set_mode(mode); } let _ = user_defaults.save(&user_defaults_path); // discard failures - there's nothing we can do }); // the watcher must be kept alive. let watcher = match cmd.snapshot_conf.no_periodic { true => None, false => { let sync = sync_provider.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), move || sync.is_major_syncing(), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY, )); service.add_notify(watcher.clone()); Some(watcher) }, }; client.set_exit_handler(on_client_rq); updater.set_exit_handler(on_updater_rq); Ok(RunningClient { inner: RunningClientInner::Full { rpc: rpc_direct, informant, client, client_service: Arc::new(service), keep_alive: Box::new((watcher, updater, ws_server, http_server, ipc_server, secretstore_key_server, ipfs_server, runtime)), } }) } /// Parity client currently executing in background threads. /// /// Should be destroyed by calling `shutdown()`, otherwise execution will continue in the /// background. pub struct RunningClient { inner: RunningClientInner, } enum RunningClientInner { Light { rpc: jsonrpc_core::MetaIoHandler>, informant: Arc>, client: Arc, keep_alive: Box, }, Full { rpc: jsonrpc_core::MetaIoHandler>, informant: Arc>, client: Arc, client_service: Arc, keep_alive: Box, }, } impl RunningClient { /// Performs an asynchronous RPC query. // FIXME: [tomaka] This API should be better, with for example a Future pub fn rpc_query(&self, request: &str, session: Option>) -> FutureResult { let metadata = Metadata { origin: Origin::CApi, session, }; match self.inner { RunningClientInner::Light { ref rpc, .. } => rpc.handle_request(request, metadata), RunningClientInner::Full { ref rpc, .. } => rpc.handle_request(request, metadata), } } /// Shuts down the client. pub fn shutdown(self) { match self.inner { RunningClientInner::Light { rpc, informant, client, keep_alive } => { // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); drop(rpc); drop(keep_alive); informant.shutdown(); drop(informant); drop(client); wait_for_drop(weak_client); }, RunningClientInner::Full { rpc, informant, client, client_service, keep_alive } => { info!("Finishing work, please wait..."); // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); // Shutdown and drop the ClientService client_service.shutdown(); trace!(target: "shutdown", "ClientService shut down"); drop(client_service); trace!(target: "shutdown", "ClientService dropped"); // drop this stuff as soon as exit detected. drop(rpc); trace!(target: "shutdown", "RPC dropped"); drop(keep_alive); trace!(target: "shutdown", "KeepAlive dropped"); // to make sure timer does not spawn requests while shutdown is in progress informant.shutdown(); trace!(target: "shutdown", "Informant shut down"); // just Arc is dropping here, to allow other reference release in its default time drop(informant); trace!(target: "shutdown", "Informant dropped"); drop(client); trace!(target: "shutdown", "Client dropped"); // This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]` // trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); trace!(target: "shutdown", "Waiting for refs to Client to shutdown"); wait_for_drop(weak_client); } } } } /// Executes the given run command. /// /// `on_client_rq` is the action to perform when the client receives an RPC request to be restarted /// with a different chain. /// /// `on_updater_rq` is the action to perform when the updater has a new binary to execute. /// /// On error, returns what to print on stderr. pub fn execute(cmd: RunCmd, logger: Arc, on_client_rq: Cr, on_updater_rq: Rr) -> Result where Cr: Fn(String) + 'static + Send, Rr: Fn() + 'static + Send { if cmd.light { execute_light_impl(cmd, logger, on_client_rq) } else { execute_impl(cmd, logger, on_client_rq, on_updater_rq) } } fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &DatabaseDirectories) { info!("Starting {}", Colour::White.bold().paint(version())); info!("Keys path {}", Colour::White.bold().paint(dirs.keys_path(data_dir).to_string_lossy().into_owned())); info!("DB path {}", Colour::White.bold().paint(db_dirs.db_root_path().to_string_lossy().into_owned())); } fn wait_for_drop(w: Weak) { const SLEEP_DURATION: Duration = Duration::from_secs(1); const WARN_TIMEOUT: Duration = Duration::from_secs(60); const MAX_TIMEOUT: Duration = Duration::from_secs(300); let instant = Instant::now(); let mut warned = false; while instant.elapsed() < MAX_TIMEOUT { if w.upgrade().is_none() { return; } if !warned && instant.elapsed() > WARN_TIMEOUT { warned = true; warn!("Shutdown is taking longer than expected."); } thread::sleep(SLEEP_DURATION); // When debugging shutdown issues on a nightly build it can help to enable this with the // `#![feature(weak_counts)]` added to lib.rs (TODO: enable when // https://github.com/rust-lang/rust/issues/57977 is stable) // trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); trace!(target: "shutdown", "Waiting for client to drop"); } warn!("Shutdown timeout reached, exiting uncleanly."); }