working client spawn

This commit is contained in:
NikVolf 2016-04-12 11:34:56 +03:00
parent cb1096d1e1
commit 806f5b9064
4 changed files with 51 additions and 4 deletions

View File

@ -526,7 +526,7 @@ fn push_client_implementation(
let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(),
api_version: $item_ident::api_version().to_string(),
_reserved: vec![0u8, 64],
_reserved: vec![0u8; 64],
};
let mut socket_ref = self.socket.borrow_mut();

View File

@ -54,7 +54,7 @@ pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError
SocketError::DuplexLink
}));
let endpoint = try!(socket.bind(socket_addr).map_err(|e| {
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink
}));

View File

@ -69,6 +69,7 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
if params.is_some() {
buf[2..buf_len].clone_from_slice(params.as_ref().unwrap());
}
if w.write(&buf).unwrap() != buf_len
{
// if write was inconsistent

View File

@ -20,6 +20,15 @@ mod tests {
use super::super::service::*;
use nanoipc;
use std::sync::Arc;
use std::io::{Write, Read};
fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) {
let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
(socket, endpoint)
}
fn init_worker(addr: &str) -> nanoipc::Worker<Service> {
let mut worker = nanoipc::Worker::<Service>::new(Arc::new(Service::new()));
@ -35,13 +44,12 @@ mod tests {
#[test]
fn can_call_handshake() {
let url = "ipc:///tmp/parity-test-nano-20.ipc";
let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
let url = "ipc:///tmp/parity-test-nano-20.ipc";
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) {
@ -59,4 +67,42 @@ mod tests {
assert!(hs.is_ok());
}
#[test]
fn can_receive_dummy_writes_in_thread() {
let url = "ipc:///tmp/parity-test-nano-30.ipc";
let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed);
}
});
while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { }
let (mut _s, _e) = dummy_write(url, &vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);
let mut buf = vec![0u8;1];
let result = _s.read(&mut buf);
assert_eq!(1, buf.len());
assert_eq!(1, buf[0]);
worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed);
}
}