non-working test for dispatching

This commit is contained in:
NikVolf 2016-04-04 01:44:30 +03:00
parent 675af841e8
commit fa63d9e34a

View File

@ -24,7 +24,7 @@ pub use ipc::*;
use std::sync::*;
use std::io::Write;
use nanomsg::{Socket, Protocol};
use nanomsg::{Socket, Protocol, Error};
pub struct Worker<S> where S: IpcInterface<S> {
service: Arc<S>,
@ -49,17 +49,25 @@ impl<S> Worker<S> where S: IpcInterface<S> {
pub fn poll(&mut self) {
for socket in self.sockets.iter_mut() {
// non-blocking read only ok if there is something to read from socket
if let Ok(method_sign_len) = socket.nb_read(&mut self.method_buf) {
if method_sign_len == 2 {
let result = self.service.dispatch_buf(
self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16,
socket);
if let Err(e) = socket.write(&result) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
match socket.nb_read(&mut self.method_buf) {
Ok(method_sign_len) => {
if method_sign_len == 2 {
let result = self.service.dispatch_buf(
self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16,
socket);
if let Err(e) = socket.write(&result) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
}
}
else {
warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
}
},
Err(Error::TryAgain) => {
}
else {
warn!(target: "ipc", "Failed to read method signature from socket: unexpected message length({})", method_sign_len);
Err(x) => {
warn!(target: "ipc", "Error polling connection {:?}", x);
panic!();
}
}
}
@ -86,8 +94,10 @@ mod tests {
use super::Worker;
use ipc::*;
use std::io::Read;
use std::io::{Read, Write};
use std::sync::{Arc, RwLock};
use nanomsg::{Socket, Protocol};
use std::thread;
struct TestInvoke {
method_num: u16,
@ -120,6 +130,15 @@ mod tests {
}
}
fn dummy_write(addr: &str, buf: &[u8]) {
let mut socket = Socket::new(Protocol::Pair).unwrap();
socket.connect(addr).unwrap();
thread::sleep_ms(10);
// socket.nb_write(buf).unwrap();
// socket.flush();
socket.write_all(buf).unwrap();
}
#[test]
fn can_create_worker() {
let worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
@ -129,7 +148,30 @@ mod tests {
#[test]
fn can_add_duplex_socket_to_worker() {
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
worker.add_duplex("ipc://tmp/parity/test1").unwrap();
worker.add_duplex("ipc://tmp/parity-test10.ipc").unwrap();
assert_eq!(1, worker.sockets.len());
}
#[test]
fn worker_can_poll_empty() {
let service = Arc::new(DummyService::new());
let mut worker = Worker::<DummyService>::new(service.clone());
worker.add_duplex("ipc://tmp/parity-test20.ipc").unwrap();
worker.poll();
assert_eq!(0, service.methods_stack.read().unwrap().len());
}
#[test]
fn worker_can_poll() {
let service = Arc::new(DummyService::new());
let mut worker = Worker::<DummyService>::new(service.clone());
worker.add_duplex("ipc:///tmp/parity-test30.ipc").unwrap();
thread::sleep_ms(10);
dummy_write("ipc:///tmp/parity-test30.ipc", &vec![0, 0, 6, 6, 6, 6]);
thread::sleep_ms(10);
worker.poll();
assert_eq!(1, service.methods_stack.read().unwrap().len());
}
}