From c0e7b859d76b89da22d4e48c8dda0997aad76726 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 10:18:39 +0300 Subject: [PATCH] client spawner --- ipc/codegen/src/codegen.rs | 27 ++++++++++++++++++++------- ipc/nano/src/lib.rs | 37 +++++++++++++++++++++++++++++++++++-- ipc/rpc/Cargo.toml | 1 + ipc/rpc/src/interface.rs | 12 +++++++++--- ipc/rpc/src/lib.rs | 3 ++- ipc/tests/Cargo.toml | 1 + ipc/tests/examples.rs | 6 +++--- ipc/tests/run.rs | 1 + 8 files changed, 72 insertions(+), 16 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index c73bed22f..871771d1d 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -303,6 +303,7 @@ fn push_client( { push_client_struct(cx, builder, item, push); push_client_implementation(cx, builder, dispatches, item, push); + push_with_socket_client_implementation(cx, builder, item, push); } /// returns an expression with the body for single operation that is being sent to server @@ -485,6 +486,25 @@ fn implement_client_method( signature.unwrap() } +fn push_with_socket_client_implementation( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + item: &Item, + push: &mut FnMut(Annotatable)) +{ + let client_ident = builder.id(format!("{}Client", item.ident.name.as_str())); + let implement = quote_item!(cx, + impl ::ipc::WithSocket for $client_ident where S: ::ipc::IpcSocket { + fn init(socket: S) -> $client_ident { + $client_ident { + socket: ::std::cell::RefCell::new(socket), + phantom: ::std::marker::PhantomData, + } + } + }).unwrap(); + push(Annotatable::Item(implement)); +} + /// pushes full client side code for the original class exposed via ipc fn push_client_implementation( cx: &ExtCtxt, @@ -502,13 +522,6 @@ fn push_client_implementation( let item_ident = builder.id(format!("{}", item.ident.name.as_str())); let implement = quote_item!(cx, impl $client_ident where S: ::ipc::IpcSocket { - pub fn new(socket: S) -> $client_ident { - $client_ident { - socket: ::std::cell::RefCell::new(socket), - phantom: ::std::marker::PhantomData, - } - } - pub fn handshake(&self) -> Result<(), ::ipc::Error> { let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 6a0a3d4bf..4362448d5 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; -pub use ipc::*; +pub use ipc::{WithSocket, IpcInterface, IpcConfig}; use std::sync::*; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; +use std::ops::Deref; const POLL_TIMEOUT: isize = 100; @@ -34,6 +35,36 @@ pub struct Worker where S: IpcInterface { buf: Vec, } +pub struct GuardedSocket where S: WithSocket { + client: Arc, + _endpoint: Endpoint, +} + +impl Deref for GuardedSocket where S: WithSocket { + type Target = S; + + fn deref(&self) -> &S { + &self.client + } +} + +pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { + warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); + SocketError::DuplexLink + })); + + let endpoint = try!(socket.bind(socket_addr).map_err(|e| { + warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); + SocketError::DuplexLink + })); + + Ok(GuardedSocket { + client: Arc::new(S::init(socket)), + _endpoint: endpoint, + }) +} + #[derive(Debug)] pub enum SocketError { DuplexLink @@ -113,7 +144,7 @@ impl Worker where S: IpcInterface { } #[cfg(test)] -mod tests { +mod service_tests { use super::Worker; use ipc::*; @@ -150,6 +181,8 @@ mod tests { } } + impl IpcConfig for DummyService {} + fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { let mut socket = Socket::new(Protocol::Pair).unwrap(); let endpoint = socket.connect(addr).unwrap(); diff --git a/ipc/rpc/Cargo.toml b/ipc/rpc/Cargo.toml index b5177db41..a6346bbf9 100644 --- a/ipc/rpc/Cargo.toml +++ b/ipc/rpc/Cargo.toml @@ -9,3 +9,4 @@ license = "GPL-3.0" [dependencies] ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 247b5339b..1c9b78703 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -18,7 +18,6 @@ use std::io::{Read, Write}; use std::marker::Sync; -use std::sync::atomic::*; use semver::Version; pub struct Handshake { @@ -47,7 +46,7 @@ pub enum Error { HandshakeFailed, } -pub trait IpcInterface where T: IpcConfig { +pub trait IpcInterface:IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; @@ -81,5 +80,12 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: pub trait IpcSocket: Read + Write + Sync { } -impl IpcSocket for ::devtools::TestSocket { + +pub trait WithSocket { + fn init(socket: S) -> Self; } + + +impl IpcSocket for ::devtools::TestSocket {} + +impl IpcSocket for ::nanomsg::Socket {} diff --git a/ipc/rpc/src/lib.rs b/ipc/rpc/src/lib.rs index f0083e66e..c2165b6e5 100644 --- a/ipc/rpc/src/lib.rs +++ b/ipc/rpc/src/lib.rs @@ -18,6 +18,7 @@ extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; pub mod interface; -pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error}; +pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket}; diff --git a/ipc/tests/Cargo.toml b/ipc/tests/Cargo.toml index d6d066b93..931bf8061 100644 --- a/ipc/tests/Cargo.toml +++ b/ipc/tests/Cargo.toml @@ -13,6 +13,7 @@ bincode = "*" serde = "0.7.0" ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" [build-dependencies] syntex = "0.30.0" diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 931021d73..9d5eb37c4 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -63,7 +63,7 @@ mod tests { fn call_service_client() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.commit(5); @@ -75,7 +75,7 @@ mod tests { fn call_service_client_optional() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.rollback(Some(5), 10); @@ -95,7 +95,7 @@ mod tests { fn call_service_client_handshake() { let mut socket = TestSocket::new(); socket.read_buffer = vec![1]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.handshake(); diff --git a/ipc/tests/run.rs b/ipc/tests/run.rs index e464d186f..0b86fe368 100644 --- a/ipc/tests/run.rs +++ b/ipc/tests/run.rs @@ -19,6 +19,7 @@ extern crate ethcore_ipc as ipc; extern crate serde; extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; pub mod service; mod examples;