hypervisor-service chain
This commit is contained in:
parent
4931a300f2
commit
4d527e152c
@ -54,7 +54,7 @@ impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
|
|||||||
/// Spawns client <`S`> over specified address
|
/// Spawns client <`S`> over specified address
|
||||||
/// creates socket and connects endpoint to it
|
/// creates socket and connects endpoint to it
|
||||||
/// for duplex (paired) connections with the service
|
/// for duplex (paired) connections with the service
|
||||||
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
|
pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
|
||||||
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
||||||
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
||||||
SocketError::DuplexLink
|
SocketError::DuplexLink
|
||||||
@ -71,11 +71,33 @@ pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawns client <`S`> over specified address
|
||||||
|
/// creates socket and connects endpoint to it
|
||||||
|
/// for request-reply connections to the service
|
||||||
|
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
|
||||||
|
let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| {
|
||||||
|
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
||||||
|
SocketError::RequestLink
|
||||||
|
}));
|
||||||
|
|
||||||
|
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
|
||||||
|
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
|
||||||
|
SocketError::RequestLink
|
||||||
|
}));
|
||||||
|
|
||||||
|
Ok(GuardedSocket {
|
||||||
|
client: Arc::new(S::init(socket)),
|
||||||
|
_endpoint: endpoint,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Error occured while establising socket or endpoint
|
/// Error occured while establising socket or endpoint
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SocketError {
|
pub enum SocketError {
|
||||||
/// Error establising duplex (paired) socket and/or endpoint
|
/// Error establising duplex (paired) socket and/or endpoint
|
||||||
DuplexLink
|
DuplexLink,
|
||||||
|
/// Error establising duplex (paired) socket and/or endpoint
|
||||||
|
RequestLink,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Worker<S> where S: IpcInterface<S> {
|
impl<S> Worker<S> where S: IpcInterface<S> {
|
||||||
|
@ -39,7 +39,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn can_create_client() {
|
fn can_create_client() {
|
||||||
let client = nanoipc::init_client::<ServiceClient<_>>("ipc:///tmp/parity-nano-test10.ipc");
|
let client = nanoipc::init_duplex_client::<ServiceClient<_>>("ipc:///tmp/parity-nano-test10.ipc");
|
||||||
assert!(client.is_ok());
|
assert!(client.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,7 +60,7 @@ mod tests {
|
|||||||
});
|
});
|
||||||
|
|
||||||
while !worker_is_ready.load(Ordering::Relaxed) { }
|
while !worker_is_ready.load(Ordering::Relaxed) { }
|
||||||
let client = nanoipc::init_client::<ServiceClient<_>>(url).unwrap();
|
let client = nanoipc::init_duplex_client::<ServiceClient<_>>(url).unwrap();
|
||||||
|
|
||||||
let hs = client.handshake();
|
let hs = client.handshake();
|
||||||
|
|
||||||
|
@ -37,7 +37,10 @@ impl Hypervisor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn with_url(addr: &str) -> Arc<Hypervisor>{
|
fn with_url(addr: &str) -> Arc<Hypervisor>{
|
||||||
let service = HypervisorService::new();
|
Hypervisor::with_url_and_service(addr, HypervisorService::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_url_and_service(addr: &str, service: Arc<HypervisorService>) -> Arc<Hypervisor> {
|
||||||
let mut worker = nanoipc::Worker::new(&service);
|
let mut worker = nanoipc::Worker::new(&service);
|
||||||
worker.add_reqrep(addr);
|
worker.add_reqrep(addr);
|
||||||
|
|
||||||
@ -62,17 +65,38 @@ impl Hypervisor {
|
|||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::sync::atomic::{AtomicBool,Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use super::service::*;
|
||||||
|
use nanoipc;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn can_init() {
|
fn can_init() {
|
||||||
let hypervisor = Hypervisor::with_url("ipc:///tmp/test-parity-hypervisor-10");
|
let url = "ipc:///tmp/test-parity-hypervisor-10";
|
||||||
|
|
||||||
|
let hypervisor = Hypervisor::with_url(url);
|
||||||
assert_eq!(false, hypervisor.modules_ready());
|
assert_eq!(false, hypervisor.modules_ready());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn can_wait_for_startup() {
|
fn can_wait_for_startup() {
|
||||||
let hypervisor = Hypervisor::with_url("ipc:///tmp/test-parity-hypervisor-10");
|
let url = "ipc:///tmp/test-parity-hypervisor-20";
|
||||||
|
let test_module_id = 8080u64;
|
||||||
|
|
||||||
|
let hypervisor_ready = Arc::new(AtomicBool::new(false));
|
||||||
|
let hypervisor_ready_local = hypervisor_ready.clone();
|
||||||
|
|
||||||
|
::std::thread::spawn(move || {
|
||||||
|
while !hypervisor_ready.load(Ordering::Relaxed) { }
|
||||||
|
|
||||||
|
let client = nanoipc::init_client::<HypervisorServiceClient<_>>(url).unwrap();
|
||||||
|
client.module_ready(test_module_id);
|
||||||
|
});
|
||||||
|
|
||||||
|
let hypervisor = Hypervisor::with_url_and_service(url, HypervisorService::with_modules(vec![test_module_id]));
|
||||||
|
hypervisor_ready_local.store(true, Ordering::Relaxed);
|
||||||
hypervisor.wait_for_startup();
|
hypervisor.wait_for_startup();
|
||||||
assert_eq!(false, hypervisor.modules_ready());
|
|
||||||
|
assert_eq!(true, hypervisor.modules_ready());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,10 +19,10 @@ use std::ops::*;
|
|||||||
use ipc::IpcConfig;
|
use ipc::IpcConfig;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
type IpcModuleId = u64;
|
pub type IpcModuleId = u64;
|
||||||
|
|
||||||
const DATABASE_MODULE_ID: IpcModuleId = 1000;
|
pub const DATABASE_MODULE_ID: IpcModuleId = 1000;
|
||||||
const BLOCKCHAIN_MODULE_ID: IpcModuleId = 2000;
|
pub const BLOCKCHAIN_MODULE_ID: IpcModuleId = 2000;
|
||||||
|
|
||||||
pub struct HypervisorService {
|
pub struct HypervisorService {
|
||||||
check_list: RwLock<HashMap<IpcModuleId, bool>>,
|
check_list: RwLock<HashMap<IpcModuleId, bool>>,
|
||||||
@ -39,10 +39,10 @@ impl HypervisorService {
|
|||||||
|
|
||||||
impl HypervisorService {
|
impl HypervisorService {
|
||||||
pub fn new() -> Arc<HypervisorService> {
|
pub fn new() -> Arc<HypervisorService> {
|
||||||
HypervisorService::with_modules(vec![DATABASE_MODULE_ID]);
|
HypervisorService::with_modules(vec![DATABASE_MODULE_ID])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_modules(module_ids: Vec<IpcModuleID>) -> Arc<HypervisorService> {
|
pub fn with_modules(module_ids: Vec<IpcModuleId>) -> Arc<HypervisorService> {
|
||||||
let mut check_list = HashMap::new();
|
let mut check_list = HashMap::new();
|
||||||
for module_id in module_ids {
|
for module_id in module_ids {
|
||||||
check_list.insert(module_id, false);
|
check_list.insert(module_id, false);
|
||||||
|
Loading…
Reference in New Issue
Block a user