diff --git a/parity/run.rs b/parity/run.rs index ae5a299e6..3ee4b9db6 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -16,6 +16,8 @@ use std::fmt; use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; +use std::thread; use std::net::{TcpListener}; use ansi_term::Colour; @@ -172,8 +174,10 @@ impl ::local_store::NodeInfo for FullNodeInfo { } } +type LightClient = ::light::client::Client<::light_helpers::EpochFetch>; + // helper for light execution. -fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { +fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<((bool, Option), Weak), String> { use light::client as light_client; use ethsync::{LightSyncParams, LightSync, ManageNetwork}; use parking_lot::{Mutex, RwLock}; @@ -238,8 +242,9 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let service = light_client::Service::start(config, &spec, fetch, &db_dirs.client_path(algorithm), cache.clone()) .map_err(|e| format!("Error starting light client: {}", e))?; + let client = service.client(); let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default())); - let provider = ::light::provider::LightProvider::new(service.client().clone(), txq.clone()); + let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone()); // start network. // set up bootnodes @@ -276,7 +281,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> // queue cull service. let queue_cull = Arc::new(::light_helpers::QueueCull { - client: service.client().clone(), + client: client.clone(), sync: light_sync.clone(), on_demand: on_demand.clone(), txq: txq.clone(), @@ -300,7 +305,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config)); let (node_health, dapps_deps) = { let contract_client = Arc::new(::dapps::LightRegistrar { - client: service.client().clone(), + client: client.clone(), sync: light_sync.clone(), on_demand: on_demand.clone(), }); @@ -343,7 +348,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let dapps_service = dapps::service(&dapps_middleware); let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies { signer_service: signer_service, - client: service.client().clone(), + client: client.clone(), sync: light_sync.clone(), net: light_sync.clone(), health: node_health, @@ -383,7 +388,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> // the informant let informant = Arc::new(Informant::new( LightNodeInformantData { - client: service.client().clone(), + client: client.clone(), sync: light_sync.clone(), cache: cache, }, @@ -398,26 +403,13 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let res = wait_for_exit(None, None, can_restart); informant.shutdown(); - Ok(res) + // Create a weak reference to the client so that we can wait on shutdown until it is dropped + let weak_client = Arc::downgrade(&client); + + Ok((res, weak_client)) } -pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { - if cmd.ui && cmd.dapps_conf.enabled { - // Check if Parity is already running - let addr = format!("{}:{}", cmd.ui_conf.interface, cmd.ui_conf.port); - if !TcpListener::bind(&addr as &str).is_ok() { - return open_ui(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config).map(|_| (false, None)); - } - } - - // increase max number of open files - raise_fd_limit(); - - // run as light client. - if cmd.light { - return execute_light(cmd, can_restart, logger); - } - +pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<((bool, Option), Weak), String> { // load spec let spec = cmd.spec.spec(&cmd.dirs.cache)?; @@ -856,6 +848,9 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?; } + // Create a weak reference to the client so that we can wait on shutdown until it is dropped + let weak_client = Arc::downgrade(&client); + // Handle exit let restart = wait_for_exit(Some(updater), Some(client), can_restart); @@ -869,7 +864,33 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R // just Arc is dropping here, to allow other reference release in its default time drop(informant); - Ok(restart) + Ok((restart, weak_client)) +} + +pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { + if cmd.ui && cmd.dapps_conf.enabled { + // Check if Parity is already running + let addr = format!("{}:{}", cmd.ui_conf.interface, cmd.ui_conf.port); + if !TcpListener::bind(&addr as &str).is_ok() { + return open_ui(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config).map(|_| (false, None)); + } + } + + // increase max number of open files + raise_fd_limit(); + + fn wait(res: Result<((bool, Option), Weak), String>) -> Result<(bool, Option), String> { + res.map(|(restart, weak_client)| { + wait_for_drop(weak_client); + restart + }) + } + + if cmd.light { + wait(execute_light_impl(cmd, can_restart, logger)) + } else { + wait(execute_impl(cmd, can_restart, logger)) + } } #[cfg(not(windows))] @@ -1002,3 +1023,27 @@ fn wait_for_exit( let _ = exit.1.wait(&mut l); l.clone() } + +fn wait_for_drop(w: Weak) { + let sleep_duration = Duration::from_secs(1); + let warn_timeout = Duration::from_secs(60); + let max_timeout = Duration::from_secs(300); + + let instant = Instant::now(); + let mut warned = false; + + while instant.elapsed() < max_timeout { + if w.upgrade().is_none() { + return; + } + + if !warned && instant.elapsed() > warn_timeout { + warned = true; + warn!("Shutdown is taking longer than expected."); + } + + thread::sleep(sleep_duration); + } + + warn!("Shutdown timeout reached, exiting uncleanly."); +}