2019-01-07 11:33:07 +01:00
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
2016-07-25 16:09:47 +02:00
2019-01-07 11:33:07 +01:00
// Parity Ethereum is free software: you can redistribute it and/or modify
2016-07-25 16:09:47 +02:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2019-01-07 11:33:07 +01:00
// Parity Ethereum is distributed in the hope that it will be useful,
2016-07-25 16:09:47 +02:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
2019-01-07 11:33:07 +01:00
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
2016-07-25 16:09:47 +02:00
2020-08-05 06:08:03 +02:00
use std ::{
any ::Any ,
sync ::{ atomic , Arc , Weak } ,
thread ,
time ::{ Duration , Instant } ,
} ;
2017-07-10 13:21:11 +02:00
2020-08-05 06:08:03 +02:00
use account_utils ;
2018-05-09 08:47:21 +02:00
use ansi_term ::Colour ;
2020-08-05 06:08:03 +02:00
use cache ::CacheConfig ;
use db ;
use dir ::{ DatabaseDirectories , Directories } ;
use ethcore ::{
2020-08-14 12:24:16 +02:00
client ::{ BlockChainClient , BlockInfo , Client , DatabaseCompactionProfile , Mode , VMType } ,
2020-08-05 06:08:03 +02:00
miner ::{ self , stratum , Miner , MinerOptions , MinerService } ,
snapshot ::{ self , SnapshotConfiguration } ,
verification ::queue ::VerifierSettings ,
} ;
2018-01-11 17:49:10 +01:00
use ethcore_logger ::{ Config as LogConfig , RotatingLogger } ;
2018-03-13 11:49:57 +01:00
use ethcore_service ::ClientService ;
2020-08-05 06:08:03 +02:00
use helpers ::{ execute_upgrades , passwords_from_files , to_client_config } ;
2020-08-13 18:25:19 +02:00
use informant ::{ FullNodeInformantData , Informant } ;
2018-01-11 17:49:10 +01:00
use journaldb ::Algorithm ;
2020-08-05 06:08:03 +02:00
use jsonrpc_core ;
2020-09-14 16:08:57 +02:00
use metrics ::{ start_prometheus_metrics , MetricsConfiguration } ;
2020-08-05 06:08:03 +02:00
use miner ::{ external ::ExternalMiner , work_notify ::WorkPoster } ;
use modules ;
2018-01-11 17:49:10 +01:00
use node_filter ::NodeFilter ;
2020-08-05 06:08:03 +02:00
use params ::{
fatdb_switch_to_bool , mode_switch_to_bool , tracing_switch_to_bool , AccountsConfig ,
GasPricerConfig , MinerExtras , Pruning , SpecType , Switch ,
} ;
2019-02-07 14:34:24 +01:00
use parity_rpc ::{
2020-08-05 06:08:03 +02:00
informant , is_major_importing , FutureOutput , FutureResponse , FutureResult , Metadata ,
NetworkSettings , Origin , PubSubSession ,
2019-02-07 14:34:24 +01:00
} ;
2020-08-05 06:08:03 +02:00
use parity_runtime ::Runtime ;
2017-12-22 14:37:39 +01:00
use parity_version ::version ;
2016-07-25 16:09:47 +02:00
use rpc ;
2017-05-24 12:24:07 +02:00
use rpc_apis ;
use secretstore ;
use signer ;
2020-09-02 17:43:14 +02:00
use sync ::{ self , SyncConfig } ;
2020-08-05 06:08:03 +02:00
use user_defaults ::UserDefaults ;
2016-07-25 16:09:47 +02:00
2016-09-02 20:24:59 +02:00
// how often to take periodic snapshots.
2017-08-22 11:24:56 +02:00
const SNAPSHOT_PERIOD : u64 = 5000 ;
2016-09-02 20:24:59 +02:00
// how many blocks to wait before starting a periodic snapshot.
2016-10-30 15:39:36 +01:00
const SNAPSHOT_HISTORY : u64 = 100 ;
2016-09-02 18:48:07 +02:00
2018-09-28 15:26:38 +02:00
// Full client number of DNS threads
const FETCH_FULL_NUM_DNS_THREADS : usize = 4 ;
2016-07-25 16:09:47 +02:00
#[ derive(Debug, PartialEq) ]
pub struct RunCmd {
2020-08-05 06:08:03 +02:00
pub cache_config : CacheConfig ,
pub dirs : Directories ,
pub spec : SpecType ,
pub pruning : Pruning ,
pub pruning_history : u64 ,
pub pruning_memory : usize ,
/// Some if execution should be daemonized. Contains pid_file path.
pub daemon : Option < String > ,
pub logger_config : LogConfig ,
pub miner_options : MinerOptions ,
pub gas_price_percentile : usize ,
pub poll_lifetime : u32 ,
pub ws_conf : rpc ::WsConfiguration ,
pub http_conf : rpc ::HttpConfiguration ,
pub ipc_conf : rpc ::IpcConfiguration ,
pub net_conf : sync ::NetworkConfiguration ,
pub network_id : Option < u64 > ,
pub warp_sync : bool ,
pub warp_barrier : Option < u64 > ,
pub acc_conf : AccountsConfig ,
pub gas_pricer_conf : GasPricerConfig ,
pub miner_extras : MinerExtras ,
pub mode : Option < Mode > ,
pub tracing : Switch ,
pub fat_db : Switch ,
pub compaction : DatabaseCompactionProfile ,
pub vm_type : VMType ,
pub experimental_rpcs : bool ,
pub net_settings : NetworkSettings ,
pub secretstore_conf : secretstore ::Configuration ,
pub name : String ,
pub custom_bootnodes : bool ,
pub stratum : Option < stratum ::Options > ,
pub snapshot_conf : SnapshotConfiguration ,
pub check_seal : bool ,
pub allow_missing_blocks : bool ,
pub download_old_blocks : bool ,
pub verifier_settings : VerifierSettings ,
pub no_persistent_txqueue : bool ,
pub max_round_blocks_to_import : usize ,
2020-09-14 16:08:57 +02:00
pub metrics_conf : MetricsConfiguration ,
2016-07-25 16:09:47 +02:00
}
2017-02-20 17:21:55 +01:00
// node info fetcher for the local store.
struct FullNodeInfo {
2020-08-05 06:08:03 +02:00
miner : Option < Arc < Miner > > , // TODO: only TXQ needed, just use that after decoupling.
2017-02-20 17:21:55 +01:00
}
impl ::local_store ::NodeInfo for FullNodeInfo {
2020-08-05 06:08:03 +02:00
fn pending_transactions ( & self ) -> Vec < ::types ::transaction ::PendingTransaction > {
let miner = match self . miner . as_ref ( ) {
Some ( m ) = > m ,
None = > return Vec ::new ( ) ,
} ;
miner
. local_transactions ( )
. values ( )
. filter_map ( | status | match * status {
::miner ::pool ::local_transactions ::Status ::Pending ( ref tx ) = > {
Some ( tx . pending ( ) . clone ( ) )
}
_ = > None ,
} )
. collect ( )
}
2017-02-20 17:21:55 +01:00
}
2017-01-06 16:05:58 +01:00
2020-08-13 18:25:19 +02:00
/// Executes the given run command.
///
/// On error, returns what to print on stderr.
2020-08-14 12:24:16 +02:00
pub fn execute ( cmd : RunCmd , logger : Arc < RotatingLogger > ) -> Result < RunningClient , String > {
2020-08-05 06:08:03 +02:00
// load spec
let spec = cmd . spec . spec ( & cmd . dirs . cache ) ? ;
// load genesis hash
let genesis_hash = spec . genesis_header ( ) . hash ( ) ;
// database paths
let db_dirs = cmd . dirs . database (
genesis_hash ,
cmd . spec . legacy_fork_name ( ) ,
spec . data_dir . clone ( ) ,
) ;
// user defaults path
let user_defaults_path = db_dirs . user_defaults_path ( ) ;
// load user defaults
let mut user_defaults = UserDefaults ::load ( & user_defaults_path ) ? ;
// select pruning algorithm
let algorithm = cmd . pruning . to_algorithm ( & user_defaults ) ;
// check if tracing is on
let tracing = tracing_switch_to_bool ( cmd . tracing , & user_defaults ) ? ;
// check if fatdb is on
let fat_db = fatdb_switch_to_bool ( cmd . fat_db , & user_defaults , algorithm ) ? ;
// get the mode
let mode = mode_switch_to_bool ( cmd . mode , & user_defaults ) ? ;
trace! ( target : " mode " , " mode is {:?} " , mode ) ;
let network_enabled = match mode {
Mode ::Dark ( _ ) | Mode ::Off = > false ,
_ = > true ,
} ;
// prepare client and snapshot paths.
let client_path = db_dirs . client_path ( algorithm ) ;
let snapshot_path = db_dirs . snapshot_path ( ) ;
// execute upgrades
execute_upgrades ( & cmd . dirs . base , & db_dirs , algorithm , & cmd . compaction ) ? ;
// create dirs used by parity
cmd . dirs . create_dirs (
cmd . acc_conf . unlocked_accounts . len ( ) = = 0 ,
cmd . secretstore_conf . enabled ,
) ? ;
//print out running parity environment
print_running_environment ( & spec . data_dir , & cmd . dirs , & db_dirs ) ;
// display info about used pruning algorithm
info! (
" State DB configuration: {}{}{} " ,
Colour ::White . bold ( ) . paint ( algorithm . as_str ( ) ) ,
match fat_db {
true = > Colour ::White . bold ( ) . paint ( " +Fat " ) . to_string ( ) ,
false = > " " . to_owned ( ) ,
} ,
match tracing {
true = > Colour ::White . bold ( ) . paint ( " +Trace " ) . to_string ( ) ,
false = > " " . to_owned ( ) ,
}
) ;
info! (
" Operating mode: {} " ,
Colour ::White . bold ( ) . paint ( format! ( " {} " , mode ) )
) ;
// display warning about using experimental journaldb algorithm
if ! algorithm . is_stable ( ) {
warn! (
" Your chosen strategy is {}! You can re-run with --pruning to change. " ,
Colour ::Red . bold ( ) . paint ( " unstable " )
) ;
}
// create sync config
let mut sync_config = SyncConfig ::default ( ) ;
sync_config . network_id = match cmd . network_id {
Some ( id ) = > id ,
None = > spec . network_id ( ) ,
} ;
if spec . subprotocol_name ( ) . len ( ) ! = 3 {
warn! ( " Your chain specification's subprotocol length is not 3. Ignoring. " ) ;
} else {
sync_config
. subprotocol_name
. clone_from_slice ( spec . subprotocol_name ( ) . as_bytes ( ) ) ;
}
sync_config . fork_block = spec . fork_block ( ) ;
let mut warp_sync = spec . engine . supports_warp ( ) & & cmd . warp_sync ;
if warp_sync {
// Logging is not initialized yet, so we print directly to stderr
if fat_db {
warn! ( " Warning: Warp Sync is disabled because Fat DB is turned on. " ) ;
warp_sync = false ;
} else if tracing {
warn! ( " Warning: Warp Sync is disabled because tracing is turned on. " ) ;
warp_sync = false ;
} else if algorithm ! = Algorithm ::OverlayRecent {
warn! ( " Warning: Warp Sync is disabled because of non-default pruning mode. " ) ;
warp_sync = false ;
}
}
sync_config . warp_sync = match ( warp_sync , cmd . warp_barrier ) {
( true , Some ( block ) ) = > sync ::WarpSync ::OnlyAndAfter ( block ) ,
( true , _ ) = > sync ::WarpSync ::Enabled ,
_ = > sync ::WarpSync ::Disabled ,
} ;
sync_config . download_old_blocks = cmd . download_old_blocks ;
let passwords = passwords_from_files ( & cmd . acc_conf . password_files ) ? ;
// prepare account provider
let account_provider = Arc ::new ( account_utils ::prepare_account_provider (
& cmd . spec ,
& cmd . dirs ,
& spec . data_dir ,
cmd . acc_conf ,
& passwords ,
) ? ) ;
// spin up event loop
let runtime = Runtime ::with_default_thread_count ( ) ;
// fetch service
let fetch = fetch ::Client ::new ( FETCH_FULL_NUM_DNS_THREADS )
. map_err ( | e | format! ( " Error starting fetch client: {:?} " , e ) ) ? ;
let txpool_size = cmd . miner_options . pool_limits . max_count ;
// create miner
let miner = Arc ::new ( Miner ::new (
cmd . miner_options ,
cmd . gas_pricer_conf
. to_gas_pricer ( fetch . clone ( ) , runtime . executor ( ) ) ,
& spec ,
(
cmd . miner_extras . local_accounts ,
account_utils ::miner_local_accounts ( account_provider . clone ( ) ) ,
) ,
) ) ;
miner . set_author ( miner ::Author ::External ( cmd . miner_extras . author ) ) ;
miner . set_gas_range_target ( cmd . miner_extras . gas_range_target ) ;
miner . set_extra_data ( cmd . miner_extras . extra_data ) ;
if ! cmd . miner_extras . work_notify . is_empty ( ) {
miner . add_work_listener ( Box ::new ( WorkPoster ::new (
& cmd . miner_extras . work_notify ,
fetch . clone ( ) ,
runtime . executor ( ) ,
) ) ) ;
}
let engine_signer = cmd . miner_extras . engine_signer ;
if engine_signer ! = Default ::default ( ) {
if let Some ( author ) = account_utils ::miner_author (
& cmd . spec ,
& cmd . dirs ,
& account_provider ,
engine_signer ,
& passwords ,
) ? {
miner . set_author ( author ) ;
}
}
// create client config
let mut client_config = to_client_config (
& cmd . cache_config ,
spec . name . to_lowercase ( ) ,
mode . clone ( ) ,
tracing ,
fat_db ,
cmd . compaction ,
cmd . vm_type ,
cmd . name ,
algorithm ,
cmd . pruning_history ,
cmd . pruning_memory ,
cmd . check_seal ,
cmd . max_round_blocks_to_import ,
) ;
client_config . queue . verifier_settings = cmd . verifier_settings ;
client_config . transaction_verification_queue_size = ::std ::cmp ::max ( 2048 , txpool_size / 4 ) ;
client_config . snapshot = cmd . snapshot_conf . clone ( ) ;
// set up bootnodes
let mut net_conf = cmd . net_conf ;
if ! cmd . custom_bootnodes {
net_conf . boot_nodes = spec . nodes . clone ( ) ;
}
// set network path.
net_conf . net_config_path = Some ( db_dirs . network_path ( ) . to_string_lossy ( ) . into_owned ( ) ) ;
let restoration_db_handler = db ::restoration_db_handler ( & client_path , & client_config ) ;
let client_db = restoration_db_handler
. open ( & client_path )
. map_err ( | e | format! ( " Failed to open database {:?} " , e ) ) ? ;
// create client service.
let service = ClientService ::start (
client_config ,
& spec ,
client_db ,
& snapshot_path ,
restoration_db_handler ,
& cmd . dirs . ipc_path ( ) ,
miner . clone ( ) ,
)
. map_err ( | e | format! ( " Client service error: {:?} " , e ) ) ? ;
let connection_filter_address = spec . params ( ) . node_permission_contract ;
// drop the spec to free up genesis state.
2020-09-21 14:48:14 +02:00
let forks = spec . hard_forks . clone ( ) ;
2020-08-05 06:08:03 +02:00
drop ( spec ) ;
// take handle to client
let client = service . client ( ) ;
// Update miners block gas limit
miner . update_transaction_queue_limits ( * client . best_block_header ( ) . gas_limit ( ) ) ;
let connection_filter = connection_filter_address . map ( | a | {
Arc ::new ( NodeFilter ::new (
2020-07-29 10:36:15 +02:00
Arc ::downgrade ( & client ) as Weak < dyn BlockChainClient > ,
2020-08-05 06:08:03 +02:00
a ,
) )
} ) ;
let snapshot_service = service . snapshot_service ( ) ;
// initialize the local node information store.
let store = {
let db = service . db ( ) ;
let node_info = FullNodeInfo {
miner : match cmd . no_persistent_txqueue {
true = > None ,
false = > Some ( miner . clone ( ) ) ,
} ,
} ;
let store = ::local_store ::create (
db . key_value ( ) . clone ( ) ,
::ethcore_db ::COL_NODE_INFO ,
node_info ,
) ;
if cmd . no_persistent_txqueue {
info! ( " Running without a persistent transaction queue. " ) ;
if let Err ( e ) = store . clear ( ) {
warn! ( " Error clearing persistent transaction queue: {} " , e ) ;
}
}
// re-queue pending transactions.
match store . pending_transactions ( ) {
Ok ( pending ) = > {
for pending_tx in pending {
if let Err ( e ) = miner . import_own_transaction ( & * client , pending_tx ) {
warn! ( " Error importing saved transaction: {} " , e )
}
}
}
Err ( e ) = > warn! ( " Error loading cached pending transactions from disk: {} " , e ) ,
}
Arc ::new ( store )
} ;
// register it as an IO service to update periodically.
service
. register_io_handler ( store )
. map_err ( | _ | " Unable to register local store handler " . to_owned ( ) ) ? ;
// create external miner
let external_miner = Arc ::new ( ExternalMiner ::default ( ) ) ;
// start stratum
if let Some ( ref stratum_config ) = cmd . stratum {
stratum ::Stratum ::register ( stratum_config , miner . clone ( ) , Arc ::downgrade ( & client ) )
. map_err ( | e | format! ( " Stratum start error: {:?} " , e ) ) ? ;
}
// create sync object
let ( sync_provider , manage_network , chain_notify , priority_tasks ) = modules ::sync (
sync_config ,
net_conf . clone ( ) . into ( ) ,
client . clone ( ) ,
2020-09-21 14:48:14 +02:00
forks ,
2020-08-05 06:08:03 +02:00
snapshot_service . clone ( ) ,
& cmd . logger_config ,
connection_filter
. clone ( )
2020-07-29 10:36:15 +02:00
. map ( | f | f as Arc < dyn crate ::sync ::ConnectionFilter + 'static > ) ,
2020-08-05 06:08:03 +02:00
)
. map_err ( | e | format! ( " Sync error: {} " , e ) ) ? ;
service . add_notify ( chain_notify . clone ( ) ) ;
// Propagate transactions as soon as they are imported.
let tx = ::parking_lot ::Mutex ::new ( priority_tasks ) ;
let is_ready = Arc ::new ( atomic ::AtomicBool ::new ( true ) ) ;
miner . add_transactions_listener ( Box ::new ( move | _hashes | {
// we want to have only one PendingTransactions task in the queue.
if is_ready . compare_and_swap ( true , false , atomic ::Ordering ::SeqCst ) {
let task =
::sync ::PriorityTask ::PropagateTransactions ( Instant ::now ( ) , is_ready . clone ( ) ) ;
// we ignore error cause it means that we are closing
let _ = tx . lock ( ) . send ( task ) ;
}
} ) ) ;
// start network
if network_enabled {
chain_notify . start ( ) ;
}
// set up dependencies for rpc servers
let rpc_stats = Arc ::new ( informant ::RpcStats ::default ( ) ) ;
let secret_store = account_provider . clone ( ) ;
let signer_service = Arc ::new ( signer ::new_service ( & cmd . ws_conf , & cmd . logger_config ) ) ;
let deps_for_rpc_apis = Arc ::new ( rpc_apis ::FullDependencies {
signer_service : signer_service ,
snapshot : snapshot_service . clone ( ) ,
client : client . clone ( ) ,
sync : sync_provider . clone ( ) ,
net : manage_network . clone ( ) ,
accounts : secret_store ,
miner : miner . clone ( ) ,
external_miner : external_miner . clone ( ) ,
logger : logger . clone ( ) ,
settings : Arc ::new ( cmd . net_settings . clone ( ) ) ,
net_service : manage_network . clone ( ) ,
experimental_rpcs : cmd . experimental_rpcs ,
ws_address : cmd . ws_conf . address ( ) ,
fetch : fetch . clone ( ) ,
executor : runtime . executor ( ) ,
gas_price_percentile : cmd . gas_price_percentile ,
poll_lifetime : cmd . poll_lifetime ,
allow_missing_blocks : cmd . allow_missing_blocks ,
no_ancient_blocks : ! cmd . download_old_blocks ,
} ) ;
let dependencies = rpc ::Dependencies {
apis : deps_for_rpc_apis . clone ( ) ,
executor : runtime . executor ( ) ,
stats : rpc_stats . clone ( ) ,
} ;
// start rpc servers
let rpc_direct = rpc ::setup_apis ( rpc_apis ::ApiSet ::All , & dependencies ) ;
let ws_server = rpc ::new_ws ( cmd . ws_conf . clone ( ) , & dependencies ) ? ;
let ipc_server = rpc ::new_ipc ( cmd . ipc_conf , & dependencies ) ? ;
2020-09-14 16:08:57 +02:00
// start the prometheus metrics server
start_prometheus_metrics ( & cmd . metrics_conf , & dependencies ) ? ;
2020-08-05 06:08:03 +02:00
let http_server = rpc ::new_http (
" HTTP JSON-RPC " ,
" jsonrpc " ,
cmd . http_conf . clone ( ) ,
& dependencies ,
) ? ;
// secret store key server
let secretstore_deps = secretstore ::Dependencies {
client : client . clone ( ) ,
sync : sync_provider . clone ( ) ,
miner : miner . clone ( ) ,
account_provider ,
accounts_passwords : & passwords ,
} ;
let secretstore_key_server = secretstore ::start (
cmd . secretstore_conf . clone ( ) ,
secretstore_deps ,
runtime . executor ( ) ,
) ? ;
// the informant
let informant = Arc ::new ( Informant ::new (
FullNodeInformantData {
client : service . client ( ) ,
sync : Some ( sync_provider . clone ( ) ) ,
net : Some ( manage_network . clone ( ) ) ,
} ,
Some ( snapshot_service . clone ( ) ) ,
Some ( rpc_stats . clone ( ) ) ,
cmd . logger_config . color ,
) ) ;
service . add_notify ( informant . clone ( ) ) ;
service
. register_io_handler ( informant . clone ( ) )
. map_err ( | _ | " Unable to register informant handler " . to_owned ( ) ) ? ;
// save user defaults
user_defaults . is_first_launch = false ;
user_defaults . pruning = algorithm ;
user_defaults . tracing = tracing ;
user_defaults . fat_db = fat_db ;
user_defaults . set_mode ( mode ) ;
user_defaults . save ( & user_defaults_path ) ? ;
// tell client how to save the default mode if it gets changed.
client . on_user_defaults_change ( move | mode : Option < Mode > | {
if let Some ( mode ) = mode {
user_defaults . set_mode ( mode ) ;
}
let _ = user_defaults . save ( & user_defaults_path ) ; // discard failures - there's nothing we can do
} ) ;
// the watcher must be kept alive.
2020-09-15 16:51:49 +02:00
let watcher = match cmd . snapshot_conf . enable {
false = > None ,
true = > {
2020-08-05 06:08:03 +02:00
let sync = sync_provider . clone ( ) ;
let client = client . clone ( ) ;
let watcher = Arc ::new ( snapshot ::Watcher ::new (
service . client ( ) ,
move | | is_major_importing ( Some ( sync . status ( ) . state ) , client . queue_info ( ) ) ,
service . io ( ) . channel ( ) ,
SNAPSHOT_PERIOD ,
SNAPSHOT_HISTORY ,
) ) ;
service . add_notify ( watcher . clone ( ) ) ;
Some ( watcher )
}
} ;
Ok ( RunningClient {
inner : RunningClientInner ::Full {
rpc : rpc_direct ,
informant ,
client ,
client_service : Arc ::new ( service ) ,
keep_alive : Box ::new ( (
watcher ,
ws_server ,
http_server ,
ipc_server ,
secretstore_key_server ,
runtime ,
) ) ,
} ,
} )
2018-04-04 11:50:28 +02:00
}
2017-05-24 12:24:07 +02:00
2018-05-09 08:47:21 +02:00
/// Parity client currently executing in background threads.
///
/// Should be destroyed by calling `shutdown()`, otherwise execution will continue in the
/// background.
pub struct RunningClient {
2020-08-05 06:08:03 +02:00
inner : RunningClientInner ,
2018-05-09 08:47:21 +02:00
}
enum RunningClientInner {
2020-08-05 06:08:03 +02:00
Full {
rpc :
jsonrpc_core ::MetaIoHandler < Metadata , informant ::Middleware < informant ::ClientNotifier > > ,
informant : Arc < Informant < FullNodeInformantData > > ,
client : Arc < Client > ,
client_service : Arc < ClientService > ,
2020-07-29 10:36:15 +02:00
keep_alive : Box < dyn Any > ,
2020-08-05 06:08:03 +02:00
} ,
2018-04-04 11:50:28 +02:00
}
2016-09-03 10:31:29 +02:00
2018-04-04 11:50:28 +02:00
impl RunningClient {
2020-08-05 06:08:03 +02:00
/// Performs an asynchronous RPC query.
// FIXME: [tomaka] This API should be better, with for example a Future
pub fn rpc_query (
& self ,
request : & str ,
session : Option < Arc < PubSubSession > > ,
) -> FutureResult < FutureResponse , FutureOutput > {
let metadata = Metadata {
origin : Origin ::CApi ,
session ,
} ;
match self . inner {
RunningClientInner ::Full { ref rpc , .. } = > rpc . handle_request ( request , metadata ) ,
}
}
/// Shuts down the client.
pub fn shutdown ( self ) {
match self . inner {
RunningClientInner ::Full {
rpc ,
informant ,
client ,
client_service ,
keep_alive ,
} = > {
info! ( " Finishing work, please wait... " ) ;
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc ::downgrade ( & client ) ;
// Shutdown and drop the ClientService
client_service . shutdown ( ) ;
trace! ( target : " shutdown " , " ClientService shut down " ) ;
drop ( client_service ) ;
trace! ( target : " shutdown " , " ClientService dropped " ) ;
// drop this stuff as soon as exit detected.
drop ( rpc ) ;
trace! ( target : " shutdown " , " RPC dropped " ) ;
drop ( keep_alive ) ;
trace! ( target : " shutdown " , " KeepAlive dropped " ) ;
// to make sure timer does not spawn requests while shutdown is in progress
informant . shutdown ( ) ;
trace! ( target : " shutdown " , " Informant shut down " ) ;
// just Arc is dropping here, to allow other reference release in its default time
drop ( informant ) ;
trace! ( target : " shutdown " , " Informant dropped " ) ;
drop ( client ) ;
trace! ( target : " shutdown " , " Client dropped " ) ;
// This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]`
// trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count());
trace! ( target : " shutdown " , " Waiting for refs to Client to shutdown " ) ;
wait_for_drop ( weak_client ) ;
}
}
}
2018-01-31 11:41:29 +01:00
}
2018-09-09 00:43:24 +02:00
fn print_running_environment ( data_dir : & str , dirs : & Directories , db_dirs : & DatabaseDirectories ) {
2020-08-05 06:08:03 +02:00
info! ( " Starting {} " , Colour ::White . bold ( ) . paint ( version ( ) ) ) ;
info! (
" Keys path {} " ,
Colour ::White
. bold ( )
. paint ( dirs . keys_path ( data_dir ) . to_string_lossy ( ) . into_owned ( ) )
) ;
info! (
" DB path {} " ,
Colour ::White
. bold ( )
. paint ( db_dirs . db_root_path ( ) . to_string_lossy ( ) . into_owned ( ) )
) ;
2017-06-22 20:08:56 +02:00
}
2018-01-31 11:41:29 +01:00
fn wait_for_drop < T > ( w : Weak < T > ) {
2020-08-05 06:08:03 +02:00
const SLEEP_DURATION : Duration = Duration ::from_secs ( 1 ) ;
const WARN_TIMEOUT : Duration = Duration ::from_secs ( 60 ) ;
const MAX_TIMEOUT : Duration = Duration ::from_secs ( 300 ) ;
2018-01-31 11:41:29 +01:00
2020-08-05 06:08:03 +02:00
let instant = Instant ::now ( ) ;
let mut warned = false ;
2018-01-31 11:41:29 +01:00
2020-08-05 06:08:03 +02:00
while instant . elapsed ( ) < MAX_TIMEOUT {
if w . upgrade ( ) . is_none ( ) {
return ;
}
2018-01-31 11:41:29 +01:00
2020-08-05 06:08:03 +02:00
if ! warned & & instant . elapsed ( ) > WARN_TIMEOUT {
warned = true ;
warn! ( " Shutdown is taking longer than expected. " ) ;
}
2018-01-31 11:41:29 +01:00
2020-08-05 06:08:03 +02:00
thread ::sleep ( SLEEP_DURATION ) ;
2019-06-25 15:38:29 +02:00
2020-08-05 06:08:03 +02:00
// When debugging shutdown issues on a nightly build it can help to enable this with the
// `#![feature(weak_counts)]` added to lib.rs (TODO: enable when
// https://github.com/rust-lang/rust/issues/57977 is stable)
// trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count());
trace! ( target : " shutdown " , " Waiting for client to drop " ) ;
}
2018-01-31 11:41:29 +01:00
2020-08-05 06:08:03 +02:00
warn! ( " Shutdown timeout reached, exiting uncleanly. " ) ;
2018-01-31 11:41:29 +01:00
}