hypervisor refactoring

This commit is contained in:
NikVolf 2016-07-15 19:50:17 +02:00
parent 4a9d57d2ce
commit b1a67bf18f
2 changed files with 33 additions and 26 deletions

View File

@ -35,52 +35,51 @@ use std::collections::HashMap;
pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID}; pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};
type BinaryId = &'static str; pub type BinaryId = &'static str;
const CLIENT_BINARY: BinaryId = "client";
const SYNC_BINARY: BinaryId = "sync";
pub struct Hypervisor { pub struct Hypervisor {
ipc_addr: String, ipc_addr: String,
service: Arc<HypervisorService>, service: Arc<HypervisorService>,
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>, ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
processes: RwLock<HashMap<BinaryId, Child>>, processes: RwLock<HashMap<BinaryId, Child>>,
db_path: String, modules: HashMap<IpcModuleId, (BinaryId, Vec<String>)>,
} }
impl Hypervisor { impl Hypervisor {
/// initializes the Hypervisor service with the open ipc socket for incoming clients /// initializes the Hypervisor service with the open ipc socket for incoming clients
pub fn new(db_path: &str) -> Hypervisor { pub fn new() -> Hypervisor {
Hypervisor::with_url(db_path, HYPERVISOR_IPC_URL) Hypervisor::with_url(HYPERVISOR_IPC_URL)
} }
/// Starts on the specified address for ipc listener pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec<String>) -> Hypervisor {
fn with_url(db_path: &str, addr: &str) -> Hypervisor{ self.modules.insert(module_id, (binary_id, args));
Hypervisor::with_url_and_service(db_path, addr, HypervisorService::new()) self.service.add_module(module_id);
self
}
pub fn local_module(self, module_id: IpcModuleId) -> Hypervisor {
self.service.add_module(module_id);
self
} }
/// Starts with the specified address for the ipc listener and /// Starts with the specified address for the ipc listener and
/// the specified list of modules in form of created service /// the specified list of modules in form of created service
fn with_url_and_service(db_path: &str, addr: &str, service: Arc<HypervisorService>) -> Hypervisor { pub fn with_url(addr: &str) -> Hypervisor {
let service = HypervisorService::new();
let worker = nanoipc::Worker::new(&service); let worker = nanoipc::Worker::new(&service);
Hypervisor{ Hypervisor{
ipc_addr: addr.to_owned(), ipc_addr: addr.to_owned(),
service: service, service: service,
ipc_worker: RwLock::new(worker), ipc_worker: RwLock::new(worker),
processes: RwLock::new(HashMap::new()), processes: RwLock::new(HashMap::new()),
db_path: db_path.to_owned(), modules: HashMap::new(),
} }
} }
/// Since one binary can host multiple modules /// Since one binary can host multiple modules
/// we match binaries /// we match binaries
fn match_module(module_id: &IpcModuleId) -> Option<BinaryId> { fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec<String>)> {
match *module_id { self.modules.get(module_id)
CLIENT_MODULE_ID => Some(CLIENT_BINARY),
SYNC_MODULE_ID => Some(SYNC_BINARY),
// none means the module is inside the main binary
_ => None
}
} }
/// Creates IPC listener and starts all binaries /// Creates IPC listener and starts all binaries
@ -97,7 +96,7 @@ impl Hypervisor {
/// Does nothing when it is already started on module is inside the /// Does nothing when it is already started on module is inside the
/// main binary /// main binary
fn start_module(&self, module_id: IpcModuleId) { fn start_module(&self, module_id: IpcModuleId) {
Self::match_module(&module_id).map(|binary_id| { self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| {
let mut processes = self.processes.write().unwrap(); let mut processes = self.processes.write().unwrap();
{ {
if processes.get(binary_id).is_some() { if processes.get(binary_id).is_some() {
@ -110,7 +109,12 @@ impl Hypervisor {
executable_path.pop(); executable_path.pop();
executable_path.push(binary_id); executable_path.push(binary_id);
let child = Command::new(&executable_path.to_str().unwrap()).arg(&self.db_path).spawn().unwrap_or_else( let mut command = Command::new(&executable_path.to_str().unwrap());
for arg in binary_args { command.arg(arg); }
trace!(target: "hypervisor", "Spawn executable: {:?}", command);
let child = command.spawn().unwrap_or_else(
|e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e)); |e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e));
processes.insert(binary_id, child); processes.insert(binary_id, child);
}); });
@ -134,7 +138,7 @@ impl Hypervisor {
let mut childs = self.processes.write().unwrap(); let mut childs = self.processes.write().unwrap();
for (ref mut binary, ref mut child) in childs.iter_mut() { for (ref mut binary, ref mut child) in childs.iter_mut() {
trace!(target: "hypervisor", "HYPERVISOR: Stopping process module: {}", binary); trace!(target: "hypervisor", "Stopping process module: {}", binary);
child.kill().unwrap(); child.kill().unwrap();
} }
} }
@ -151,7 +155,6 @@ mod tests {
use super::*; use super::*;
use std::sync::atomic::{AtomicBool,Ordering}; use std::sync::atomic::{AtomicBool,Ordering};
use std::sync::Arc; use std::sync::Arc;
use super::service::*;
use nanoipc; use nanoipc;
#[test] #[test]
@ -159,7 +162,7 @@ mod tests {
let url = "ipc:///tmp/test-parity-hypervisor-10.ipc"; let url = "ipc:///tmp/test-parity-hypervisor-10.ipc";
let test_module_id = 8080u64; let test_module_id = 8080u64;
let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id])); let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
assert_eq!(false, hypervisor.modules_ready()); assert_eq!(false, hypervisor.modules_ready());
} }
@ -179,7 +182,7 @@ mod tests {
client.module_ready(test_module_id); client.module_ready(test_module_id);
}); });
let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id])); let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
hypervisor.start(); hypervisor.start();
hypervisor_ready_local.store(true, Ordering::Relaxed); hypervisor_ready_local.store(true, Ordering::Relaxed);
hypervisor.wait_for_startup(); hypervisor.wait_for_startup();

View File

@ -46,7 +46,7 @@ impl HypervisorService {
impl HypervisorService { impl HypervisorService {
/// New service with the default list of modules /// New service with the default list of modules
pub fn new() -> Arc<HypervisorService> { pub fn new() -> Arc<HypervisorService> {
HypervisorService::with_modules(vec![CLIENT_MODULE_ID]) HypervisorService::with_modules(vec![])
} }
/// New service with list of modules that will report for being ready /// New service with list of modules that will report for being ready
@ -60,6 +60,10 @@ impl HypervisorService {
}) })
} }
pub fn add_module(&self, module_id: IpcModuleId) {
self.check_list.write().unwrap().insert(module_id, false);
}
/// Number of modules still being waited for check-in /// Number of modules still being waited for check-in
pub fn unchecked_count(&self) -> usize { pub fn unchecked_count(&self) -> usize {
self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count() self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()