client spawner

This commit is contained in:
NikVolf 2016-04-12 10:18:39 +03:00
parent 5609b555d2
commit 40e0d370c2
8 changed files with 72 additions and 16 deletions

View File

@ -303,6 +303,7 @@ fn push_client(
{ {
push_client_struct(cx, builder, item, push); push_client_struct(cx, builder, item, push);
push_client_implementation(cx, builder, dispatches, item, push); push_client_implementation(cx, builder, dispatches, item, push);
push_with_socket_client_implementation(cx, builder, item, push);
} }
/// returns an expression with the body for single operation that is being sent to server /// returns an expression with the body for single operation that is being sent to server
@ -485,6 +486,25 @@ fn implement_client_method(
signature.unwrap() signature.unwrap()
} }
fn push_with_socket_client_implementation(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
item: &Item,
push: &mut FnMut(Annotatable))
{
let client_ident = builder.id(format!("{}Client", item.ident.name.as_str()));
let implement = quote_item!(cx,
impl<S> ::ipc::WithSocket<S> for $client_ident<S> where S: ::ipc::IpcSocket {
fn init(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}
}).unwrap();
push(Annotatable::Item(implement));
}
/// pushes full client side code for the original class exposed via ipc /// pushes full client side code for the original class exposed via ipc
fn push_client_implementation( fn push_client_implementation(
cx: &ExtCtxt, cx: &ExtCtxt,
@ -502,13 +522,6 @@ fn push_client_implementation(
let item_ident = builder.id(format!("{}", item.ident.name.as_str())); let item_ident = builder.id(format!("{}", item.ident.name.as_str()));
let implement = quote_item!(cx, let implement = quote_item!(cx,
impl<S> $client_ident<S> where S: ::ipc::IpcSocket { impl<S> $client_ident<S> where S: ::ipc::IpcSocket {
pub fn new(socket: S) -> $client_ident<S> {
$client_ident {
socket: ::std::cell::RefCell::new(socket),
phantom: ::std::marker::PhantomData,
}
}
pub fn handshake(&self) -> Result<(), ::ipc::Error> { pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = BinHandshake { let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(), protocol_version: $item_ident::protocol_version().to_string(),

View File

@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc;
extern crate nanomsg; extern crate nanomsg;
#[macro_use] extern crate log; #[macro_use] extern crate log;
pub use ipc::*; pub use ipc::{WithSocket, IpcInterface, IpcConfig};
use std::sync::*; use std::sync::*;
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
use std::ops::Deref;
const POLL_TIMEOUT: isize = 100; const POLL_TIMEOUT: isize = 100;
@ -34,6 +35,36 @@ pub struct Worker<S> where S: IpcInterface<S> {
buf: Vec<u8>, buf: Vec<u8>,
} }
pub struct GuardedSocket<S> where S: WithSocket<Socket> {
client: Arc<S>,
_endpoint: Endpoint,
}
impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
type Target = S;
fn deref(&self) -> &S {
&self.client
}
}
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::DuplexLink
}));
let endpoint = try!(socket.bind(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink
}));
Ok(GuardedSocket {
client: Arc::new(S::init(socket)),
_endpoint: endpoint,
})
}
#[derive(Debug)] #[derive(Debug)]
pub enum SocketError { pub enum SocketError {
DuplexLink DuplexLink
@ -113,7 +144,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod service_tests {
use super::Worker; use super::Worker;
use ipc::*; use ipc::*;
@ -150,6 +181,8 @@ mod tests {
} }
} }
impl IpcConfig for DummyService {}
fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
let mut socket = Socket::new(Protocol::Pair).unwrap(); let mut socket = Socket::new(Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap(); let endpoint = socket.connect(addr).unwrap();

View File

@ -9,3 +9,4 @@ license = "GPL-3.0"
[dependencies] [dependencies]
ethcore-devtools = { path = "../../devtools" } ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0" semver = "0.2.0"
nanomsg = "0.5.0"

View File

@ -18,7 +18,6 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::marker::Sync; use std::marker::Sync;
use std::sync::atomic::*;
use semver::Version; use semver::Version;
pub struct Handshake { pub struct Handshake {
@ -47,7 +46,7 @@ pub enum Error {
HandshakeFailed, HandshakeFailed,
} }
pub trait IpcInterface<T> where T: IpcConfig { pub trait IpcInterface<T>:IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result /// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read; fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
@ -81,5 +80,12 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
pub trait IpcSocket: Read + Write + Sync { pub trait IpcSocket: Read + Write + Sync {
} }
impl IpcSocket for ::devtools::TestSocket {
pub trait WithSocket<S: IpcSocket> {
fn init(socket: S) -> Self;
} }
impl IpcSocket for ::devtools::TestSocket {}
impl IpcSocket for ::nanomsg::Socket {}

View File

@ -18,6 +18,7 @@
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate semver; extern crate semver;
extern crate nanomsg;
pub mod interface; pub mod interface;
pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error}; pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket};

View File

@ -13,6 +13,7 @@ bincode = "*"
serde = "0.7.0" serde = "0.7.0"
ethcore-devtools = { path = "../../devtools" } ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0" semver = "0.2.0"
nanomsg = "0.5.0"
[build-dependencies] [build-dependencies]
syntex = "0.30.0" syntex = "0.30.0"

View File

@ -63,7 +63,7 @@ mod tests {
fn call_service_client() { fn call_service_client() {
let mut socket = TestSocket::new(); let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10]; socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket); let service_client = ServiceClient::init(socket);
let result = service_client.commit(5); let result = service_client.commit(5);
@ -75,7 +75,7 @@ mod tests {
fn call_service_client_optional() { fn call_service_client_optional() {
let mut socket = TestSocket::new(); let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10]; socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket); let service_client = ServiceClient::init(socket);
let result = service_client.rollback(Some(5), 10); let result = service_client.rollback(Some(5), 10);
@ -95,7 +95,7 @@ mod tests {
fn call_service_client_handshake() { fn call_service_client_handshake() {
let mut socket = TestSocket::new(); let mut socket = TestSocket::new();
socket.read_buffer = vec![1]; socket.read_buffer = vec![1];
let service_client = ServiceClient::new(socket); let service_client = ServiceClient::init(socket);
let result = service_client.handshake(); let result = service_client.handshake();

View File

@ -19,6 +19,7 @@ extern crate ethcore_ipc as ipc;
extern crate serde; extern crate serde;
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate semver; extern crate semver;
extern crate nanomsg;
pub mod service; pub mod service;
mod examples; mod examples;