diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 369fc444c..6a0a3d4bf 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -23,11 +23,15 @@ extern crate nanomsg; pub use ipc::*; use std::sync::*; -use nanomsg::{Socket, Protocol, Error, Endpoint}; +use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; + +const POLL_TIMEOUT: isize = 100; pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, + polls: Vec, + buf: Vec, } #[derive(Debug)] @@ -40,43 +44,55 @@ impl Worker where S: IpcInterface { Worker:: { service: service.clone(), sockets: Vec::new(), + polls: Vec::new(), + buf: Vec::new(), } } pub fn poll(&mut self) { - for item in self.sockets.iter_mut() { - let socket = &mut item.0; - let mut buf = Vec::new(); - // non-blocking read only ok if there is something to read from socket - match socket.nb_read_to_end(&mut buf) { - Ok(method_sign_len) => { - if method_sign_len >= 2 { - // method_num - let method_num = buf[1] as u16 * 256 + buf[0] as u16; - // payload - let payload = &buf[2..]; + let mut request = PollRequest::new(&mut self.polls[..]); + let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); - // dispatching for ipc interface - let result = self.service.dispatch_buf(method_num, payload); + for (fd_index, fd) in request.get_fds().iter().enumerate() { + if fd.can_read() { + let (ref mut socket, _) = self.sockets[fd_index]; + unsafe { self.buf.set_len(0); } + match socket.nb_read_to_end(&mut self.buf) { + Ok(method_sign_len) => { + if method_sign_len >= 2 { + // method_num + let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16; + // payload + let payload = &self.buf[2..]; - if let Err(e) = socket.nb_write(&result) { - warn!(target: "ipc", "Failed to write response: {:?}", e); + // dispatching for ipc interface + let result = self.service.dispatch_buf(method_num, payload); + + if let Err(e) = socket.nb_write(&result) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } } + else { + warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + } + }, + Err(Error::TryAgain) => { + }, + Err(x) => { + warn!(target: "ipc", "Error polling connections {:?}", x); + panic!(); } - else { - warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); - } - }, - Err(Error::TryAgain) => { - }, - Err(x) => { - warn!(target: "ipc", "Error polling connections {:?}", x); - panic!(); } } } } + fn rebuild_poll_request(&mut self) { + self.polls = self.sockets.iter() + .map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In)) + .collect::>(); + } + pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); @@ -89,6 +105,9 @@ impl Worker where S: IpcInterface { })); self.sockets.push((socket, endpoint)); + + self.rebuild_poll_request(); + Ok(()) } } @@ -169,7 +188,7 @@ mod tests { worker.add_duplex(url).unwrap(); let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); - for _ in 0..100 { worker.poll(); } + worker.poll(); assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); @@ -186,7 +205,7 @@ mod tests { let message = [0u8; 1024*1024]; let (_socket, _endpoint) = dummy_write(url, &message); - for _ in 0..1000 { worker.poll(); } + worker.poll(); assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);