diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index c73bed22f..671e067b3 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -303,6 +303,7 @@ fn push_client( { push_client_struct(cx, builder, item, push); push_client_implementation(cx, builder, dispatches, item, push); + push_with_socket_client_implementation(cx, builder, item, push); } /// returns an expression with the body for single operation that is being sent to server @@ -485,6 +486,25 @@ fn implement_client_method( signature.unwrap() } +fn push_with_socket_client_implementation( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + item: &Item, + push: &mut FnMut(Annotatable)) +{ + let client_ident = builder.id(format!("{}Client", item.ident.name.as_str())); + let implement = quote_item!(cx, + impl ::ipc::WithSocket for $client_ident where S: ::ipc::IpcSocket { + fn init(socket: S) -> $client_ident { + $client_ident { + socket: ::std::cell::RefCell::new(socket), + phantom: ::std::marker::PhantomData, + } + } + }).unwrap(); + push(Annotatable::Item(implement)); +} + /// pushes full client side code for the original class exposed via ipc fn push_client_implementation( cx: &ExtCtxt, @@ -502,18 +522,11 @@ fn push_client_implementation( let item_ident = builder.id(format!("{}", item.ident.name.as_str())); let implement = quote_item!(cx, impl $client_ident where S: ::ipc::IpcSocket { - pub fn new(socket: S) -> $client_ident { - $client_ident { - socket: ::std::cell::RefCell::new(socket), - phantom: ::std::marker::PhantomData, - } - } - pub fn handshake(&self) -> Result<(), ::ipc::Error> { let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), api_version: $item_ident::api_version().to_string(), - _reserved: vec![0u8, 64], + _reserved: vec![0u8; 64], }; let mut socket_ref = self.socket.borrow_mut(); diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 6a0a3d4bf..9d74325fa 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; -pub use ipc::*; +pub use ipc::{WithSocket, IpcInterface, IpcConfig}; use std::sync::*; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; +use std::ops::Deref; const POLL_TIMEOUT: isize = 100; @@ -34,6 +35,36 @@ pub struct Worker where S: IpcInterface { buf: Vec, } +pub struct GuardedSocket where S: WithSocket { + client: Arc, + _endpoint: Endpoint, +} + +impl Deref for GuardedSocket where S: WithSocket { + type Target = S; + + fn deref(&self) -> &S { + &self.client + } +} + +pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { + warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); + SocketError::DuplexLink + })); + + let endpoint = try!(socket.connect(socket_addr).map_err(|e| { + warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); + SocketError::DuplexLink + })); + + Ok(GuardedSocket { + client: Arc::new(S::init(socket)), + _endpoint: endpoint, + }) +} + #[derive(Debug)] pub enum SocketError { DuplexLink @@ -60,6 +91,7 @@ impl Worker where S: IpcInterface { match socket.nb_read_to_end(&mut self.buf) { Ok(method_sign_len) => { if method_sign_len >= 2 { + // method_num let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16; // payload @@ -113,7 +145,7 @@ impl Worker where S: IpcInterface { } #[cfg(test)] -mod tests { +mod service_tests { use super::Worker; use ipc::*; @@ -150,10 +182,11 @@ mod tests { } } + impl IpcConfig for DummyService {} + fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { let mut socket = Socket::new(Protocol::Pair).unwrap(); let endpoint = socket.connect(addr).unwrap(); - //thread::sleep_ms(10); socket.write(buf).unwrap(); (socket, endpoint) } diff --git a/ipc/rpc/Cargo.toml b/ipc/rpc/Cargo.toml index b5177db41..a6346bbf9 100644 --- a/ipc/rpc/Cargo.toml +++ b/ipc/rpc/Cargo.toml @@ -9,3 +9,4 @@ license = "GPL-3.0" [dependencies] ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index dd52ce319..e1a0be2d5 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -18,7 +18,6 @@ use std::io::{Read, Write}; use std::marker::Sync; -use std::sync::atomic::*; use semver::Version; pub struct Handshake { @@ -49,7 +48,7 @@ pub enum Error { HandshakeFailed, } -pub trait IpcInterface where T: IpcConfig { +pub trait IpcInterface: IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; @@ -72,6 +71,7 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: if params.is_some() { buf[2..buf_len].clone_from_slice(params.as_ref().unwrap()); } + if w.write(&buf).unwrap() != buf_len { // if write was inconsistent @@ -83,5 +83,12 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: pub trait IpcSocket: Read + Write + Sync { } -impl IpcSocket for ::devtools::TestSocket { + +pub trait WithSocket { + fn init(socket: S) -> Self; } + + +impl IpcSocket for ::devtools::TestSocket {} + +impl IpcSocket for ::nanomsg::Socket {} diff --git a/ipc/rpc/src/lib.rs b/ipc/rpc/src/lib.rs index f0083e66e..c2165b6e5 100644 --- a/ipc/rpc/src/lib.rs +++ b/ipc/rpc/src/lib.rs @@ -18,6 +18,7 @@ extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; pub mod interface; -pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error}; +pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket}; diff --git a/ipc/tests/Cargo.toml b/ipc/tests/Cargo.toml index d6d066b93..035de644c 100644 --- a/ipc/tests/Cargo.toml +++ b/ipc/tests/Cargo.toml @@ -13,6 +13,9 @@ bincode = "*" serde = "0.7.0" ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" +ethcore-ipc-nano = { path = "../nano" } + [build-dependencies] syntex = "0.30.0" diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 931021d73..9d5eb37c4 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -63,7 +63,7 @@ mod tests { fn call_service_client() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.commit(5); @@ -75,7 +75,7 @@ mod tests { fn call_service_client_optional() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.rollback(Some(5), 10); @@ -95,7 +95,7 @@ mod tests { fn call_service_client_handshake() { let mut socket = TestSocket::new(); socket.read_buffer = vec![1]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.handshake(); diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs new file mode 100644 index 000000000..720dff81d --- /dev/null +++ b/ipc/tests/over_nano.rs @@ -0,0 +1,109 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#[cfg(test)] +mod tests { + + use super::super::service::*; + use nanoipc; + use std::sync::Arc; + use std::io::{Write, Read}; + use std::sync::atomic::{Ordering, AtomicBool}; + + fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) { + let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap(); + let endpoint = socket.connect(addr).unwrap(); + socket.write(buf).unwrap(); + (socket, endpoint) + } + + + fn init_worker(addr: &str) -> nanoipc::Worker { + let mut worker = nanoipc::Worker::::new(Arc::new(Service::new())); + worker.add_duplex(addr).unwrap(); + worker + } + + #[test] + fn can_create_client() { + let client = nanoipc::init_client::>("ipc:///tmp/parity-nano-test10.ipc"); + assert!(client.is_ok()); + } + + #[test] + fn can_call_handshake() { + let url = "ipc:///tmp/parity-test-nano-20.ipc"; + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + + while !worker_is_ready.load(Ordering::Relaxed) { } + let client = nanoipc::init_client::>(url).unwrap(); + + let hs = client.handshake(); + + worker_should_exit.store(true, Ordering::Relaxed); + assert!(hs.is_ok()); + } + + #[test] + fn can_receive_dummy_writes_in_thread() { + let url = "ipc:///tmp/parity-test-nano-30.ipc"; + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + while !worker_is_ready.load(Ordering::Relaxed) { } + + let (mut _s, _e) = dummy_write(url, &vec![0, 0, + // protocol version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // api version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // reserved + 0, 0, 0, 0, 0, 0, 0, 64, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]); + + let mut buf = vec![0u8;1]; + _s.read(&mut buf).unwrap(); + assert_eq!(1, buf.len()); + assert_eq!(1, buf[0]); + + worker_should_exit.store(true, Ordering::Relaxed); + } + +} diff --git a/ipc/tests/run.rs b/ipc/tests/run.rs index e464d186f..d3eb15287 100644 --- a/ipc/tests/run.rs +++ b/ipc/tests/run.rs @@ -19,6 +19,9 @@ extern crate ethcore_ipc as ipc; extern crate serde; extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; +extern crate ethcore_ipc_nano as nanoipc; pub mod service; mod examples; +mod over_nano;