Fix client not being dropped on shutdown (#7695)

* parity: wait for client to drop on shutdown

* parity: fix grumbles in shutdown wait

* parity: increase shutdown timeouts
This commit is contained in:
André Silva 2018-01-31 10:41:29 +00:00 committed by Afri Schoedon
parent fee88d04d4
commit 4763887a68

View File

@ -16,6 +16,8 @@
use std::fmt; use std::fmt;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::thread;
use std::net::{TcpListener}; use std::net::{TcpListener};
use ansi_term::Colour; use ansi_term::Colour;
@ -172,8 +174,10 @@ impl ::local_store::NodeInfo for FullNodeInfo {
} }
} }
type LightClient = ::light::client::Client<::light_helpers::EpochFetch>;
// helper for light execution. // helper for light execution.
fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> { fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<LightClient>), String> {
use light::client as light_client; use light::client as light_client;
use ethsync::{LightSyncParams, LightSync, ManageNetwork}; use ethsync::{LightSyncParams, LightSync, ManageNetwork};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
@ -238,8 +242,9 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let service = light_client::Service::start(config, &spec, fetch, &db_dirs.client_path(algorithm), cache.clone()) let service = light_client::Service::start(config, &spec, fetch, &db_dirs.client_path(algorithm), cache.clone())
.map_err(|e| format!("Error starting light client: {}", e))?; .map_err(|e| format!("Error starting light client: {}", e))?;
let client = service.client();
let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default())); let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default()));
let provider = ::light::provider::LightProvider::new(service.client().clone(), txq.clone()); let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone());
// start network. // start network.
// set up bootnodes // set up bootnodes
@ -276,7 +281,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
// queue cull service. // queue cull service.
let queue_cull = Arc::new(::light_helpers::QueueCull { let queue_cull = Arc::new(::light_helpers::QueueCull {
client: service.client().clone(), client: client.clone(),
sync: light_sync.clone(), sync: light_sync.clone(),
on_demand: on_demand.clone(), on_demand: on_demand.clone(),
txq: txq.clone(), txq: txq.clone(),
@ -300,7 +305,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config)); let signer_service = Arc::new(signer::new_service(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config));
let (node_health, dapps_deps) = { let (node_health, dapps_deps) = {
let contract_client = Arc::new(::dapps::LightRegistrar { let contract_client = Arc::new(::dapps::LightRegistrar {
client: service.client().clone(), client: client.clone(),
sync: light_sync.clone(), sync: light_sync.clone(),
on_demand: on_demand.clone(), on_demand: on_demand.clone(),
}); });
@ -343,7 +348,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let dapps_service = dapps::service(&dapps_middleware); let dapps_service = dapps::service(&dapps_middleware);
let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies { let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies {
signer_service: signer_service, signer_service: signer_service,
client: service.client().clone(), client: client.clone(),
sync: light_sync.clone(), sync: light_sync.clone(),
net: light_sync.clone(), net: light_sync.clone(),
health: node_health, health: node_health,
@ -383,7 +388,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
// the informant // the informant
let informant = Arc::new(Informant::new( let informant = Arc::new(Informant::new(
LightNodeInformantData { LightNodeInformantData {
client: service.client().clone(), client: client.clone(),
sync: light_sync.clone(), sync: light_sync.clone(),
cache: cache, cache: cache,
}, },
@ -398,26 +403,13 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let res = wait_for_exit(None, None, can_restart); let res = wait_for_exit(None, None, can_restart);
informant.shutdown(); informant.shutdown();
Ok(res) // Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);
Ok((res, weak_client))
} }
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> { pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<Client>), String> {
if cmd.ui && cmd.dapps_conf.enabled {
// Check if Parity is already running
let addr = format!("{}:{}", cmd.ui_conf.interface, cmd.ui_conf.port);
if !TcpListener::bind(&addr as &str).is_ok() {
return open_ui(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config).map(|_| (false, None));
}
}
// increase max number of open files
raise_fd_limit();
// run as light client.
if cmd.light {
return execute_light(cmd, can_restart, logger);
}
// load spec // load spec
let spec = cmd.spec.spec(&cmd.dirs.cache)?; let spec = cmd.spec.spec(&cmd.dirs.cache)?;
@ -856,6 +848,9 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?; open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?;
} }
// Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);
// Handle exit // Handle exit
let restart = wait_for_exit(Some(updater), Some(client), can_restart); let restart = wait_for_exit(Some(updater), Some(client), can_restart);
@ -869,7 +864,33 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// just Arc is dropping here, to allow other reference release in its default time // just Arc is dropping here, to allow other reference release in its default time
drop(informant); drop(informant);
Ok(restart) Ok((restart, weak_client))
}
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
if cmd.ui && cmd.dapps_conf.enabled {
// Check if Parity is already running
let addr = format!("{}:{}", cmd.ui_conf.interface, cmd.ui_conf.port);
if !TcpListener::bind(&addr as &str).is_ok() {
return open_ui(&cmd.ws_conf, &cmd.ui_conf, &cmd.logger_config).map(|_| (false, None));
}
}
// increase max number of open files
raise_fd_limit();
fn wait<T>(res: Result<((bool, Option<String>), Weak<T>), String>) -> Result<(bool, Option<String>), String> {
res.map(|(restart, weak_client)| {
wait_for_drop(weak_client);
restart
})
}
if cmd.light {
wait(execute_light_impl(cmd, can_restart, logger))
} else {
wait(execute_impl(cmd, can_restart, logger))
}
} }
#[cfg(not(windows))] #[cfg(not(windows))]
@ -1002,3 +1023,27 @@ fn wait_for_exit(
let _ = exit.1.wait(&mut l); let _ = exit.1.wait(&mut l);
l.clone() l.clone()
} }
fn wait_for_drop<T>(w: Weak<T>) {
let sleep_duration = Duration::from_secs(1);
let warn_timeout = Duration::from_secs(60);
let max_timeout = Duration::from_secs(300);
let instant = Instant::now();
let mut warned = false;
while instant.elapsed() < max_timeout {
if w.upgrade().is_none() {
return;
}
if !warned && instant.elapsed() > warn_timeout {
warned = true;
warn!("Shutdown is taking longer than expected.");
}
thread::sleep(sleep_duration);
}
warn!("Shutdown timeout reached, exiting uncleanly.");
}