From 0a60da622f0ab36a4a2a1afca086c51770172f9a Mon Sep 17 00:00:00 2001 From: NikVolf Date: Sun, 3 Apr 2016 21:43:35 +0300 Subject: [PATCH 01/16] new crate --- ipc/nano/Cargo.toml | 11 +++++++++++ ipc/nano/src/lib.rs | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 ipc/nano/Cargo.toml create mode 100644 ipc/nano/src/lib.rs diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml new file mode 100644 index 000000000..73de137c3 --- /dev/null +++ b/ipc/nano/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ethcore-ipc-nano" +version = "1.1.0" +authors = ["Nikolay Volf "] +license = "GPL-3.0" + +[features] + +[dependencies] +"ethcore-ipc" = { path = "../rpc" } +nanomsg = "0.5.0" diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs new file mode 100644 index 000000000..a77ff7f05 --- /dev/null +++ b/ipc/nano/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Ethcore implementation of IPC over nanomsg transport + +extern crate ethcore_ipc as ipc; +extern crate nanomsg; From b04d8196c7496cfdec7cc2d1dfb200e4abdffd28 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Sun, 3 Apr 2016 23:39:49 +0300 Subject: [PATCH 02/16] dispatch_buf --- ipc/codegen/src/codegen.rs | 9 +++++++++ ipc/nano/src/lib.rs | 24 +++++++++++++++++++++++- ipc/rpc/src/interface.rs | 4 ++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 166ae87bd..f0c404132 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -531,6 +531,15 @@ fn implement_interface( _ => vec![] } } + + fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec + where R: ::std::io::Read + { + match method_num { + $dispatch_arms + _ => vec![] + } + } } ).unwrap(), dispatch_table)) } diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index a77ff7f05..166c4ea53 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -14,7 +14,29 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Ethcore implementation of IPC over nanomsg transport +//! IPC over nanomsg transport extern crate ethcore_ipc as ipc; extern crate nanomsg; + +pub use ipc::*; + +use std::sync::*; +use nanomsg::{Socket, Protocol}; + +pub struct Worker where S: IpcInterface { + service: Arc, + sockets: Vec, +} + +impl Worker where S: IpcInterface { + pub fn new(service: Arc, socket_addr: &str) -> Worker { + Worker:: { + service: service.clone(), + sockets: Vec::new(), + } + } + + pub fn work_loop(&mut self) { + } +} diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 8d67deca3..a55133191 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -23,6 +23,10 @@ use std::sync::atomic::*; pub trait IpcInterface { /// reads the message from io, dispatches the call and returns result fn dispatch(&self, r: &mut R) -> Vec where R: Read; + + /// deserialize the payload from the io `r` and invokes method specified by `method_num` + /// (for non-blocking io) + fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec where R: Read; } /// serializes method invocation (method_num and parameters) to the stream specified by `w` From 326855dc3a8e5bf2fbbaa05afbe7ea8c89489d8c Mon Sep 17 00:00:00 2001 From: NikVolf Date: Sun, 3 Apr 2016 23:58:18 +0300 Subject: [PATCH 03/16] basic polling --- ipc/nano/src/lib.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 166c4ea53..c43cb9b3e 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -22,21 +22,34 @@ extern crate nanomsg; pub use ipc::*; use std::sync::*; +use std::io::Write; use nanomsg::{Socket, Protocol}; pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec, + method_buf: [u8;2], } impl Worker where S: IpcInterface { - pub fn new(service: Arc, socket_addr: &str) -> Worker { + pub fn new(service: Arc) -> Worker { Worker:: { service: service.clone(), sockets: Vec::new(), + method_buf: [0,0] } } - pub fn work_loop(&mut self) { + pub fn poll(&mut self) { + for socket in self.sockets.iter_mut() { + if let Ok(method_sig_len) = socket.nb_read(&mut self.method_buf) { + if method_sig_len == 2 { + let result = self.service.dispatch_buf( + self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, + socket); + socket.write(&result); + } + } + } } } From 5cd6a0408245c43aa515d894873bf396b3e79a04 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 00:00:57 +0300 Subject: [PATCH 04/16] to pollng also --- ipc/nano/Cargo.toml | 1 + ipc/nano/src/lib.rs | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml index 73de137c3..c13a7b5a5 100644 --- a/ipc/nano/Cargo.toml +++ b/ipc/nano/Cargo.toml @@ -9,3 +9,4 @@ license = "GPL-3.0" [dependencies] "ethcore-ipc" = { path = "../rpc" } nanomsg = "0.5.0" +log = "0.3" diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index c43cb9b3e..e92542d03 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -18,6 +18,7 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; +#[macro_use] extern crate log; pub use ipc::*; @@ -47,7 +48,9 @@ impl Worker where S: IpcInterface { let result = self.service.dispatch_buf( self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, socket); - socket.write(&result); + if let Err(e) = socket.write(&result) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } } } } From 99d127bb34fb02d4b99c70cec62f06ab3a078f12 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 00:33:30 +0300 Subject: [PATCH 05/16] duplex & tests --- ipc/nano/src/lib.rs | 52 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index e92542d03..1b0d0ca7b 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -32,6 +32,10 @@ pub struct Worker where S: IpcInterface { method_buf: [u8;2], } +pub enum SocketError { + DuplexLink +} + impl Worker where S: IpcInterface { pub fn new(service: Arc) -> Worker { Worker:: { @@ -43,8 +47,9 @@ impl Worker where S: IpcInterface { pub fn poll(&mut self) { for socket in self.sockets.iter_mut() { - if let Ok(method_sig_len) = socket.nb_read(&mut self.method_buf) { - if method_sig_len == 2 { + // non-blocking read only ok if there is something to read from socket + if let Ok(method_sign_len) = socket.nb_read(&mut self.method_buf) { + if method_sign_len == 2 { let result = self.service.dispatch_buf( self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, socket); @@ -52,7 +57,50 @@ impl Worker where S: IpcInterface { warn!(target: "ipc", "Failed to write response: {:?}", e); } } + else { + warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + } } } } + + pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> { + let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { + warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); + SocketError::DuplexLink + })); + + try!(socket.bind(addr).map_err(|e| { + warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e); + SocketError::DuplexLink + })); + + self.sockets.push(socket); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use super::Worker; + use ipc::*; + use std::io::Read; + use std::sync::Arc; + + struct DummyService; + + impl IpcInterface for DummyService { + fn dispatch(&self, r: &mut R) -> Vec where R: Read { + vec![] + } + fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec where R: Read { + vec![] + } + } + + fn can_create_worker() { + let worker = Worker::::new(Arc::new(DummyService)); + assert_eq!(0, worker.sockets.len()); + } } From 1395d58d3941b4fd2ba4e5a2b5dd8a137734c8c9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 00:42:00 +0300 Subject: [PATCH 06/16] actual test flag --- ipc/nano/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 1b0d0ca7b..51b484049 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -99,6 +99,7 @@ mod tests { } } + #[test] fn can_create_worker() { let worker = Worker::::new(Arc::new(DummyService)); assert_eq!(0, worker.sockets.len()); From 675af841e8aa49ccc02baefb1b5f7ebbd02e6a87 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 00:54:30 +0300 Subject: [PATCH 07/16] dummy service --- ipc/nano/src/lib.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 51b484049..28fbef610 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -32,6 +32,7 @@ pub struct Worker where S: IpcInterface { method_buf: [u8;2], } +#[derive(Debug)] pub enum SocketError { DuplexLink } @@ -86,22 +87,49 @@ mod tests { use super::Worker; use ipc::*; use std::io::Read; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; - struct DummyService; + struct TestInvoke { + method_num: u16, + params: Vec, + } + + struct DummyService { + methods_stack: RwLock>, + } + + impl DummyService { + fn new() -> DummyService { + DummyService { methods_stack: RwLock::new(Vec::new()) } + } + } impl IpcInterface for DummyService { - fn dispatch(&self, r: &mut R) -> Vec where R: Read { + fn dispatch(&self, _r: &mut R) -> Vec where R: Read { vec![] } fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec where R: Read { + let mut buf = vec![0u8; 4096]; + let size = r.read_to_end(&mut buf).unwrap(); + self.methods_stack.write().unwrap().push( + TestInvoke { + method_num: method_num, + params: unsafe { Vec::from_raw_parts(buf.as_mut_ptr(), size, size) } + }); vec![] } } #[test] fn can_create_worker() { - let worker = Worker::::new(Arc::new(DummyService)); + let worker = Worker::::new(Arc::new(DummyService::new())); assert_eq!(0, worker.sockets.len()); } + + #[test] + fn can_add_duplex_socket_to_worker() { + let mut worker = Worker::::new(Arc::new(DummyService::new())); + worker.add_duplex("ipc://tmp/parity/test1").unwrap(); + assert_eq!(1, worker.sockets.len()); + } } From fa63d9e34ade52529b1f2d003ab949b2401b0415 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 01:44:30 +0300 Subject: [PATCH 08/16] non-working test for dispatching --- ipc/nano/src/lib.rs | 66 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 28fbef610..81d3ac6ec 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -24,7 +24,7 @@ pub use ipc::*; use std::sync::*; use std::io::Write; -use nanomsg::{Socket, Protocol}; +use nanomsg::{Socket, Protocol, Error}; pub struct Worker where S: IpcInterface { service: Arc, @@ -49,17 +49,25 @@ impl Worker where S: IpcInterface { pub fn poll(&mut self) { for socket in self.sockets.iter_mut() { // non-blocking read only ok if there is something to read from socket - if let Ok(method_sign_len) = socket.nb_read(&mut self.method_buf) { - if method_sign_len == 2 { - let result = self.service.dispatch_buf( - self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, - socket); - if let Err(e) = socket.write(&result) { - warn!(target: "ipc", "Failed to write response: {:?}", e); + match socket.nb_read(&mut self.method_buf) { + Ok(method_sign_len) => { + if method_sign_len == 2 { + let result = self.service.dispatch_buf( + self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, + socket); + if let Err(e) = socket.write(&result) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } } + else { + warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + } + }, + Err(Error::TryAgain) => { } - else { - warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + Err(x) => { + warn!(target: "ipc", "Error polling connection {:?}", x); + panic!(); } } } @@ -86,8 +94,10 @@ mod tests { use super::Worker; use ipc::*; - use std::io::Read; + use std::io::{Read, Write}; use std::sync::{Arc, RwLock}; + use nanomsg::{Socket, Protocol}; + use std::thread; struct TestInvoke { method_num: u16, @@ -120,6 +130,15 @@ mod tests { } } + fn dummy_write(addr: &str, buf: &[u8]) { + let mut socket = Socket::new(Protocol::Pair).unwrap(); + socket.connect(addr).unwrap(); + thread::sleep_ms(10); +// socket.nb_write(buf).unwrap(); +// socket.flush(); + socket.write_all(buf).unwrap(); + } + #[test] fn can_create_worker() { let worker = Worker::::new(Arc::new(DummyService::new())); @@ -129,7 +148,30 @@ mod tests { #[test] fn can_add_duplex_socket_to_worker() { let mut worker = Worker::::new(Arc::new(DummyService::new())); - worker.add_duplex("ipc://tmp/parity/test1").unwrap(); + worker.add_duplex("ipc://tmp/parity-test10.ipc").unwrap(); assert_eq!(1, worker.sockets.len()); } + + #[test] + fn worker_can_poll_empty() { + let service = Arc::new(DummyService::new()); + let mut worker = Worker::::new(service.clone()); + worker.add_duplex("ipc://tmp/parity-test20.ipc").unwrap(); + worker.poll(); + assert_eq!(0, service.methods_stack.read().unwrap().len()); + } + + #[test] + fn worker_can_poll() { + let service = Arc::new(DummyService::new()); + let mut worker = Worker::::new(service.clone()); + worker.add_duplex("ipc:///tmp/parity-test30.ipc").unwrap(); + thread::sleep_ms(10); + + dummy_write("ipc:///tmp/parity-test30.ipc", &vec![0, 0, 6, 6, 6, 6]); + thread::sleep_ms(10); + worker.poll(); + + assert_eq!(1, service.methods_stack.read().unwrap().len()); + } } From 35465debd69d84a13dd37d4e7c5e4f8f861649d9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 01:52:19 +0300 Subject: [PATCH 09/16] flush --- ipc/nano/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 81d3ac6ec..f44a68bd5 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -148,7 +148,7 @@ mod tests { #[test] fn can_add_duplex_socket_to_worker() { let mut worker = Worker::::new(Arc::new(DummyService::new())); - worker.add_duplex("ipc://tmp/parity-test10.ipc").unwrap(); + worker.add_duplex("ipc:///tmp/parity-test10.ipc").unwrap(); assert_eq!(1, worker.sockets.len()); } @@ -156,7 +156,7 @@ mod tests { fn worker_can_poll_empty() { let service = Arc::new(DummyService::new()); let mut worker = Worker::::new(service.clone()); - worker.add_duplex("ipc://tmp/parity-test20.ipc").unwrap(); + worker.add_duplex("ipc:///tmp/parity-test20.ipc").unwrap(); worker.poll(); assert_eq!(0, service.methods_stack.read().unwrap().len()); } From 952a834e43c3a1c2f610b97ae40d3ed20aa8c0b8 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 10:55:06 +0300 Subject: [PATCH 10/16] savework --- ipc/nano/src/lib.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index f44a68bd5..da820855d 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -23,7 +23,7 @@ extern crate nanomsg; pub use ipc::*; use std::sync::*; -use std::io::Write; +use std::io::{Write, Read}; use nanomsg::{Socket, Protocol, Error}; pub struct Worker where S: IpcInterface { @@ -64,7 +64,7 @@ impl Worker where S: IpcInterface { } }, Err(Error::TryAgain) => { - } + }, Err(x) => { warn!(target: "ipc", "Error polling connection {:?}", x); panic!(); @@ -134,8 +134,6 @@ mod tests { let mut socket = Socket::new(Protocol::Pair).unwrap(); socket.connect(addr).unwrap(); thread::sleep_ms(10); -// socket.nb_write(buf).unwrap(); -// socket.flush(); socket.write_all(buf).unwrap(); } @@ -163,15 +161,16 @@ mod tests { #[test] fn worker_can_poll() { - let service = Arc::new(DummyService::new()); - let mut worker = Worker::::new(service.clone()); - worker.add_duplex("ipc:///tmp/parity-test30.ipc").unwrap(); + let url = "ipc:///tmp/parity-test30.ipc"; + + let mut worker = Worker::::new(Arc::new(DummyService::new())); + worker.add_duplex(url).unwrap(); thread::sleep_ms(10); - dummy_write("ipc:///tmp/parity-test30.ipc", &vec![0, 0, 6, 6, 6, 6]); + dummy_write(url, &vec![0, 0]); thread::sleep_ms(10); worker.poll(); - assert_eq!(1, service.methods_stack.read().unwrap().len()); + assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); } } From 4cde01d81aedf17a0cdf1433234388d719651b02 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 20:47:16 +0300 Subject: [PATCH 11/16] guarding endpoints --- ipc/nano/src/lib.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index da820855d..d1cc58c48 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -24,11 +24,11 @@ pub use ipc::*; use std::sync::*; use std::io::{Write, Read}; -use nanomsg::{Socket, Protocol, Error}; +use nanomsg::{Socket, Protocol, Error, Endpoint}; pub struct Worker where S: IpcInterface { service: Arc, - sockets: Vec, + sockets: Vec<(Socket, Endpoint)>, method_buf: [u8;2], } @@ -47,7 +47,8 @@ impl Worker where S: IpcInterface { } pub fn poll(&mut self) { - for socket in self.sockets.iter_mut() { + for item in self.sockets.iter_mut() { + let socket = &mut item.0; // non-blocking read only ok if there is something to read from socket match socket.nb_read(&mut self.method_buf) { Ok(method_sign_len) => { @@ -79,12 +80,12 @@ impl Worker where S: IpcInterface { SocketError::DuplexLink })); - try!(socket.bind(addr).map_err(|e| { + let endpoint = try!(socket.bind(addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e); SocketError::DuplexLink })); - self.sockets.push(socket); + self.sockets.push((socket, endpoint)); Ok(()) } } @@ -132,7 +133,7 @@ mod tests { fn dummy_write(addr: &str, buf: &[u8]) { let mut socket = Socket::new(Protocol::Pair).unwrap(); - socket.connect(addr).unwrap(); + let endpoint = socket.connect(addr).unwrap(); thread::sleep_ms(10); socket.write_all(buf).unwrap(); } @@ -165,10 +166,8 @@ mod tests { let mut worker = Worker::::new(Arc::new(DummyService::new())); worker.add_duplex(url).unwrap(); - thread::sleep_ms(10); - dummy_write(url, &vec![0, 0]); - thread::sleep_ms(10); + dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); worker.poll(); assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); From 0d7e52ac6fa5e08edc960178cf36a61b196c50ea Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 5 Apr 2016 12:08:42 +0300 Subject: [PATCH 12/16] dispatch buf and proper polling --- ipc/codegen/src/codegen.rs | 164 ++++++++++++++++++++++--------------- ipc/nano/src/lib.rs | 62 +++++++++----- ipc/rpc/src/interface.rs | 2 +- 3 files changed, 139 insertions(+), 89 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index f0c404132..5385c72d7 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -145,14 +145,20 @@ struct Dispatch { return_type_ty: Option>, } -fn implement_dispatch_arm_invoke( +// This is the expanded version of this: +// +// let invoke_serialize_stmt = quote_stmt!(cx, { +// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap() +// }); +// +// But the above does not allow comma-separated expressions for arbitrary number +// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n +fn implement_dispatch_arm_invoke_stmt( cx: &ExtCtxt, builder: &aster::AstBuilder, dispatch: &Dispatch, -) -> P +) -> ast::Stmt { - let deserialize_expr = quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting")); - let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str()); let function_name = builder.id(dispatch.function_name.as_str()); let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str()); @@ -161,63 +167,71 @@ fn implement_dispatch_arm_invoke( quote_expr!(cx, input. $arg_ident) }).collect::>>(); - // This is the expanded version of this: - // - // let invoke_serialize_stmt = quote_stmt!(cx, { - // ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap() - // }); - // - // But the above does not allow comma-separated expressions for arbitrary number - // of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n - let invoke_serialize_stmt = { - let ext_cx = &*cx; - ::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts( - ext_cx.parse_sess(), - ext_cx.cfg(), - { - let _sp = ext_cx.call_site(); - let mut tt = ::std::vec::Vec::new(); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And))); - tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter()); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot)); - tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter()); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); + let ext_cx = &*cx; + ::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts( + ext_cx.parse_sess(), + ext_cx.cfg(), + { + let _sp = ext_cx.call_site(); + let mut tt = ::std::vec::Vec::new(); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And))); + tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter()); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot)); + tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter()); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); - for arg_expr in input_args_exprs { - tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter()); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma)); - } - - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace))); + for arg_expr in input_args_exprs { + tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter()); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot)); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); - tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace))); - tt - })) + } + + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot)); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); + tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace))); + tt + })).unwrap() +} + +fn implement_dispatch_arm_invoke( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + dispatch: &Dispatch, + buffer: bool, +) -> P +{ + let deserialize_expr = if buffer { + quote_expr!(cx, ::bincode::serde::deserialize(buf).expect("ipc deserialization error, aborting")) + } else { + quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting")) }; + + let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str()); + + let invoke_serialize_stmt = implement_dispatch_arm_invoke_stmt(cx, builder, dispatch); quote_expr!(cx, { let input: $input_type_id = $deserialize_expr; $invoke_serialize_stmt @@ -225,14 +239,31 @@ fn implement_dispatch_arm_invoke( } /// generates dispatch match for method id -fn implement_dispatch_arm(cx: &ExtCtxt, builder: &aster::AstBuilder, index: u32, dispatch: &Dispatch) - -> ast::Arm +fn implement_dispatch_arm( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + index: u32, + dispatch: &Dispatch, + buffer: bool, +) -> ast::Arm { let index_ident = builder.id(format!("{}", index).as_str()); - let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch); + let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer); quote_arm!(cx, $index_ident => { $invoke_expr } ) } +fn implement_dispatch_arms( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + dispatches: &[Dispatch], + buffer: bool, +) -> Vec +{ + let mut index = -1; + dispatches.iter() + .map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch, buffer) }).collect() +} + /// generates client type for specified server type /// for say `Service` it generates `ServiceClient` fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) { @@ -511,9 +542,9 @@ fn implement_interface( dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push)); } } - let mut index = -1; - let dispatch_arms: Vec<_> = dispatch_table.iter() - .map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch) }).collect(); + + let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false); + let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true); Ok((quote_item!(cx, impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { @@ -532,11 +563,10 @@ fn implement_interface( } } - fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec - where R: ::std::io::Read + fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec { match method_num { - $dispatch_arms + $dispatch_arms_buffered _ => vec![] } } diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index d1cc58c48..871575750 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -23,13 +23,11 @@ extern crate nanomsg; pub use ipc::*; use std::sync::*; -use std::io::{Write, Read}; use nanomsg::{Socket, Protocol, Error, Endpoint}; pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, - method_buf: [u8;2], } #[derive(Debug)] @@ -42,21 +40,26 @@ impl Worker where S: IpcInterface { Worker:: { service: service.clone(), sockets: Vec::new(), - method_buf: [0,0] } } pub fn poll(&mut self) { for item in self.sockets.iter_mut() { let socket = &mut item.0; + let mut buf = Vec::new(); // non-blocking read only ok if there is something to read from socket - match socket.nb_read(&mut self.method_buf) { + match socket.nb_read_to_end(&mut buf) { Ok(method_sign_len) => { - if method_sign_len == 2 { - let result = self.service.dispatch_buf( - self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, - socket); - if let Err(e) = socket.write(&result) { + if method_sign_len >= 2 { + // method_num + let method_num = buf[1] as u16 * 256 + buf[0] as u16; + // payload + let payload = &buf[2..]; + + // dispatching for ipc interface + let result = self.service.dispatch_buf(method_num, payload); + + if let Err(e) = socket.nb_write(&result) { warn!(target: "ipc", "Failed to write response: {:?}", e); } } @@ -67,7 +70,7 @@ impl Worker where S: IpcInterface { Err(Error::TryAgain) => { }, Err(x) => { - warn!(target: "ipc", "Error polling connection {:?}", x); + warn!(target: "ipc", "Error polling connections {:?}", x); panic!(); } } @@ -97,8 +100,7 @@ mod tests { use ipc::*; use std::io::{Read, Write}; use std::sync::{Arc, RwLock}; - use nanomsg::{Socket, Protocol}; - use std::thread; + use nanomsg::{Socket, Protocol, Endpoint}; struct TestInvoke { method_num: u16, @@ -119,23 +121,22 @@ mod tests { fn dispatch(&self, _r: &mut R) -> Vec where R: Read { vec![] } - fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec where R: Read { - let mut buf = vec![0u8; 4096]; - let size = r.read_to_end(&mut buf).unwrap(); + fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec { self.methods_stack.write().unwrap().push( TestInvoke { method_num: method_num, - params: unsafe { Vec::from_raw_parts(buf.as_mut_ptr(), size, size) } + params: buf.to_vec(), }); vec![] } } - fn dummy_write(addr: &str, buf: &[u8]) { + fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { let mut socket = Socket::new(Protocol::Pair).unwrap(); let endpoint = socket.connect(addr).unwrap(); - thread::sleep_ms(10); - socket.write_all(buf).unwrap(); + //thread::sleep_ms(10); + socket.write(buf).unwrap(); + (socket, endpoint) } #[test] @@ -167,9 +168,28 @@ mod tests { let mut worker = Worker::::new(Arc::new(DummyService::new())); worker.add_duplex(url).unwrap(); - dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); - worker.poll(); + let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); + for _ in 0..1000 { worker.poll(); } assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); + assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); + assert_eq!([7, 7, 6, 6], worker.service.methods_stack.read().unwrap()[0].params[..]); + } + + #[test] + fn worker_can_poll_long() { + let url = "ipc:///tmp/parity-test30.ipc"; + + let mut worker = Worker::::new(Arc::new(DummyService::new())); + worker.add_duplex(url).unwrap(); + + let message = [0u8; 1024*1024]; + + let (_socket, _endpoint) = dummy_write(url, &message); + for _ in 0..10000 { worker.poll(); } + + assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); + assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); + assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params); } } diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index a55133191..95a360cad 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -26,7 +26,7 @@ pub trait IpcInterface { /// deserialize the payload from the io `r` and invokes method specified by `method_num` /// (for non-blocking io) - fn dispatch_buf(&self, method_num: u16, r: &mut R) -> Vec where R: Read; + fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec; } /// serializes method invocation (method_num and parameters) to the stream specified by `w` From 201d47a4835b2a47297109c952a2c40ef9a53a03 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 5 Apr 2016 12:11:05 +0300 Subject: [PATCH 13/16] fixing url --- ipc/nano/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 871575750..c520d8dc5 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -178,7 +178,7 @@ mod tests { #[test] fn worker_can_poll_long() { - let url = "ipc:///tmp/parity-test30.ipc"; + let url = "ipc:///tmp/parity-test40.ipc"; let mut worker = Worker::::new(Arc::new(DummyService::new())); worker.add_duplex(url).unwrap(); From 6d425bb5bb8a6dc81949ccad51cefe08a8788e97 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 5 Apr 2016 12:35:45 +0300 Subject: [PATCH 14/16] fix doc --- ipc/rpc/src/interface.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 95a360cad..7ed8b60c4 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -21,10 +21,10 @@ use std::marker::Sync; use std::sync::atomic::*; pub trait IpcInterface { - /// reads the message from io, dispatches the call and returns result + /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; - /// deserialize the payload from the io `r` and invokes method specified by `method_num` + /// deserialize the payload from buffer, dispatches invoke and returns serialized result /// (for non-blocking io) fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec; } From 47cfab2bbf2ec8c9f019935efa361c6809993a9f Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 5 Apr 2016 12:37:05 +0300 Subject: [PATCH 15/16] loop size --- ipc/nano/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index c520d8dc5..369fc444c 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -169,7 +169,7 @@ mod tests { worker.add_duplex(url).unwrap(); let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); - for _ in 0..1000 { worker.poll(); } + for _ in 0..100 { worker.poll(); } assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); @@ -186,7 +186,7 @@ mod tests { let message = [0u8; 1024*1024]; let (_socket, _endpoint) = dummy_write(url, &message); - for _ in 0..10000 { worker.poll(); } + for _ in 0..1000 { worker.poll(); } assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); From aea185471a1afd4e8183d285493af70ceb7b7765 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 6 Apr 2016 00:10:24 +0300 Subject: [PATCH 16/16] using nanomsg polling --- ipc/nano/src/lib.rs | 73 ++++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 369fc444c..6a0a3d4bf 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -23,11 +23,15 @@ extern crate nanomsg; pub use ipc::*; use std::sync::*; -use nanomsg::{Socket, Protocol, Error, Endpoint}; +use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; + +const POLL_TIMEOUT: isize = 100; pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, + polls: Vec, + buf: Vec, } #[derive(Debug)] @@ -40,43 +44,55 @@ impl Worker where S: IpcInterface { Worker:: { service: service.clone(), sockets: Vec::new(), + polls: Vec::new(), + buf: Vec::new(), } } pub fn poll(&mut self) { - for item in self.sockets.iter_mut() { - let socket = &mut item.0; - let mut buf = Vec::new(); - // non-blocking read only ok if there is something to read from socket - match socket.nb_read_to_end(&mut buf) { - Ok(method_sign_len) => { - if method_sign_len >= 2 { - // method_num - let method_num = buf[1] as u16 * 256 + buf[0] as u16; - // payload - let payload = &buf[2..]; + let mut request = PollRequest::new(&mut self.polls[..]); + let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); - // dispatching for ipc interface - let result = self.service.dispatch_buf(method_num, payload); + for (fd_index, fd) in request.get_fds().iter().enumerate() { + if fd.can_read() { + let (ref mut socket, _) = self.sockets[fd_index]; + unsafe { self.buf.set_len(0); } + match socket.nb_read_to_end(&mut self.buf) { + Ok(method_sign_len) => { + if method_sign_len >= 2 { + // method_num + let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16; + // payload + let payload = &self.buf[2..]; - if let Err(e) = socket.nb_write(&result) { - warn!(target: "ipc", "Failed to write response: {:?}", e); + // dispatching for ipc interface + let result = self.service.dispatch_buf(method_num, payload); + + if let Err(e) = socket.nb_write(&result) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } } + else { + warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + } + }, + Err(Error::TryAgain) => { + }, + Err(x) => { + warn!(target: "ipc", "Error polling connections {:?}", x); + panic!(); } - else { - warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); - } - }, - Err(Error::TryAgain) => { - }, - Err(x) => { - warn!(target: "ipc", "Error polling connections {:?}", x); - panic!(); } } } } + fn rebuild_poll_request(&mut self) { + self.polls = self.sockets.iter() + .map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In)) + .collect::>(); + } + pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); @@ -89,6 +105,9 @@ impl Worker where S: IpcInterface { })); self.sockets.push((socket, endpoint)); + + self.rebuild_poll_request(); + Ok(()) } } @@ -169,7 +188,7 @@ mod tests { worker.add_duplex(url).unwrap(); let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); - for _ in 0..100 { worker.poll(); } + worker.poll(); assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); @@ -186,7 +205,7 @@ mod tests { let message = [0u8; 1024*1024]; let (_socket, _endpoint) = dummy_write(url, &message); - for _ in 0..1000 { worker.poll(); } + worker.poll(); assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);