working client spawn

This commit is contained in:
NikVolf 2016-04-12 11:34:56 +03:00
parent 593ccd2510
commit 9b329296e4
4 changed files with 51 additions and 4 deletions

View File

@ -526,7 +526,7 @@ fn push_client_implementation(
let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(),
api_version: $item_ident::api_version().to_string(),
_reserved: vec![0u8, 64],
_reserved: vec![0u8; 64],
};
let mut socket_ref = self.socket.borrow_mut();

View File

@ -54,7 +54,7 @@ pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError
SocketError::DuplexLink
}));
let endpoint = try!(socket.bind(socket_addr).map_err(|e| {
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink
}));

View File

@ -69,6 +69,7 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
if params.is_some() {
buf[2..buf_len].clone_from_slice(params.as_ref().unwrap());
}
if w.write(&buf).unwrap() != buf_len
{
// if write was inconsistent

View File

@ -20,6 +20,15 @@ mod tests {
use super::super::service::*;
use nanoipc;
use std::sync::Arc;
use std::io::{Write, Read};
fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) {
let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
(socket, endpoint)
}
fn init_worker(addr: &str) -> nanoipc::Worker<Service> {
let mut worker = nanoipc::Worker::<Service>::new(Arc::new(Service::new()));
@ -35,13 +44,12 @@ mod tests {
#[test]
fn can_call_handshake() {
let url = "ipc:///tmp/parity-test-nano-20.ipc";
let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
let url = "ipc:///tmp/parity-test-nano-20.ipc";
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) {
@ -59,4 +67,42 @@ mod tests {
assert!(hs.is_ok());
}
#[test]
fn can_receive_dummy_writes_in_thread() {
let url = "ipc:///tmp/parity-test-nano-30.ipc";
let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed);
}
});
while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { }
let (mut _s, _e) = dummy_write(url, &vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);
let mut buf = vec![0u8;1];
let result = _s.read(&mut buf);
assert_eq!(1, buf.len());
assert_eq!(1, buf[0]);
worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed);
}
}