Cleaner binary shutdown system (#8284)

* Cleaner shutdown system when executing

* Simplify set_exit_handler for Client

* Minor change

* Fix submodule
This commit is contained in:
Pierre Krieger 2018-04-04 11:50:28 +02:00 committed by Rando
parent 0455aa96bf
commit e12a5159a8
2 changed files with 100 additions and 77 deletions

View File

@ -220,7 +220,7 @@ pub struct Client {
registrar_address: Option<Address>, registrar_address: Option<Address>,
/// A closure to call when we want to restart the client /// A closure to call when we want to restart the client
exit_handler: Mutex<Option<Box<Fn(bool, Option<String>) + 'static + Send>>>, exit_handler: Mutex<Option<Box<Fn(String) + 'static + Send>>>,
importer: Importer, importer: Importer,
} }
@ -825,8 +825,11 @@ impl Client {
self.notify.write().push(Arc::downgrade(&target)); self.notify.write().push(Arc::downgrade(&target));
} }
/// Set a closure to call when we want to restart the client /// Set a closure to call when the client wants to be restarted.
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(bool, Option<String>) + 'static + Send { ///
/// The parameter passed to the callback is the name of the new chain spec to use after
/// the restart.
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(String) + 'static + Send {
*self.exit_handler.lock() = Some(Box::new(f)); *self.exit_handler.lock() = Some(Box::new(f));
} }
@ -1625,7 +1628,7 @@ impl BlockChainClient for Client {
return; return;
} }
if let Some(ref h) = *self.exit_handler.lock() { if let Some(ref h) = *self.exit_handler.lock() {
(*h)(true, Some(new_spec_name)); (*h)(new_spec_name);
} else { } else {
warn!("Not hypervised; cannot change chain."); warn!("Not hypervised; cannot change chain.");
} }

View File

