guarding endpoints

This commit is contained in:
NikVolf 2016-04-04 20:47:16 +03:00
parent 952a834e43
commit 4cde01d81a

View File

@ -24,11 +24,11 @@ pub use ipc::*;
use std::sync::*; use std::sync::*;
use std::io::{Write, Read}; use std::io::{Write, Read};
use nanomsg::{Socket, Protocol, Error}; use nanomsg::{Socket, Protocol, Error, Endpoint};
pub struct Worker<S> where S: IpcInterface<S> { pub struct Worker<S> where S: IpcInterface<S> {
service: Arc<S>, service: Arc<S>,
sockets: Vec<Socket>, sockets: Vec<(Socket, Endpoint)>,
method_buf: [u8;2], method_buf: [u8;2],
} }
@ -47,7 +47,8 @@ impl<S> Worker<S> where S: IpcInterface<S> {
} }
pub fn poll(&mut self) { pub fn poll(&mut self) {
for socket in self.sockets.iter_mut() { for item in self.sockets.iter_mut() {
let socket = &mut item.0;
// non-blocking read only ok if there is something to read from socket // non-blocking read only ok if there is something to read from socket
match socket.nb_read(&mut self.method_buf) { match socket.nb_read(&mut self.method_buf) {
Ok(method_sign_len) => { Ok(method_sign_len) => {
@ -79,12 +80,12 @@ impl<S> Worker<S> where S: IpcInterface<S> {
SocketError::DuplexLink SocketError::DuplexLink
})); }));
try!(socket.bind(addr).map_err(|e| { let endpoint = try!(socket.bind(addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e); warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", addr, e);
SocketError::DuplexLink SocketError::DuplexLink
})); }));
self.sockets.push(socket); self.sockets.push((socket, endpoint));
Ok(()) Ok(())
} }
} }
@ -132,7 +133,7 @@ mod tests {
fn dummy_write(addr: &str, buf: &[u8]) { fn dummy_write(addr: &str, buf: &[u8]) {
let mut socket = Socket::new(Protocol::Pair).unwrap(); let mut socket = Socket::new(Protocol::Pair).unwrap();
socket.connect(addr).unwrap(); let endpoint = socket.connect(addr).unwrap();
thread::sleep_ms(10); thread::sleep_ms(10);
socket.write_all(buf).unwrap(); socket.write_all(buf).unwrap();
} }
@ -165,10 +166,8 @@ mod tests {
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new())); let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
worker.add_duplex(url).unwrap(); worker.add_duplex(url).unwrap();
thread::sleep_ms(10);
dummy_write(url, &vec![0, 0]); dummy_write(url, &vec![0, 0, 7, 7, 6, 6]);
thread::sleep_ms(10);
worker.poll(); worker.poll();
assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(1, worker.service.methods_stack.read().unwrap().len());