diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs
index 166ae87bd..5385c72d7 100644
--- a/ipc/codegen/src/codegen.rs
+++ b/ipc/codegen/src/codegen.rs
@@ -145,14 +145,20 @@ struct Dispatch {
return_type_ty: Option
>,
}
-fn implement_dispatch_arm_invoke(
+// This is the expanded version of this:
+//
+// let invoke_serialize_stmt = quote_stmt!(cx, {
+// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap()
+// });
+//
+// But the above does not allow comma-separated expressions for arbitrary number
+// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n
+fn implement_dispatch_arm_invoke_stmt(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
dispatch: &Dispatch,
-) -> P
+) -> ast::Stmt
{
- let deserialize_expr = quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"));
- let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
let function_name = builder.id(dispatch.function_name.as_str());
let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str());
@@ -161,63 +167,71 @@ fn implement_dispatch_arm_invoke(
quote_expr!(cx, input. $arg_ident)
}).collect::>>();
- // This is the expanded version of this:
- //
- // let invoke_serialize_stmt = quote_stmt!(cx, {
- // ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap()
- // });
- //
- // But the above does not allow comma-separated expressions for arbitrary number
- // of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n
- let invoke_serialize_stmt = {
- let ext_cx = &*cx;
- ::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts(
- ext_cx.parse_sess(),
- ext_cx.cfg(),
- {
- let _sp = ext_cx.call_site();
- let mut tt = ::std::vec::Vec::new();
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
- tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
- tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
+ let ext_cx = &*cx;
+ ::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts(
+ ext_cx.parse_sess(),
+ ext_cx.cfg(),
+ {
+ let _sp = ext_cx.call_site();
+ let mut tt = ::std::vec::Vec::new();
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
+ tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
+ tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
- for arg_expr in input_args_exprs {
- tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter());
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
- }
-
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
+ for arg_expr in input_args_exprs {
+ tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
- tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
- tt
- }))
+ }
+
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
+ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
+ tt
+ })).unwrap()
+}
+
+fn implement_dispatch_arm_invoke(
+ cx: &ExtCtxt,
+ builder: &aster::AstBuilder,
+ dispatch: &Dispatch,
+ buffer: bool,
+) -> P
+{
+ let deserialize_expr = if buffer {
+ quote_expr!(cx, ::bincode::serde::deserialize(buf).expect("ipc deserialization error, aborting"))
+ } else {
+ quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"))
};
+
+ let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
+
+ let invoke_serialize_stmt = implement_dispatch_arm_invoke_stmt(cx, builder, dispatch);
quote_expr!(cx, {
let input: $input_type_id = $deserialize_expr;
$invoke_serialize_stmt
@@ -225,14 +239,31 @@ fn implement_dispatch_arm_invoke(
}
/// generates dispatch match for method id
-fn implement_dispatch_arm(cx: &ExtCtxt, builder: &aster::AstBuilder, index: u32, dispatch: &Dispatch)
- -> ast::Arm
+fn implement_dispatch_arm(
+ cx: &ExtCtxt,
+ builder: &aster::AstBuilder,
+ index: u32,
+ dispatch: &Dispatch,
+ buffer: bool,
+) -> ast::Arm
{
let index_ident = builder.id(format!("{}", index).as_str());
- let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch);
+ let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer);
quote_arm!(cx, $index_ident => { $invoke_expr } )
}
+fn implement_dispatch_arms(
+ cx: &ExtCtxt,
+ builder: &aster::AstBuilder,
+ dispatches: &[Dispatch],
+ buffer: bool,
+) -> Vec
+{
+ let mut index = -1;
+ dispatches.iter()
+ .map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch, buffer) }).collect()
+}
+
/// generates client type for specified server type
/// for say `Service` it generates `ServiceClient`
fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) {
@@ -511,9 +542,9 @@ fn implement_interface(
dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push));
}
}
- let mut index = -1;
- let dispatch_arms: Vec<_> = dispatch_table.iter()
- .map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch) }).collect();
+
+ let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false);
+ let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true);
Ok((quote_item!(cx,
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
@@ -531,6 +562,14 @@ fn implement_interface(
_ => vec![]
}
}
+
+ fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec
+ {
+ match method_num {
+ $dispatch_arms_buffered
+ _ => vec![]
+ }
+ }
}
).unwrap(), dispatch_table))
}
diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml
new file mode 100644
index 000000000..c13a7b5a5
--- /dev/null
+++ b/ipc/nano/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "ethcore-ipc-nano"
+version = "1.1.0"
+authors = ["Nikolay Volf "]
+license = "GPL-3.0"
+
+[features]
+
+[dependencies]
+"ethcore-ipc" = { path = "../rpc" }
+nanomsg = "0.5.0"
+log = "0.3"
diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs
new file mode 100644
index 000000000..6a0a3d4bf
--- /dev/null
+++ b/ipc/nano/src/lib.rs
@@ -0,0 +1,214 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! IPC over nanomsg transport
+
+extern crate ethcore_ipc as ipc;
+extern crate nanomsg;
+#[macro_use] extern crate log;
+
+pub use ipc::*;
+
+use std::sync::*;
+use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
+
+const POLL_TIMEOUT: isize = 100;
+
+pub struct Worker where S: IpcInterface {
+ service: Arc,
+ sockets: Vec<(Socket, Endpoint)>,
+ polls: Vec,
+ buf: Vec,
+}
+
+#[derive(Debug)]
+pub enum SocketError {
+ DuplexLink
+}
+
+impl Worker where S: IpcInterface {
+ pub fn new(service: Arc) -> Worker {
+ Worker:: {
+ service: service.clone(),
+ sockets: Vec::new(),
+ polls: Vec::new(),
+ buf: Vec::new(),
+ }
+ }
+
+ pub fn poll(&mut self) {
+ let mut request = PollRequest::new(&mut self.polls[..]);
+ let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
+
+ for (fd_index, fd) in request.get_fds().iter().enumerate() {
+ if fd.can_read() {
+ let (ref mut socket, _) = self.sockets[fd_index];
+ unsafe { self.buf.set_len(0); }
+ match socket.nb_read_to_end(&mut self.buf) {
+ Ok(method_sign_len) => {
+ if method_sign_len >= 2 {
+ // method_num
+ let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16;
+ // payload
+ let payload = &self.buf[2..];
+
+ // dispatching for ipc interface
+ let result = self.service.dispatch_buf(method_num, payload);
+
+ if let Err(e) = socket.nb_write(&result) {
+ warn!(target: "ipc", "Failed to write response: {:?}", e);
+ }
+ }
+ else {
+ warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
+ }
+ },
+ Err(Error::TryAgain) => {
+ },
+ Err(x) => {
+ warn!(target: "ipc", "Error polling connections {:?}", x);
+ panic!();
+ }
+ }
+ }
+ }
+ }
+
+ fn rebuild_poll_request(&mut self) {
+ self.polls = self.sockets.iter()
+ .map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In))
+ .collect::>();
+ }
+
+ pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> {
+ let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
+ warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
+ SocketError::DuplexLink
+ }));
+
+ let endpoint = try!(socket.bind(addr).map_err(|e| {
+ warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e);
+ SocketError::DuplexLink
+ }));
+
+ self.sockets.push((socket, endpoint));
+
+ self.rebuild_poll_request();
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use super::Worker;
+ use ipc::*;
+ use std::io::{Read, Write};
+ use std::sync::{Arc, RwLock};
+ use nanomsg::{Socket, Protocol, Endpoint};
+
+ struct TestInvoke {
+ method_num: u16,
+ params: Vec,
+ }
+
+ struct DummyService {
+ methods_stack: RwLock>,
+ }
+
+ impl DummyService {
+ fn new() -> DummyService {
+ DummyService { methods_stack: RwLock::new(Vec::new()) }
+ }
+ }
+
+ impl IpcInterface for DummyService {
+ fn dispatch(&self, _r: &mut R) -> Vec where R: Read {
+ vec![]
+ }
+ fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec {
+ self.methods_stack.write().unwrap().push(
+ TestInvoke {
+ method_num: method_num,
+ params: buf.to_vec(),
+ });
+ vec![]
+ }
+ }
+
+ fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
+ let mut socket = Socket::new(Protocol::Pair).unwrap();
+ let endpoint = socket.connect(addr).unwrap();
+ //thread::sleep_ms(10);
+ socket.write(buf).unwrap();
+ (socket, endpoint)
+ }
+
+ #[test]
+ fn can_create_worker() {
+ let worker = Worker::::new(Arc::new(DummyService::new()));
+ assert_eq!(0, worker.sockets.len());
+ }
+
+ #[test]
+ fn can_add_duplex_socket_to_worker() {
+ let mut worker = Worker::::new(Arc::new(DummyService::new()));
+ worker.add_duplex("ipc:///tmp/parity-test10.ipc").unwrap();
+ assert_eq!(1, worker.sockets.len());
+ }
+
+ #[test]
+ fn worker_can_poll_empty() {
+ let service = Arc::new(DummyService::new());
+ let mut worker = Worker::::new(service.clone());
+ worker.add_duplex("ipc:///tmp/parity-test20.ipc").unwrap();
+ worker.poll();
+ assert_eq!(0, service.methods_stack.read().unwrap().len());
+ }
+
+ #[test]
+ fn worker_can_poll() {
+ let url = "ipc:///tmp/parity-test30.ipc";
+
+ let mut worker = Worker::::new(Arc::new(DummyService::new()));
+ worker.add_duplex(url).unwrap();
+
+ let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]);
+ worker.poll();
+
+ assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
+ assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
+ assert_eq!([7, 7, 6, 6], worker.service.methods_stack.read().unwrap()[0].params[..]);
+ }
+
+ #[test]
+ fn worker_can_poll_long() {
+ let url = "ipc:///tmp/parity-test40.ipc";
+
+ let mut worker = Worker::::new(Arc::new(DummyService::new()));
+ worker.add_duplex(url).unwrap();
+
+ let message = [0u8; 1024*1024];
+
+ let (_socket, _endpoint) = dummy_write(url, &message);
+ worker.poll();
+
+ assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
+ assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
+ assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params);
+ }
+}
diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs
index 8d67deca3..7ed8b60c4 100644
--- a/ipc/rpc/src/interface.rs
+++ b/ipc/rpc/src/interface.rs
@@ -21,8 +21,12 @@ use std::marker::Sync;
use std::sync::atomic::*;
pub trait IpcInterface {
- /// reads the message from io, dispatches the call and returns result
+ /// reads the message from io, dispatches the call and returns serialized result
fn dispatch(&self, r: &mut R) -> Vec where R: Read;
+
+ /// deserialize the payload from buffer, dispatches invoke and returns serialized result
+ /// (for non-blocking io)
+ fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec;
}
/// serializes method invocation (method_num and parameters) to the stream specified by `w`