diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 871771d1d..671e067b3 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -526,7 +526,7 @@ fn push_client_implementation( let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), api_version: $item_ident::api_version().to_string(), - _reserved: vec![0u8, 64], + _reserved: vec![0u8; 64], }; let mut socket_ref = self.socket.borrow_mut(); diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 41f17018e..9d74325fa 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -54,7 +54,7 @@ pub fn init_client(socket_addr: &str) -> Result, SocketError SocketError::DuplexLink })); - let endpoint = try!(socket.bind(socket_addr).map_err(|e| { + let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::DuplexLink })); diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 1c9b78703..5c0d57612 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -69,6 +69,7 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: if params.is_some() { buf[2..buf_len].clone_from_slice(params.as_ref().unwrap()); } + if w.write(&buf).unwrap() != buf_len { // if write was inconsistent diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs index 56ea94d61..731163638 100644 --- a/ipc/tests/over_nano.rs +++ b/ipc/tests/over_nano.rs @@ -20,6 +20,15 @@ mod tests { use super::super::service::*; use nanoipc; use std::sync::Arc; + use std::io::{Write, Read}; + + fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) { + let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap(); + let endpoint = socket.connect(addr).unwrap(); + socket.write(buf).unwrap(); + (socket, endpoint) + } + fn init_worker(addr: &str) -> nanoipc::Worker { let mut worker = nanoipc::Worker::::new(Arc::new(Service::new())); @@ -35,13 +44,12 @@ mod tests { #[test] fn can_call_handshake() { + let url = "ipc:///tmp/parity-test-nano-20.ipc"; let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); let c_worker_should_exit = worker_should_exit.clone(); let c_worker_is_ready = worker_is_ready.clone(); - let url = "ipc:///tmp/parity-test-nano-20.ipc"; - ::std::thread::spawn(move || { let mut worker = init_worker(url); while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { @@ -59,4 +67,42 @@ mod tests { assert!(hs.is_ok()); } + #[test] + fn can_receive_dummy_writes_in_thread() { + let url = "ipc:///tmp/parity-test-nano-30.ipc"; + let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed); + } + }); + while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { } + + let (mut _s, _e) = dummy_write(url, &vec![0, 0, + // protocol version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // api version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // reserved + 0, 0, 0, 0, 0, 0, 0, 64, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]); + + let mut buf = vec![0u8;1]; + let result = _s.read(&mut buf); + assert_eq!(1, buf.len()); + assert_eq!(1, buf[0]); + + worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + } + }