Merge pull request #886 from ethcore/ipc-nano
ipc rpc with nano transport (simple duplex)
This commit is contained in:
commit
460d5bffd7
@ -145,14 +145,20 @@ struct Dispatch {
|
|||||||
return_type_ty: Option<P<Ty>>,
|
return_type_ty: Option<P<Ty>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn implement_dispatch_arm_invoke(
|
// This is the expanded version of this:
|
||||||
|
//
|
||||||
|
// let invoke_serialize_stmt = quote_stmt!(cx, {
|
||||||
|
// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap()
|
||||||
|
// });
|
||||||
|
//
|
||||||
|
// But the above does not allow comma-separated expressions for arbitrary number
|
||||||
|
// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n
|
||||||
|
fn implement_dispatch_arm_invoke_stmt(
|
||||||
cx: &ExtCtxt,
|
cx: &ExtCtxt,
|
||||||
builder: &aster::AstBuilder,
|
builder: &aster::AstBuilder,
|
||||||
dispatch: &Dispatch,
|
dispatch: &Dispatch,
|
||||||
) -> P<ast::Expr>
|
) -> ast::Stmt
|
||||||
{
|
{
|
||||||
let deserialize_expr = quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"));
|
|
||||||
let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
|
|
||||||
let function_name = builder.id(dispatch.function_name.as_str());
|
let function_name = builder.id(dispatch.function_name.as_str());
|
||||||
let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str());
|
let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str());
|
||||||
|
|
||||||
@ -161,63 +167,71 @@ fn implement_dispatch_arm_invoke(
|
|||||||
quote_expr!(cx, input. $arg_ident)
|
quote_expr!(cx, input. $arg_ident)
|
||||||
}).collect::<Vec<P<ast::Expr>>>();
|
}).collect::<Vec<P<ast::Expr>>>();
|
||||||
|
|
||||||
// This is the expanded version of this:
|
let ext_cx = &*cx;
|
||||||
//
|
::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts(
|
||||||
// let invoke_serialize_stmt = quote_stmt!(cx, {
|
ext_cx.parse_sess(),
|
||||||
// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap()
|
ext_cx.cfg(),
|
||||||
// });
|
{
|
||||||
//
|
let _sp = ext_cx.call_site();
|
||||||
// But the above does not allow comma-separated expressions for arbitrary number
|
let mut tt = ::std::vec::Vec::new();
|
||||||
// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
|
||||||
let invoke_serialize_stmt = {
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
let ext_cx = &*cx;
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
|
||||||
::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts(
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
ext_cx.parse_sess(),
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName)));
|
||||||
ext_cx.cfg(),
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
{
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain)));
|
||||||
let _sp = ext_cx.call_site();
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
||||||
let mut tt = ::std::vec::Vec::new();
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
|
tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain)));
|
tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
|
|
||||||
tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
|
|
||||||
tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
|
||||||
|
|
||||||
for arg_expr in input_args_exprs {
|
for arg_expr in input_args_exprs {
|
||||||
tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter());
|
tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter());
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
|
|
||||||
}
|
|
||||||
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
}
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
|
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain)));
|
||||||
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
||||||
tt
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
|
||||||
}))
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain)));
|
||||||
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
|
||||||
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
|
||||||
|
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
|
||||||
|
tt
|
||||||
|
})).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn implement_dispatch_arm_invoke(
|
||||||
|
cx: &ExtCtxt,
|
||||||
|
builder: &aster::AstBuilder,
|
||||||
|
dispatch: &Dispatch,
|
||||||
|
buffer: bool,
|
||||||
|
) -> P<ast::Expr>
|
||||||
|
{
|
||||||
|
let deserialize_expr = if buffer {
|
||||||
|
quote_expr!(cx, ::bincode::serde::deserialize(buf).expect("ipc deserialization error, aborting"))
|
||||||
|
} else {
|
||||||
|
quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
|
||||||
|
|
||||||
|
let invoke_serialize_stmt = implement_dispatch_arm_invoke_stmt(cx, builder, dispatch);
|
||||||
quote_expr!(cx, {
|
quote_expr!(cx, {
|
||||||
let input: $input_type_id = $deserialize_expr;
|
let input: $input_type_id = $deserialize_expr;
|
||||||
$invoke_serialize_stmt
|
$invoke_serialize_stmt
|
||||||
@ -225,14 +239,31 @@ fn implement_dispatch_arm_invoke(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// generates dispatch match for method id
|
/// generates dispatch match for method id
|
||||||
fn implement_dispatch_arm(cx: &ExtCtxt, builder: &aster::AstBuilder, index: u32, dispatch: &Dispatch)
|
fn implement_dispatch_arm(
|
||||||
-> ast::Arm
|
cx: &ExtCtxt,
|
||||||
|
builder: &aster::AstBuilder,
|
||||||
|
index: u32,
|
||||||
|
dispatch: &Dispatch,
|
||||||
|
buffer: bool,
|
||||||
|
) -> ast::Arm
|
||||||
{
|
{
|
||||||
let index_ident = builder.id(format!("{}", index).as_str());
|
let index_ident = builder.id(format!("{}", index).as_str());
|
||||||
let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch);
|
let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer);
|
||||||
quote_arm!(cx, $index_ident => { $invoke_expr } )
|
quote_arm!(cx, $index_ident => { $invoke_expr } )
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn implement_dispatch_arms(
|
||||||
|
cx: &ExtCtxt,
|
||||||
|
builder: &aster::AstBuilder,
|
||||||
|
dispatches: &[Dispatch],
|
||||||
|
buffer: bool,
|
||||||
|
) -> Vec<ast::Arm>
|
||||||
|
{
|
||||||
|
let mut index = -1;
|
||||||
|
dispatches.iter()
|
||||||
|
.map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch, buffer) }).collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// generates client type for specified server type
|
/// generates client type for specified server type
|
||||||
/// for say `Service` it generates `ServiceClient`
|
/// for say `Service` it generates `ServiceClient`
|
||||||
fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) {
|
fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) {
|
||||||
@ -511,9 +542,9 @@ fn implement_interface(
|
|||||||
dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push));
|
dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut index = -1;
|
|
||||||
let dispatch_arms: Vec<_> = dispatch_table.iter()
|
let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false);
|
||||||
.map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch) }).collect();
|
let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true);
|
||||||
|
|
||||||
Ok((quote_item!(cx,
|
Ok((quote_item!(cx,
|
||||||
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
|
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
|
||||||
@ -531,6 +562,14 @@ fn implement_interface(
|
|||||||
_ => vec![]
|
_ => vec![]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>
|
||||||
|
{
|
||||||
|
match method_num {
|
||||||
|
$dispatch_arms_buffered
|
||||||
|
_ => vec![]
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
).unwrap(), dispatch_table))
|
).unwrap(), dispatch_table))
|
||||||
}
|
}
|
||||||
|
12
ipc/nano/Cargo.toml
Normal file
12
ipc/nano/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
[package]
|
||||||
|
name = "ethcore-ipc-nano"
|
||||||
|
version = "1.1.0"
|
||||||
|
authors = ["Nikolay Volf <nikolay@ethcore.io>"]
|
||||||
|
license = "GPL-3.0"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
"ethcore-ipc" = { path = "../rpc" }
|
||||||
|
nanomsg = "0.5.0"
|
||||||
|
log = "0.3"
|
214
ipc/nano/src/lib.rs
Normal file
214
ipc/nano/src/lib.rs
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! IPC over nanomsg transport
|
||||||
|
|
||||||
|
extern crate ethcore_ipc as ipc;
|
||||||
|
extern crate nanomsg;
|
||||||
|
#[macro_use] extern crate log;
|
||||||
|
|
||||||
|
pub use ipc::*;
|
||||||
|
|
||||||
|
use std::sync::*;
|
||||||
|
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
|
||||||
|
|
||||||
|
const POLL_TIMEOUT: isize = 100;
|
||||||
|
|
||||||
|
pub struct Worker<S> where S: IpcInterface<S> {
|
||||||
|
service: Arc<S>,
|
||||||
|
sockets: Vec<(Socket, Endpoint)>,
|
||||||
|
polls: Vec<PollFd>,
|
||||||
|
buf: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SocketError {
|
||||||
|
DuplexLink
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Worker<S> where S: IpcInterface<S> {
|
||||||
|
pub fn new(service: Arc<S>) -> Worker<S> {
|
||||||
|
Worker::<S> {
|
||||||
|
service: service.clone(),
|
||||||
|
sockets: Vec::new(),
|
||||||
|
polls: Vec::new(),
|
||||||
|
buf: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn poll(&mut self) {
|
||||||
|
let mut request = PollRequest::new(&mut self.polls[..]);
|
||||||
|
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
|
||||||
|
|
||||||
|
for (fd_index, fd) in request.get_fds().iter().enumerate() {
|
||||||
|
if fd.can_read() {
|
||||||
|
let (ref mut socket, _) = self.sockets[fd_index];
|
||||||
|
unsafe { self.buf.set_len(0); }
|
||||||
|
match socket.nb_read_to_end(&mut self.buf) {
|
||||||
|
Ok(method_sign_len) => {
|
||||||
|
if method_sign_len >= 2 {
|
||||||
|
// method_num
|
||||||
|
let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16;
|
||||||
|
// payload
|
||||||
|
let payload = &self.buf[2..];
|
||||||
|
|
||||||
|
// dispatching for ipc interface
|
||||||
|
let result = self.service.dispatch_buf(method_num, payload);
|
||||||
|
|
||||||
|
if let Err(e) = socket.nb_write(&result) {
|
||||||
|
warn!(target: "ipc", "Failed to write response: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(Error::TryAgain) => {
|
||||||
|
},
|
||||||
|
Err(x) => {
|
||||||
|
warn!(target: "ipc", "Error polling connections {:?}", x);
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rebuild_poll_request(&mut self) {
|
||||||
|
self.polls = self.sockets.iter()
|
||||||
|
.map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In))
|
||||||
|
.collect::<Vec<PollFd>>();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> {
|
||||||
|
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
||||||
|
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
||||||
|
SocketError::DuplexLink
|
||||||
|
}));
|
||||||
|
|
||||||
|
let endpoint = try!(socket.bind(addr).map_err(|e| {
|
||||||
|
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e);
|
||||||
|
SocketError::DuplexLink
|
||||||
|
}));
|
||||||
|
|
||||||
|
self.sockets.push((socket, endpoint));
|
||||||
|
|
||||||
|
self.rebuild_poll_request();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
|
||||||
|
use super::Worker;
|
||||||
|
use ipc::*;
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use nanomsg::{Socket, Protocol, Endpoint};
|
||||||
|
|
||||||
|
struct TestInvoke {
|
||||||
|
method_num: u16,
|
||||||
|
params: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DummyService {
|
||||||
|
methods_stack: RwLock<Vec<TestInvoke>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DummyService {
|
||||||
|
fn new() -> DummyService {
|
||||||
|
DummyService { methods_stack: RwLock::new(Vec::new()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IpcInterface<DummyService> for DummyService {
|
||||||
|
fn dispatch<R>(&self, _r: &mut R) -> Vec<u8> where R: Read {
|
||||||
|
vec![]
|
||||||
|
}
|
||||||
|
fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8> {
|
||||||
|
self.methods_stack.write().unwrap().push(
|
||||||
|
TestInvoke {
|
||||||
|
method_num: method_num,
|
||||||
|
params: buf.to_vec(),
|
||||||
|
});
|
||||||
|
vec![]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
|
||||||
|
let mut socket = Socket::new(Protocol::Pair).unwrap();
|
||||||
|
let endpoint = socket.connect(addr).unwrap();
|
||||||
|
//thread::sleep_ms(10);
|
||||||
|
socket.write(buf).unwrap();
|
||||||
|
(socket, endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_create_worker() {
|
||||||
|
let worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
|
||||||
|
assert_eq!(0, worker.sockets.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_add_duplex_socket_to_worker() {
|
||||||
|
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
|
||||||
|
worker.add_duplex("ipc:///tmp/parity-test10.ipc").unwrap();
|
||||||
|
assert_eq!(1, worker.sockets.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn worker_can_poll_empty() {
|
||||||
|
let service = Arc::new(DummyService::new());
|
||||||
|
let mut worker = Worker::<DummyService>::new(service.clone());
|
||||||
|
worker.add_duplex("ipc:///tmp/parity-test20.ipc").unwrap();
|
||||||
|
worker.poll();
|
||||||
|
assert_eq!(0, service.methods_stack.read().unwrap().len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn worker_can_poll() {
|
||||||
|
let url = "ipc:///tmp/parity-test30.ipc";
|
||||||
|
|
||||||
|
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
|
||||||
|
worker.add_duplex(url).unwrap();
|
||||||
|
|
||||||
|
let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]);
|
||||||
|
worker.poll();
|
||||||
|
|
||||||
|
assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
|
||||||
|
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
|
||||||
|
assert_eq!([7, 7, 6, 6], worker.service.methods_stack.read().unwrap()[0].params[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn worker_can_poll_long() {
|
||||||
|
let url = "ipc:///tmp/parity-test40.ipc";
|
||||||
|
|
||||||
|
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
|
||||||
|
worker.add_duplex(url).unwrap();
|
||||||
|
|
||||||
|
let message = [0u8; 1024*1024];
|
||||||
|
|
||||||
|
let (_socket, _endpoint) = dummy_write(url, &message);
|
||||||
|
worker.poll();
|
||||||
|
|
||||||
|
assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
|
||||||
|
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
|
||||||
|
assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params);
|
||||||
|
}
|
||||||
|
}
|
@ -21,8 +21,12 @@ use std::marker::Sync;
|
|||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
|
|
||||||
pub trait IpcInterface<T> {
|
pub trait IpcInterface<T> {
|
||||||
/// reads the message from io, dispatches the call and returns result
|
/// reads the message from io, dispatches the call and returns serialized result
|
||||||
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
|
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
|
||||||
|
|
||||||
|
/// deserialize the payload from buffer, dispatches invoke and returns serialized result
|
||||||
|
/// (for non-blocking io)
|
||||||
|
fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// serializes method invocation (method_num and parameters) to the stream specified by `w`
|
/// serializes method invocation (method_num and parameters) to the stream specified by `w`
|
||||||
|
Loading…
Reference in New Issue
Block a user