From fa63d9e34ade52529b1f2d003ab949b2401b0415 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 4 Apr 2016 01:44:30 +0300 Subject: [PATCH] non-working test for dispatching --- ipc/nano/src/lib.rs | 66 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 28fbef610..81d3ac6ec 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -24,7 +24,7 @@ pub use ipc::*; use std::sync::*; use std::io::Write; -use nanomsg::{Socket, Protocol}; +use nanomsg::{Socket, Protocol, Error}; pub struct Worker where S: IpcInterface { service: Arc, @@ -49,17 +49,25 @@ impl Worker where S: IpcInterface { pub fn poll(&mut self) { for socket in self.sockets.iter_mut() { // non-blocking read only ok if there is something to read from socket - if let Ok(method_sign_len) = socket.nb_read(&mut self.method_buf) { - if method_sign_len == 2 { - let result = self.service.dispatch_buf( - self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, - socket); - if let Err(e) = socket.write(&result) { - warn!(target: "ipc", "Failed to write response: {:?}", e); + match socket.nb_read(&mut self.method_buf) { + Ok(method_sign_len) => { + if method_sign_len == 2 { + let result = self.service.dispatch_buf( + self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, + socket); + if let Err(e) = socket.write(&result) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } } + else { + warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + } + }, + Err(Error::TryAgain) => { } - else { - warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len); + Err(x) => { + warn!(target: "ipc", "Error polling connection {:?}", x); + panic!(); } } } @@ -86,8 +94,10 @@ mod tests { use super::Worker; use ipc::*; - use std::io::Read; + use std::io::{Read, Write}; use std::sync::{Arc, RwLock}; + use nanomsg::{Socket, Protocol}; + use std::thread; struct TestInvoke { method_num: u16, @@ -120,6 +130,15 @@ mod tests { } } + fn dummy_write(addr: &str, buf: &[u8]) { + let mut socket = Socket::new(Protocol::Pair).unwrap(); + socket.connect(addr).unwrap(); + thread::sleep_ms(10); +// socket.nb_write(buf).unwrap(); +// socket.flush(); + socket.write_all(buf).unwrap(); + } + #[test] fn can_create_worker() { let worker = Worker::::new(Arc::new(DummyService::new())); @@ -129,7 +148,30 @@ mod tests { #[test] fn can_add_duplex_socket_to_worker() { let mut worker = Worker::::new(Arc::new(DummyService::new())); - worker.add_duplex("ipc://tmp/parity/test1").unwrap(); + worker.add_duplex("ipc://tmp/parity-test10.ipc").unwrap(); assert_eq!(1, worker.sockets.len()); } + + #[test] + fn worker_can_poll_empty() { + let service = Arc::new(DummyService::new()); + let mut worker = Worker::::new(service.clone()); + worker.add_duplex("ipc://tmp/parity-test20.ipc").unwrap(); + worker.poll(); + assert_eq!(0, service.methods_stack.read().unwrap().len()); + } + + #[test] + fn worker_can_poll() { + let service = Arc::new(DummyService::new()); + let mut worker = Worker::::new(service.clone()); + worker.add_duplex("ipc:///tmp/parity-test30.ipc").unwrap(); + thread::sleep_ms(10); + + dummy_write("ipc:///tmp/parity-test30.ipc", &vec![0, 0, 6, 6, 6, 6]); + thread::sleep_ms(10); + worker.poll(); + + assert_eq!(1, service.methods_stack.read().unwrap().len()); + } }