using nanomsg polling

This commit is contained in:
NikVolf 2016-04-06 00:10:24 +03:00
parent 47cfab2bbf
commit aea185471a

View File

@ -23,11 +23,15 @@ extern crate nanomsg;
pub use ipc::*;
use std::sync::*;
use nanomsg::{Socket, Protocol, Error, Endpoint};
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
const POLL_TIMEOUT: isize = 100;
pub struct Worker<S> where S: IpcInterface<S> {
service: Arc<S>,
sockets: Vec<(Socket, Endpoint)>,
polls: Vec<PollFd>,
buf: Vec<u8>,
}
#[derive(Debug)]
@ -40,43 +44,55 @@ impl<S> Worker<S> where S: IpcInterface<S> {
Worker::<S> {
service: service.clone(),
sockets: Vec::new(),
polls: Vec::new(),
buf: Vec::new(),
}
}
pub fn poll(&mut self) {
for item in self.sockets.iter_mut() {
let socket = &mut item.0;
let mut buf = Vec::new();
// non-blocking read only ok if there is something to read from socket
match socket.nb_read_to_end(&mut buf) {
Ok(method_sign_len) => {
if method_sign_len >= 2 {
// method_num
let method_num = buf[1] as u16 * 256 + buf[0] as u16;
// payload
let payload = &buf[2..];
let mut request = PollRequest::new(&mut self.polls[..]);
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
// dispatching for ipc interface
let result = self.service.dispatch_buf(method_num, payload);
for (fd_index, fd) in request.get_fds().iter().enumerate() {
if fd.can_read() {
let (ref mut socket, _) = self.sockets[fd_index];
unsafe { self.buf.set_len(0); }
match socket.nb_read_to_end(&mut self.buf) {
Ok(method_sign_len) => {
if method_sign_len >= 2 {
// method_num
let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16;
// payload
let payload = &self.buf[2..];
if let Err(e) = socket.nb_write(&result) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
// dispatching for ipc interface
let result = self.service.dispatch_buf(method_num, payload);
if let Err(e) = socket.nb_write(&result) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
}
}
else {
warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
}
},
Err(Error::TryAgain) => {
},
Err(x) => {
warn!(target: "ipc", "Error polling connections {:?}", x);
panic!();
}
else {
warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
}
},
Err(Error::TryAgain) => {
},
Err(x) => {
warn!(target: "ipc", "Error polling connections {:?}", x);
panic!();
}
}
}
}
fn rebuild_poll_request(&mut self) {
self.polls = self.sockets.iter()
.map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In))
.collect::<Vec<PollFd>>();
}
pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> {
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
@ -89,6 +105,9 @@ impl<S> Worker<S> where S: IpcInterface<S> {
}));
self.sockets.push((socket, endpoint));
self.rebuild_poll_request();
Ok(())
}
}
@ -169,7 +188,7 @@ mod tests {
worker.add_duplex(url).unwrap();
let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]);
for _ in 0..100 { worker.poll(); }
worker.poll();
assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
@ -186,7 +205,7 @@ mod tests {
let message = [0u8; 1024*1024];
let (_socket, _endpoint) = dummy_write(url, &message);
for _ in 0..1000 { worker.poll(); }
worker.poll();
assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);