Merge pull request #933 from ethcore/ipc-prs-client

IPC persistent client link
This commit is contained in:
Nikolay Volf 2016-04-12 15:13:06 +03:00
commit f1f81777cc
9 changed files with 188 additions and 18 deletions

View File

@ -303,6 +303,7 @@ fn push_client(
{
push_client_struct(cx, builder, item, push);
push_client_implementation(cx, builder, dispatches, item, push);
push_with_socket_client_implementation(cx, builder, item, push);
}
/// returns an expression with the body for single operation that is being sent to server
@ -485,6 +486,25 @@ fn implement_client_method(
signature.unwrap()
}
fn push_with_socket_client_implementation(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
item: &Item,
push: &mut FnMut(Annotatable))
{
let client_ident = builder.id(format!("{}Client", item.ident.name.as_str()));
let implement = quote_item!(cx,
impl<S> ::ipc::WithSocket<S> for $client_ident<S> where S: ::ipc::IpcSocket {
fn init(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}
}).unwrap();
push(Annotatable::Item(implement));
}
/// pushes full client side code for the original class exposed via ipc
fn push_client_implementation(
cx: &ExtCtxt,
@ -502,18 +522,11 @@ fn push_client_implementation(
let item_ident = builder.id(format!("{}", item.ident.name.as_str()));
let implement = quote_item!(cx,
impl<S> $client_ident<S> where S: ::ipc::IpcSocket {
pub fn new(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}
pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(),
api_version: $item_ident::api_version().to_string(),
_reserved: vec![0u8, 64],
_reserved: vec![0u8; 64],
};
let mut socket_ref = self.socket.borrow_mut();

View File

@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;
pub use ipc::*;
pub use ipc::{WithSocket, IpcInterface, IpcConfig};
use std::sync::*;
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
use std::ops::Deref;
const POLL_TIMEOUT: isize = 100;
@ -34,6 +35,36 @@ pub struct Worker<S> where S: IpcInterface<S> {
buf: Vec<u8>,
}
pub struct GuardedSocket<S> where S: WithSocket<Socket> {
client: Arc<S>,
_endpoint: Endpoint,
}
impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
type Target = S;
fn deref(&self) -> &S {
&self.client
}
}
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::DuplexLink
}));
let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink
}));
Ok(GuardedSocket {
client: Arc::new(S::init(socket)),
_endpoint: endpoint,
})
}
#[derive(Debug)]
pub enum SocketError {
DuplexLink
@ -60,6 +91,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
match socket.nb_read_to_end(&mut self.buf) {
Ok(method_sign_len) => {
if method_sign_len >= 2 {
// method_num
let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16;
// payload
@ -113,7 +145,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
}
#[cfg(test)]
mod tests {
mod service_tests {
use super::Worker;
use ipc::*;
@ -150,10 +182,11 @@ mod tests {
}
}
impl IpcConfig for DummyService {}
fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
let mut socket = Socket::new(Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
//thread::sleep_ms(10);
socket.write(buf).unwrap();
(socket, endpoint)
}

View File

@ -9,3 +9,4 @@ license = "GPL-3.0"
[dependencies]
ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"
nanomsg = "0.5.0"

View File

@ -18,7 +18,6 @@
use std::io::{Read, Write};
use std::marker::Sync;
use std::sync::atomic::*;
use semver::Version;
pub struct Handshake {
@ -49,7 +48,7 @@ pub enum Error {
HandshakeFailed,
}
pub trait IpcInterface<T> where T: IpcConfig {
pub trait IpcInterface<T>: IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
@ -72,6 +71,7 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
if params.is_some() {
buf[2..buf_len].clone_from_slice(params.as_ref().unwrap());
}
if w.write(&buf).unwrap() != buf_len
{
// if write was inconsistent
@ -83,5 +83,12 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
pub trait IpcSocket: Read + Write + Sync {
}
impl IpcSocket for ::devtools::TestSocket {
pub trait WithSocket<S: IpcSocket> {
fn init(socket: S) -> Self;
}
impl IpcSocket for ::devtools::TestSocket {}
impl IpcSocket for ::nanomsg::Socket {}

View File

@ -18,6 +18,7 @@
extern crate ethcore_devtools as devtools;
extern crate semver;
extern crate nanomsg;
pub mod interface;
pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error};
pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket};

View File

@ -13,6 +13,9 @@ bincode = "*"
serde = "0.7.0"
ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"
nanomsg = "0.5.0"
ethcore-ipc-nano = { path = "../nano" }
[build-dependencies]
syntex = "0.30.0"

View File

@ -63,7 +63,7 @@ mod tests {
fn call_service_client() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);
let result = service_client.commit(5);
@ -75,7 +75,7 @@ mod tests {
fn call_service_client_optional() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);
let result = service_client.rollback(Some(5), 10);
@ -95,7 +95,7 @@ mod tests {
fn call_service_client_handshake() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![1];
let service_client = ServiceClient::new(socket);
let service_client = ServiceClient::init(socket);
let result = service_client.handshake();

109
ipc/tests/over_nano.rs Normal file
View File

@ -0,0 +1,109 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#[cfg(test)]
mod tests {
use super::super::service::*;
use nanoipc;
use std::sync::Arc;
use std::io::{Write, Read};
use std::sync::atomic::{Ordering, AtomicBool};
fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) {
let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
(socket, endpoint)
}
fn init_worker(addr: &str) -> nanoipc::Worker<Service> {
let mut worker = nanoipc::Worker::<Service>::new(Arc::new(Service::new()));
worker.add_duplex(addr).unwrap();
worker
}
#[test]
fn can_create_client() {
let client = nanoipc::init_client::<ServiceClient<_>>("ipc:///tmp/parity-nano-test10.ipc");
assert!(client.is_ok());
}
#[test]
fn can_call_handshake() {
let url = "ipc:///tmp/parity-test-nano-20.ipc";
let worker_should_exit = Arc::new(AtomicBool::new(false));
let worker_is_ready = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
});
while !worker_is_ready.load(Ordering::Relaxed) { }
let client = nanoipc::init_client::<ServiceClient<_>>(url).unwrap();
let hs = client.handshake();
worker_should_exit.store(true, Ordering::Relaxed);
assert!(hs.is_ok());
}
#[test]
fn can_receive_dummy_writes_in_thread() {
let url = "ipc:///tmp/parity-test-nano-30.ipc";
let worker_should_exit = Arc::new(AtomicBool::new(false));
let worker_is_ready = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
});
while !worker_is_ready.load(Ordering::Relaxed) { }
let (mut _s, _e) = dummy_write(url, &vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);
let mut buf = vec![0u8;1];
_s.read(&mut buf).unwrap();
assert_eq!(1, buf.len());
assert_eq!(1, buf[0]);
worker_should_exit.store(true, Ordering::Relaxed);
}
}

View File

@ -19,6 +19,9 @@ extern crate ethcore_ipc as ipc;
extern crate serde;
extern crate ethcore_devtools as devtools;
extern crate semver;
extern crate nanomsg;
extern crate ethcore_ipc_nano as nanoipc;
pub mod service;
mod examples;
mod over_nano;