@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::any::Any;
use std::fmt; use std::fmt;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -182,7 +183,7 @@ impl ::local_store::NodeInfo for FullNodeInfo {
type LightClient = ::light::client::Client<::light_helpers::EpochFetch>; type LightClient = ::light::client::Client<::light_helpers::EpochFetch>;
// helper for light execution. // helper for light execution.
fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<LightClient>), String> { fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient, String> {
use light::client as light_client; use light::client as light_client;
use ethsync::{LightSyncParams, LightSync, ManageNetwork}; use ethsync::{LightSyncParams, LightSync, ManageNetwork};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
@ -260,7 +261,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
let service = light_client::Service::start(config, &spec, fetch, db, cache.clone()) let service = light_client::Service::start(config, &spec, fetch, db, cache.clone())
.map_err(|e| format!("Error starting light client: {}", e))?; .map_err(|e| format!("Error starting light client: {}", e))?;
let client = service.client(); let client = service.client().clone();
let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default())); let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default()));
let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone()); let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone());
@ -402,10 +403,10 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
}; };
// start rpc servers // start rpc servers
let _ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?; let ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let _http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?; let http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?; let ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
// the informant // the informant
let informant = Arc::new(Informant::new( let informant = Arc::new(Informant::new(
@ -421,17 +422,18 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?; service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;
// wait for ctrl-c and then shut down the informant. Ok(RunningClient::Light {
let res = wait_for_exit(None, None, can_restart); informant,
informant.shutdown(); client,
keep_alive: Box::new((service, ws_server, http_server, ipc_server, ui_server)),
// Create a weak reference to the client so that we can wait on shutdown until it is dropped })
let weak_client = Arc::downgrade(&client);
Ok((res, weak_client))
} }
pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<Client>), String> { fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq: Cr,
on_updater_rq: Rr) -> Result<RunningClient, String>
where Cr: Fn(String) + 'static + Send,
Rr: Fn() + 'static + Send
{
// load spec // load spec
let spec = cmd.spec.spec(&cmd.dirs.cache)?; let spec = cmd.spec.spec(&cmd.dirs.cache)?;
@ -854,7 +856,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
}); });
// the watcher must be kept alive. // the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot { let watcher = match cmd.no_periodic_snapshot {
true => None, true => None,
false => { false => {
let sync = sync_provider.clone(); let sync = sync_provider.clone();
@ -881,23 +883,58 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?; open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?;
} }
// Create a weak reference to the client so that we can wait on shutdown until it is dropped client.set_exit_handler(on_client_rq);
let weak_client = Arc::downgrade(&client); updater.set_exit_handler(on_updater_rq);
// Handle exit Ok(RunningClient::Full {
let restart = wait_for_exit(Some(updater), Some(client), can_restart); informant,
client,
keep_alive: Box::new((watcher, service, updater, ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop)),
})
}
info!("Finishing work, please wait..."); enum RunningClient {
Light {
informant: Arc<Informant<LightNodeInformantData>>,
client: Arc<LightClient>,
keep_alive: Box<Any>,
},
Full {
informant: Arc<Informant<FullNodeInformantData>>,
client: Arc<Client>,
keep_alive: Box<Any>,
},
}
// drop this stuff as soon as exit detected. impl RunningClient {
drop((ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop)); fn shutdown(self) {
match self {
// to make sure timer does not spawn requests while shutdown is in progress RunningClient::Light { informant, client, keep_alive } => {
informant.shutdown(); // Create a weak reference to the client so that we can wait on shutdown
// just Arc is dropping here, to allow other reference release in its default time // until it is dropped
drop(informant); let weak_client = Arc::downgrade(&client);
drop(keep_alive);
Ok((restart, weak_client)) informant.shutdown();
drop(informant);
drop(client);
wait_for_drop(weak_client);
},
RunningClient::Full { informant, client, keep_alive } => {
info!("Finishing work, please wait...");
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
// drop this stuff as soon as exit detected.
drop(keep_alive);
// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
drop(client);
wait_for_drop(weak_client);
}
}
}
} }
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> { pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
@ -917,18 +954,34 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// increase max number of open files // increase max number of open files
raise_fd_limit(); raise_fd_limit();
fn wait<T>(res: Result<((bool, Option<String>), Weak<T>), String>) -> Result<(bool, Option<String>), String> { let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));
res.map(|(restart, weak_client)| {
wait_for_drop(weak_client);
restart
})
}
if cmd.light { let running_client = if cmd.light {
wait(execute_light_impl(cmd, can_restart, logger)) execute_light_impl(cmd, logger)?
} else if can_restart {
let e1 = exit.clone();
let e2 = exit.clone();
execute_impl(cmd, logger,
move |new_chain: String| { *e1.0.lock() = (true, Some(new_chain)); e1.1.notify_all(); },
move || { *e2.0.lock() = (true, None); e2.1.notify_all(); })?
} else { } else {
wait(execute_impl(cmd, can_restart, logger)) trace!(target: "mode", "Not hypervised: not setting exit handlers.");
} execute_impl(cmd, logger, move |_| {}, move || {})?
};
// Handle possible exits
CtrlC::set_handler({
let e = exit.clone();
move || { e.1.notify_all(); }
});
// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);
running_client.shutdown();
Ok(l.clone())
} }
#[cfg(not(windows))] #[cfg(not(windows))]
@ -1029,39 +1082,6 @@ fn build_create_account_hint(spec: &SpecType, keys: &str) -> String {
format!("You can create an account via RPC, UI or `parity account new --chain {} --keys-path {}`.", spec, keys) format!("You can create an account via RPC, UI or `parity account new --chain {} --keys-path {}`.", spec, keys)
} }
fn wait_for_exit(
updater: Option<Arc<Updater>>,
client: Option<Arc<Client>>,
can_restart: bool
) -> (bool, Option<String>) {
let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));
// Handle possible exits
let e = exit.clone();
CtrlC::set_handler(move || { e.1.notify_all(); });
if can_restart {
if let Some(updater) = updater {
// Handle updater wanting to restart us
let e = exit.clone();
updater.set_exit_handler(move || { *e.0.lock() = (true, None); e.1.notify_all(); });
}
if let Some(client) = client {
// Handle updater wanting to restart us
let e = exit.clone();
client.set_exit_handler(move |restart, new_chain: Option<String>| { *e.0.lock() = (restart, new_chain); e.1.notify_all(); });
}
} else {
trace!(target: "mode", "Not hypervised: not setting exit handlers.");
}
// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);
l.clone()
}
fn wait_for_drop<T>(w: Weak<T>) { fn wait_for_drop<T>(w: Weak<T>) {
let sleep_duration = Duration::from_secs(1); let sleep_duration = Duration::from_secs(1);
let warn_timeout = Duration::from_secs(60); let warn_timeout = Duration::from_secs(60);