IPC (feature-gated) (#1654)
* moving ipc deriving to trait * refactoring of the client * all compiled * proved all working * warnings purged * allow hypervisor to specify initialization payload in two ways * using binary initialisation payload for sync * some docs * logger to separate crate * log settings for sync bin * forwarding logging arguments to the sync
This commit is contained in:
committed by
Arkadiy Paronyan
parent
7ae9e61d6c
commit
8ab56ea3d1
@@ -273,7 +273,12 @@ fn implement_dispatch_arm(
|
||||
{
|
||||
let index_ident = builder.id(format!("{}", index + (RESERVED_MESSAGE_IDS as u32)).as_str());
|
||||
let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer);
|
||||
quote_arm!(cx, $index_ident => { $invoke_expr } )
|
||||
let dispatching_trace = "Dispatching: ".to_owned() + &dispatch.function_name;
|
||||
let dispatching_trace_literal = builder.expr().lit().str::<&str>(&dispatching_trace);
|
||||
quote_arm!(cx, $index_ident => {
|
||||
trace!(target: "ipc", $dispatching_trace_literal);
|
||||
$invoke_expr
|
||||
})
|
||||
}
|
||||
|
||||
fn implement_dispatch_arms(
|
||||
@@ -420,17 +425,22 @@ fn implement_client_method_body(
|
||||
request_serialization_statements
|
||||
};
|
||||
|
||||
let invocation_trace = "Invoking: ".to_owned() + &dispatch.function_name;
|
||||
let invocation_trace_literal = builder.expr().lit().str::<&str>(&invocation_trace);
|
||||
|
||||
if let Some(ref return_ty) = dispatch.return_type_ty {
|
||||
let return_expr = quote_expr!(cx,
|
||||
::ipc::binary::deserialize_from::<$return_ty, _>(&mut *socket).unwrap()
|
||||
);
|
||||
quote_expr!(cx, {
|
||||
trace!(target: "ipc", $invocation_trace_literal);
|
||||
$request
|
||||
$return_expr
|
||||
})
|
||||
}
|
||||
else {
|
||||
quote_expr!(cx, {
|
||||
trace!(target: "ipc", $invocation_trace_literal);
|
||||
$request
|
||||
})
|
||||
}
|
||||
|
||||
@@ -261,6 +261,10 @@ fn binary_expr_struct(
|
||||
|
||||
let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty));
|
||||
let range_ident = builder.id(format!("r{}", index));
|
||||
|
||||
let error_message = "Error serializing member: ".to_owned() + &::syntax::print::pprust::expr_to_string(&member_expr);
|
||||
let error_message_literal = builder.expr().lit().str::<&str>(&error_message);
|
||||
|
||||
match raw_ident.as_ref() {
|
||||
"u8" => {
|
||||
write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap());
|
||||
@@ -280,7 +284,13 @@ fn binary_expr_struct(
|
||||
}).unwrap());
|
||||
write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap());
|
||||
post_write_stmts.push(quote_stmt!(cx,
|
||||
if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) { return Err(e) };).unwrap());
|
||||
if $range_ident.end - $range_ident.start > 0 {
|
||||
if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) {
|
||||
warn!(target: "ipc", $error_message_literal);
|
||||
return Err(e)
|
||||
};
|
||||
}
|
||||
).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,35 @@ pub struct Hypervisor {
|
||||
service: Arc<HypervisorService>,
|
||||
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
|
||||
processes: RwLock<HashMap<BinaryId, Child>>,
|
||||
modules: HashMap<IpcModuleId, (BinaryId, Vec<String>)>,
|
||||
modules: HashMap<IpcModuleId, (BinaryId, BootArgs)>,
|
||||
}
|
||||
|
||||
/// Boot arguments for binary
|
||||
pub struct BootArgs {
|
||||
cli: Option<Vec<String>>,
|
||||
stdin: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl BootArgs {
|
||||
/// New empty boot arguments
|
||||
pub fn new() -> BootArgs {
|
||||
BootArgs {
|
||||
cli: None,
|
||||
stdin: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set command-line arguments for boot
|
||||
pub fn cli(mut self, cli: Vec<String>) -> BootArgs {
|
||||
self.cli = Some(cli);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set std-in stream for boot
|
||||
pub fn stdin(mut self, stdin: Vec<u8>) -> BootArgs {
|
||||
self.stdin = Some(stdin);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Hypervisor {
|
||||
@@ -51,7 +79,7 @@ impl Hypervisor {
|
||||
Hypervisor::with_url(HYPERVISOR_IPC_URL)
|
||||
}
|
||||
|
||||
pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec<String>) -> Hypervisor {
|
||||
pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: BootArgs) -> Hypervisor {
|
||||
self.modules.insert(module_id, (binary_id, args));
|
||||
self.service.add_module(module_id);
|
||||
self
|
||||
@@ -78,7 +106,7 @@ impl Hypervisor {
|
||||
|
||||
/// Since one binary can host multiple modules
|
||||
/// we match binaries
|
||||
fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec<String>)> {
|
||||
fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, BootArgs)> {
|
||||
self.modules.get(module_id)
|
||||
}
|
||||
|
||||
@@ -96,6 +124,8 @@ impl Hypervisor {
|
||||
/// Does nothing when it is already started on module is inside the
|
||||
/// main binary
|
||||
fn start_module(&self, module_id: IpcModuleId) {
|
||||
use std::io::Write;
|
||||
|
||||
self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| {
|
||||
let mut processes = self.processes.write().unwrap();
|
||||
{
|
||||
@@ -109,13 +139,30 @@ impl Hypervisor {
|
||||
executable_path.pop();
|
||||
executable_path.push(binary_id);
|
||||
|
||||
let mut command = Command::new(&executable_path.to_str().unwrap());
|
||||
for arg in binary_args { command.arg(arg); }
|
||||
let executable_path = executable_path.to_str().unwrap();
|
||||
let mut command = Command::new(&executable_path);
|
||||
command.stderr(std::process::Stdio::inherit());
|
||||
|
||||
if let Some(ref cli_args) = binary_args.cli {
|
||||
for arg in cli_args { command.arg(arg); }
|
||||
}
|
||||
|
||||
command.stdin(std::process::Stdio::piped());
|
||||
|
||||
trace!(target: "hypervisor", "Spawn executable: {:?}", command);
|
||||
|
||||
let child = command.spawn().unwrap_or_else(
|
||||
let mut child = command.spawn().unwrap_or_else(
|
||||
|e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e));
|
||||
|
||||
if let Some(ref std_in) = binary_args.stdin {
|
||||
trace!(target: "hypervisor", "Pushing std-in payload...");
|
||||
child.stdin.as_mut()
|
||||
.expect("std-in should be piped above")
|
||||
.write(std_in)
|
||||
.unwrap_or_else(|e| panic!(format!("Error trying to pipe stdin for {}: {:?}", &executable_path, e)));
|
||||
drop(child.stdin.take());
|
||||
}
|
||||
|
||||
processes.insert(binary_id, child);
|
||||
});
|
||||
}
|
||||
@@ -133,6 +180,7 @@ impl Hypervisor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Shutdown the ipc and all managed child processes
|
||||
pub fn shutdown(&self, wait_time: Option<std::time::Duration>) {
|
||||
if wait_time.is_some() { std::thread::sleep(wait_time.unwrap()) }
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ impl HypervisorService {
|
||||
})
|
||||
}
|
||||
|
||||
/// Add the module to the check-list
|
||||
pub fn add_module(&self, module_id: IpcModuleId) {
|
||||
self.check_list.write().unwrap().insert(module_id, false);
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ extern crate jsonrpc_core;
|
||||
use jsonrpc_core::IoHandler;
|
||||
|
||||
pub use ipc::{WithSocket, IpcInterface, IpcConfig};
|
||||
pub use nanomsg::Socket as NanoSocket;
|
||||
|
||||
use std::sync::*;
|
||||
use std::sync::atomic::*;
|
||||
@@ -54,9 +55,9 @@ impl<S> GuardedSocket<S> where S: WithSocket<Socket> {
|
||||
}
|
||||
|
||||
impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
|
||||
type Target = S;
|
||||
type Target = Arc<S>;
|
||||
|
||||
fn deref(&self) -> &S {
|
||||
fn deref(&self) -> &Arc<S> {
|
||||
&self.client
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ impl<T> BinaryConvertable for Option<T> where T: BinaryConvertable {
|
||||
}
|
||||
|
||||
fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque<usize>) -> Result<Self, BinaryConvertError> {
|
||||
if buffer.len() == 0 { return Self::from_empty_bytes(); }
|
||||
Ok(Some(try!(T::from_bytes(buffer, length_stack))))
|
||||
}
|
||||
|
||||
@@ -779,6 +780,42 @@ fn serialize_into_deserialize_from() {
|
||||
assert_eq!(v, de_v);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_vec_str() {
|
||||
// empty
|
||||
let source = Vec::<String>::new();
|
||||
let serialized = serialize(&source).unwrap();
|
||||
let deserialized = deserialize::<Vec<String>>(&serialized).unwrap();
|
||||
|
||||
assert_eq!(source, deserialized);
|
||||
|
||||
// with few values
|
||||
let mut source = Vec::<String>::new();
|
||||
source.push("val1".to_owned());
|
||||
source.push("val2".to_owned());
|
||||
let serialized = serialize(&source).unwrap();
|
||||
let deserialized = deserialize::<Vec<String>>(&serialized).unwrap();
|
||||
|
||||
assert_eq!(source, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_opt_str() {
|
||||
// none
|
||||
let source: Option<String> = None;
|
||||
let serialized = serialize(&source).unwrap();
|
||||
let deserialized = deserialize::<Option<String>>(&serialized).unwrap();
|
||||
|
||||
assert_eq!(source, deserialized);
|
||||
|
||||
// value
|
||||
let source: Option<String> = Some("i have value".to_owned());
|
||||
let serialized = serialize(&source).unwrap();
|
||||
let deserialized = deserialize::<Option<String>>(&serialized).unwrap();
|
||||
|
||||
assert_eq!(source, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_opt_vec() {
|
||||
use std::io::Cursor;
|
||||
|
||||
Reference in New Issue
Block a user