child processes spawn

This commit is contained in:
NikVolf 2016-04-14 21:45:53 +03:00
parent 7ac985dded
commit 6f4a98333e
2 changed files with 59 additions and 11 deletions

View File

@ -16,7 +16,7 @@
//! Parity interprocess hypervisor module //! Parity interprocess hypervisor module
// while not in binary // while not included in binary
#![allow(dead_code)] #![allow(dead_code)]
pub mod service; pub mod service;
@ -26,30 +26,74 @@ pub const HYPERVISOR_IPC_URL: &'static str = "ipc:///tmp/parity-internal-hyper-s
use nanoipc; use nanoipc;
use std::sync::{Arc,RwLock}; use std::sync::{Arc,RwLock};
use hypervisor::service::*; use hypervisor::service::*;
use std::process::{Command,Child};
use std::collections::HashMap;
type BinaryId = &'static str;
const BLOCKCHAIN_DB_BINARY: BinaryId = "blockchain";
pub struct Hypervisor { pub struct Hypervisor {
ipc_addr: String,
service: Arc<HypervisorService>, service: Arc<HypervisorService>,
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>, ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
processes: RwLock<HashMap<BinaryId, Child>>,
} }
impl Hypervisor { impl Hypervisor {
/// initializes the Hypervisor service with the open ipc socket for incoming clients /// initializes the Hypervisor service with the open ipc socket for incoming clients
pub fn init() -> Arc<Hypervisor>{ pub fn new() -> Hypervisor {
Hypervisor::with_url(HYPERVISOR_IPC_URL) Hypervisor::with_url(HYPERVISOR_IPC_URL)
} }
fn with_url(addr: &str) -> Arc<Hypervisor>{ fn with_url(addr: &str) -> Hypervisor{
Hypervisor::with_url_and_service(addr, HypervisorService::new()) Hypervisor::with_url_and_service(addr, HypervisorService::new())
} }
fn with_url_and_service(addr: &str, service: Arc<HypervisorService>) -> Arc<Hypervisor> { fn with_url_and_service(addr: &str, service: Arc<HypervisorService>) -> Hypervisor {
let mut worker = nanoipc::Worker::new(&service); let worker = nanoipc::Worker::new(&service);
worker.add_reqrep(addr).expect("Hypervisor ipc worker can not start - critical!"); Hypervisor{
ipc_addr: addr.to_owned(),
Arc::new(Hypervisor{
service: service, service: service,
ipc_worker: RwLock::new(worker), ipc_worker: RwLock::new(worker),
}) processes: RwLock::new(HashMap::new()),
}
}
/// Since one binary can host multiple modules
/// we match binaries
fn match_module(module_id: &IpcModuleId) -> Option<BinaryId> {
match *module_id {
BLOCKCHAIN_MODULE_ID => Some(BLOCKCHAIN_DB_BINARY),
// none means the module is inside the main binary
_ => None
}
}
fn start(&self) {
let mut worker = self.ipc_worker.write().unwrap();
worker.add_reqrep(&self.ipc_addr).unwrap_or_else(|e| panic!("Hypervisor ipc worker can not start - critical! ({:?})", e));
for module_id in self.service.module_ids() {
self.start_module(module_id);
}
}
fn start_module(&self, module_id: IpcModuleId) {
Self::match_module(&module_id).map(|binary_id|
{
let mut processes = self.processes.write().unwrap();
{
let process = processes.get(binary_id);
if process.is_some() {
// already started for another module
return;
}
}
let child = Command::new(binary_id).spawn().unwrap_or_else(
|e| panic!("Hypervisor cannot start binary: {}", e));
processes.insert(binary_id, child);
});
} }
pub fn modules_ready(&self) -> bool { pub fn modules_ready(&self) -> bool {
@ -98,6 +142,7 @@ mod tests {
}); });
let hypervisor = Hypervisor::with_url_and_service(url, HypervisorService::with_modules(vec![test_module_id])); let hypervisor = Hypervisor::with_url_and_service(url, HypervisorService::with_modules(vec![test_module_id]));
hypervisor.start();
hypervisor_ready_local.store(true, Ordering::Relaxed); hypervisor_ready_local.store(true, Ordering::Relaxed);
hypervisor.wait_for_startup(); hypervisor.wait_for_startup();

View File

@ -21,8 +21,7 @@ use std::collections::HashMap;
pub type IpcModuleId = u64; pub type IpcModuleId = u64;
pub const _DATABASE_MODULE_ID: IpcModuleId = 1000; pub const BLOCKCHAIN_MODULE_ID: IpcModuleId = 2000;
pub const _BLOCKCHAIN_MODULE_ID: IpcModuleId = 2000;
pub struct HypervisorService { pub struct HypervisorService {
check_list: RwLock<HashMap<IpcModuleId, bool>>, check_list: RwLock<HashMap<IpcModuleId, bool>>,
@ -55,6 +54,10 @@ impl HypervisorService {
pub fn unchecked_count(&self) -> usize { pub fn unchecked_count(&self) -> usize {
self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count() self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
} }
pub fn module_ids(&self) -> Vec<IpcModuleId> {
self.check_list.read().unwrap().iter().map(|(module_id, _)| module_id).cloned().collect()
}
} }
impl ::ipc::IpcConfig for HypervisorService {} impl ::ipc::IpcConfig for HypervisorService {}