Control service for IPC (#2013)

* hypervisor extension

* sorted with shutdown-wait

* hypervisor lifecycle alter
Nikolay Volf 2016-08-30 16:05:02 +04:00 committed by Arkadiy Paronyan
parent 6f321d9849
commit efc846bb3e
6 changed files with 123 additions and 33 deletions
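In short: shutdown changes from "sleep for a grace period, then `child.kill()`" to a cooperative two-phase handshake. Each module now reports a control socket URL together with its `module_ready` check-in; on shutdown the hypervisor calls `ControlService::shutdown` over that socket and then polls until every module has checked out via `module_shutdown`. A minimal runnable simulation of the handshake, with std channels standing in for the nanomsg control sockets (illustration only, not the real IPC layer):

```rust
use std::sync::mpsc;
use std::thread;

const SYNC_MODULE_ID: u64 = 2100; // matches the constant in the service diff below

fn main() {
    let (control_tx, control_rx) = mpsc::channel::<()>();  // hypervisor -> module ("ControlService::shutdown")
    let (checkin_tx, checkin_rx) = mpsc::channel::<u64>(); // module -> hypervisor ("module_shutdown")

    let module = thread::spawn(move || {
        // Phase 1: the module receives the shutdown request on its control socket...
        control_rx.recv().unwrap();
        // ...winds down, and checks out with the hypervisor.
        checkin_tx.send(SYNC_MODULE_ID).unwrap();
    });

    // Hypervisor::shutdown(): send_shutdown() to every module, then
    // wait_for_shutdown() until running_count() drops to zero (here: one recv).
    control_tx.send(()).unwrap();
    let id = checkin_rx.recv().unwrap();
    println!("module {} reported shutdown", id);
    module.join().unwrap();
}
```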

```diff
@@ -33,7 +33,7 @@ use service::{HypervisorService, IpcModuleId};
 use std::process::{Command,Child};
 use std::collections::HashMap;
-pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};
+pub use service::{HypervisorServiceClient, ControlService, CLIENT_MODULE_ID, SYNC_MODULE_ID};

 pub type BinaryId = &'static str;
@@ -174,6 +174,10 @@ impl Hypervisor {
 		self.service.unchecked_count() == 0
 	}

+	pub fn modules_shutdown(&self) -> bool {
+		self.service.running_count() == 0
+	}
+
 	/// Waits for every required module to check in
 	pub fn wait_for_startup(&self) {
 		let mut worker = self.ipc_worker.write().unwrap();
@@ -182,21 +186,30 @@ impl Hypervisor {
 		}
 	}

-	/// Shutdown the ipc and all managed child processes
-	pub fn shutdown(&self, wait_time: Option<std::time::Duration>) {
-		if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) }
-
-		let mut childs = self.processes.write().unwrap();
-		for (ref mut module, ref mut child) in childs.iter_mut() {
-			trace!(target: "hypervisor", "Stopping process module: {}", module);
-			child.kill().unwrap();
-		}
-	}
+	/// Waits for every managed module to shut down
+	pub fn wait_for_shutdown(&self) {
+		let mut worker = self.ipc_worker.write().unwrap();
+		while !self.modules_shutdown() {
+			worker.poll()
+		}
+	}
+
+	/// Shutdown the ipc and all managed child processes
+	pub fn shutdown(&self) {
+		let mut childs = self.processes.write().unwrap();
+		for (ref mut module, _) in childs.iter_mut() {
+			trace!(target: "hypervisor", "Stopping process module: {}", module);
+			self.service.send_shutdown(**module);
+		}
+		trace!(target: "hypervisor", "Waiting for shutdown...");
+		self.wait_for_shutdown();
+		trace!(target: "hypervisor", "All modules reported shutdown");
+	}
 }

 impl Drop for Hypervisor {
 	fn drop(&mut self) {
-		self.shutdown(Some(std::time::Duration::new(1, 0)));
+		self.shutdown();
 	}
 }
```

```diff
@@ -17,6 +17,7 @@
 use std::sync::{RwLock,Arc};
 use ipc::IpcConfig;
 use std::collections::HashMap;
+use nanoipc;

 pub type IpcModuleId = u64;
@@ -28,15 +29,43 @@ pub const SYNC_MODULE_ID: IpcModuleId = 2100;
 /// IPC service that handles module management
 pub struct HypervisorService {
-	check_list: RwLock<HashMap<IpcModuleId, bool>>,
+	modules: RwLock<HashMap<IpcModuleId, ModuleState>>,
+}
+
+#[derive(Default)]
+pub struct ModuleState {
+	started: bool,
+	control_url: String,
+	shutdown: bool,
+}
+
+#[derive(Ipc)]
+pub trait ControlService {
+	fn shutdown(&self);
 }

 #[derive(Ipc)]
 impl HypervisorService {
-	fn module_ready(&self, module_id: u64) -> bool {
-		let mut check_list = self.check_list.write().unwrap();
-		check_list.get_mut(&module_id).map(|mut status| *status = true);
-		check_list.iter().any(|(_, status)| !status)
+	// return type is only there to make the method synchronous
+	fn module_ready(&self, module_id: u64, control_url: String) -> bool {
+		let mut modules = self.modules.write().unwrap();
+		modules.get_mut(&module_id).map(|mut module| {
+			module.started = true;
+			module.control_url = control_url;
+		});
+		trace!(target: "hypervisor", "Module ready: {}", module_id);
+		true
+	}
+
+	// return type is only there to make the method synchronous
+	fn module_shutdown(&self, module_id: u64) -> bool {
+		let mut modules = self.modules.write().unwrap();
+		modules.get_mut(&module_id).map(|mut module| {
+			module.shutdown = true;
+		});
+		trace!(target: "hypervisor", "Module shutdown: {}", module_id);
+		true
 	}
 }
@@ -48,29 +77,46 @@ impl HypervisorService {
 	/// New service with list of modules that will report for being ready
 	pub fn with_modules(module_ids: Vec<IpcModuleId>) -> Arc<HypervisorService> {
-		let mut check_list = HashMap::new();
+		let mut modules = HashMap::new();
 		for module_id in module_ids {
-			check_list.insert(module_id, false);
+			modules.insert(module_id, ModuleState::default());
 		}
 		Arc::new(HypervisorService {
-			check_list: RwLock::new(check_list),
+			modules: RwLock::new(modules),
 		})
 	}

 	/// Add the module to the check-list
 	pub fn add_module(&self, module_id: IpcModuleId) {
-		self.check_list.write().unwrap().insert(module_id, false);
+		self.modules.write().unwrap().insert(module_id, ModuleState::default());
 	}

 	/// Number of modules still being waited for check-in
 	pub fn unchecked_count(&self) -> usize {
-		self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
+		self.modules.read().unwrap().iter().filter(|&(_, module)| !module.started).count()
 	}

 	/// List of all modules within this service
 	pub fn module_ids(&self) -> Vec<IpcModuleId> {
-		self.check_list.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect()
+		self.modules.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect()
+	}
+
+	/// Number of modules started and running
+	pub fn running_count(&self) -> usize {
+		self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count()
+	}
+
+	/// Send a shutdown request over the module's control socket
+	pub fn send_shutdown(&self, module_id: IpcModuleId) {
+		let modules = self.modules.read().unwrap();
+		modules.get(&module_id).map(|module| {
+			trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url);
+			let client = nanoipc::init_client::<ControlServiceClient<_>>(&module.control_url).unwrap();
+			client.shutdown();
+			trace!(target: "hypervisor", "Sent shutdown to {}", module_id);
+		});
 	}
 }

 impl ::ipc::IpcConfig for HypervisorService {}
+impl ::ipc::IpcConfig for ControlService {}
```
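The bookkeeping above is the core of the change: the per-module `bool` check-list becomes a small state machine. A self-contained sketch of that lifecycle, with the `#[derive(Ipc)]` plumbing and the nanoipc client elided (names mirror the diff):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

type IpcModuleId = u64;

#[derive(Default)]
struct ModuleState {
    started: bool,       // set by module_ready
    control_url: String, // where the hypervisor can reach the module later
    shutdown: bool,      // set by module_shutdown
}

struct Service {
    modules: RwLock<HashMap<IpcModuleId, ModuleState>>,
}

impl Service {
    fn module_ready(&self, id: IpcModuleId, control_url: String) {
        if let Some(module) = self.modules.write().unwrap().get_mut(&id) {
            module.started = true;
            module.control_url = control_url;
            println!("module {} ready, control socket: {}", id, module.control_url);
        }
    }
    fn module_shutdown(&self, id: IpcModuleId) {
        if let Some(module) = self.modules.write().unwrap().get_mut(&id) {
            module.shutdown = true;
        }
    }
    /// Modules not yet checked in (drives wait_for_startup).
    fn unchecked_count(&self) -> usize {
        self.modules.read().unwrap().values().filter(|m| !m.started).count()
    }
    /// Modules checked in but not yet checked out (drives wait_for_shutdown).
    fn running_count(&self) -> usize {
        self.modules.read().unwrap().values().filter(|m| m.started && !m.shutdown).count()
    }
}

fn main() {
    let service = Service {
        modules: RwLock::new(HashMap::from([(2100, ModuleState::default())])),
    };
    assert_eq!(service.unchecked_count(), 1);
    service.module_ready(2100, "parity-sync-control.ipc".to_owned());
    assert_eq!((service.unchecked_count(), service.running_count()), (0, 1));
    service.module_shutdown(2100);
    assert_eq!(service.running_count(), 0);
}
```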

```diff
@@ -62,10 +62,10 @@ pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
 		.map_err(|binary_error| BootError::DecodeArgs(binary_error))
 }

-pub fn register(hv_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
+pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
 	let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(hv_url).unwrap();
 	hypervisor_client.handshake().unwrap();
-	hypervisor_client.module_ready(module_id);
+	hypervisor_client.module_ready(module_id, control_url.to_owned());

 	hypervisor_client
 }
```
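Callers of `boot::register` now also pass the module's own control socket address, which lands in `ModuleState::control_url` on the hypervisor side. A stub showing just the new call shape (the hypervisor URL below is a placeholder, not parity's real default):

```rust
type IpcModuleId = u64;

// Stub with the new signature; the real function dials the hypervisor,
// handshakes, and forwards control_url via module_ready.
fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) {
    println!("module {} ready at {} (control socket: {})", module_id, hv_url, control_url);
}

fn main() {
    // before this commit: register(hv_url, SYNC_MODULE_ID)
    register("hypervisor.ipc", "parity-sync-control.ipc", 2100);
}
```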

```diff
@@ -32,6 +32,7 @@ pub mod service_urls {
 	pub const SYNC: &'static str = "parity-sync.ipc";
 	pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
 	pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
+	pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc";
 	#[cfg(feature="stratum")]
 	pub const STRATUM: &'static str = "parity-stratum.ipc";
 	#[cfg(feature="stratum")]
```

```diff
@@ -260,6 +260,10 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 	// Handle exit
 	wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server);

+	// the hypervisor should be shut down first, while everything still works
+	// and can be terminated gracefully
+	drop(hypervisor);
+
 	Ok(())
 }
```
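The explicit `drop` matters because `Hypervisor::drop` now runs the full cooperative shutdown, which needs the rest of the process still functional. A tiny illustration of forcing drop order with `std::mem::drop` (types invented for the example):

```rust
struct Hypervisor;
impl Drop for Hypervisor {
    fn drop(&mut self) {
        println!("hypervisor: asking modules to shut down");
    }
}

struct OtherServer; // stands in for the http/ipc/dapps/signer servers
impl Drop for OtherServer {
    fn drop(&mut self) {
        println!("other server: closing");
    }
}

fn main() {
    let _server = OtherServer;
    let hypervisor = Hypervisor;
    // Locals normally drop in reverse declaration order; dropping the
    // hypervisor explicitly guarantees it shuts down first regardless of
    // where it was declared.
    drop(hypervisor);
    println!("exiting main");
}
```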

```diff
@@ -16,14 +16,26 @@
 //! Parity sync service

-use std;
 use std::sync::Arc;
-use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
+use std::sync::atomic::AtomicBool;
+use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService};
 use ethcore::client::{RemoteClient, ChainNotify};
 use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
-use std::thread;
 use modules::service_urls;
 use boot;
+use nanoipc;
+
+#[derive(Default)]
+struct SyncControlService {
+	pub stop: Arc<AtomicBool>,
+}
+
+impl ControlService for SyncControlService {
+	fn shutdown(&self) {
+		trace!(target: "hypervisor", "Received shutdown from control service");
+		self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed);
+	}
+}

 pub fn main() {
 	boot::setup_cli_logger("sync");
@@ -33,31 +45,45 @@ pub fn main() {
 	let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));

-	let stop = boot::main_thread();
 	let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap();

-	let _ = boot::register(
+	let _ = boot::main_thread();
+	let service_stop = Arc::new(AtomicBool::new(false));
+
+	let hypervisor = boot::register(
 		&service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL),
+		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL),
 		SYNC_MODULE_ID
 	);

 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::SYNC),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<SyncProvider>
 	);
 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::NETWORK_MANAGER),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<ManageNetwork>
 	);
 	boot::host_service(
 		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_NOTIFY),
-		stop.clone(),
+		service_stop.clone(),
 		sync.clone() as Arc<ChainNotify>
 	);

-	while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
-		thread::park_timeout(std::time::Duration::from_millis(1000));
+	let control_service = Arc::new(SyncControlService::default());
+	let as_control = control_service.clone() as Arc<ControlService>;
+	let mut worker = nanoipc::Worker::<ControlService>::new(&as_control);
+	worker.add_reqrep(
+		&service_urls::with_base(&service_config.io_path, service_urls::SYNC_CONTROL)
+	).unwrap();
+
+	while !control_service.stop.load(::std::sync::atomic::Ordering::Relaxed) {
+		worker.poll();
 	}
+
+	service_stop.store(true, ::std::sync::atomic::Ordering::Relaxed);
+	hypervisor.module_shutdown(SYNC_MODULE_ID);
+	trace!(target: "hypervisor", "Sync process terminated gracefully");
 }
```
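The module-side pattern in summary: expose a small `ControlService` that flips a shared stop flag, pump the control worker until the flag is set, then check out with the hypervisor. A std-only sketch with the nanoipc worker replaced by a helper thread (an assumption for illustration):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct SyncControlService {
    stop: Arc<AtomicBool>,
}

impl SyncControlService {
    // In the real code this is ControlService::shutdown, invoked over IPC.
    fn shutdown(&self) {
        self.stop.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let control = Arc::new(SyncControlService::default());

    // Stands in for the hypervisor dialing the control socket.
    let remote = control.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        remote.shutdown();
    });

    // Stands in for `worker.poll()` on the reqrep control socket.
    while !control.stop.load(Ordering::Relaxed) {
        thread::sleep(Duration::from_millis(10));
    }
    // Here the real service sets service_stop and calls
    // hypervisor.module_shutdown(SYNC_MODULE_ID).
    println!("Sync process terminated gracefully");
}
```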