From a232aabda3dcb5e46e468fef90940c49ef42e9b0 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 11:53:41 +0300 Subject: [PATCH 01/15] some docs --- ipc/nano/src/lib.rs | 13 +++++++++++++ ipc/rpc/src/interface.rs | 15 +++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 9d74325fa..18a5b0ffe 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -28,6 +28,7 @@ use std::ops::Deref; const POLL_TIMEOUT: isize = 100; +/// Generic worker to handle service (binded) sockets pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, @@ -35,6 +36,8 @@ pub struct Worker where S: IpcInterface { buf: Vec, } +/// struct for guarding `_endpoint` (so that it wont drop) +/// derefs to client `S` pub struct GuardedSocket where S: WithSocket { client: Arc, _endpoint: Endpoint, @@ -48,6 +51,9 @@ impl Deref for GuardedSocket where S: WithSocket { } } +/// Spawns client <`S`> over specified address +/// creates socket and connects endpoint to it +/// for duplex (paired) connections with the service pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); @@ -65,12 +71,15 @@ pub fn init_client(socket_addr: &str) -> Result, SocketError }) } +/// Error occured while establising socket or endpoint #[derive(Debug)] pub enum SocketError { + /// Error establising duplex (paired) socket and/or endpoint DuplexLink } impl Worker where S: IpcInterface { + /// New worker over specified `service` pub fn new(service: Arc) -> Worker { Worker:: { service: service.clone(), @@ -80,6 +89,7 @@ impl Worker where S: IpcInterface { } } + /// Polls all sockets, reads and dispatches method invocations pub fn poll(&mut self) { let mut request = PollRequest::new(&mut self.polls[..]); let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); @@ -119,12 +129,15 @@ impl Worker where S: IpcInterface { } } + /// Stores nanomsg poll request for reuse fn rebuild_poll_request(&mut self) { self.polls = self.sockets.iter() .map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In)) .collect::>(); } + /// Add exclusive socket for paired client + /// Only one connection over this address is allowed pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 5c0d57612..077683d5b 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -20,24 +20,33 @@ use std::io::{Read, Write}; use std::marker::Sync; use semver::Version; +/// Handshake for client and server to negotiate api/protocol version pub struct Handshake { pub protocol_version: Version, pub api_version: Version, } +/// Allows to configure custom version and custom handshake response for +/// ipc host pub trait IpcConfig { + /// Current service api version + /// Should be increased if any of the methods changes signature fn api_version() -> Version { Version::parse("1.0.0").unwrap() } + /// Current ipc protocol version + /// Should be increased only if signature of system methods changes fn protocol_version() -> Version { Version::parse("1.0.0").unwrap() } + /// Default handshake requires exact versions match fn handshake(handshake: &Handshake) -> bool { handshake.protocol_version == Self::protocol_version() && handshake.api_version == Self::api_version() } } +/// Error in dispatching or invoking methods via IPC #[derive(Debug)] pub enum Error { UnkownSystemCall, @@ -46,6 +55,8 @@ pub enum Error { HandshakeFailed, } +/// Allows implementor to be attached to generic worker and dispatch rpc requests +/// over IPC pub trait IpcInterface:IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; @@ -77,11 +88,11 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: } } -/// IpcSocket +/// IpcSocket, read/write generalization pub trait IpcSocket: Read + Write + Sync { } - +/// Basically something that needs only socket to be spawned pub trait WithSocket { fn init(socket: S) -> Self; } From c351bcd5a2658818f8874530bb261fa6416737ea Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 7 Apr 2016 23:18:48 +0300 Subject: [PATCH 02/15] ipcconfig trait --- ipc/codegen/src/codegen.rs | 2 ++ ipc/rpc/Cargo.toml | 1 + ipc/rpc/src/interface.rs | 18 +++++++++++++++++- ipc/rpc/src/lib.rs | 3 ++- ipc/tests/Cargo.toml | 1 + ipc/tests/examples.rs | 9 +++++++++ ipc/tests/run.rs | 1 + ipc/tests/service.rs.in | 2 ++ 8 files changed, 35 insertions(+), 2 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 5385c72d7..47fd9a1ba 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -36,6 +36,8 @@ use syntax::ptr::P; pub struct Error; +const RESERVED_MESSAGE_IDS: u16 = 16; + pub fn expand_ipc_implementation( cx: &mut ExtCtxt, span: Span, diff --git a/ipc/rpc/Cargo.toml b/ipc/rpc/Cargo.toml index 99fcd3233..b5177db41 100644 --- a/ipc/rpc/Cargo.toml +++ b/ipc/rpc/Cargo.toml @@ -8,3 +8,4 @@ license = "GPL-3.0" [dependencies] ethcore-devtools = { path = "../../devtools" } +semver = "0.2.0" diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 7ed8b60c4..060da36a9 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -19,8 +19,24 @@ use std::io::{Read, Write}; use std::marker::Sync; use std::sync::atomic::*; +use semver::Version; -pub trait IpcInterface { +pub struct Handshake { + protocol_version: Version, + api_version: Version, + reserved: [u8; 64], +} + +pub trait IpcConfig { + fn api_version() -> Version { + Version::parse("1.0.0").unwrap() + } + fn protocol_version() -> Version { + Version::parse("1.0.0").unwrap() + } +} + +pub trait IpcInterface where T: IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; diff --git a/ipc/rpc/src/lib.rs b/ipc/rpc/src/lib.rs index f8c68cdbd..a0d718871 100644 --- a/ipc/rpc/src/lib.rs +++ b/ipc/rpc/src/lib.rs @@ -17,6 +17,7 @@ //! IPC RPC interface extern crate ethcore_devtools as devtools; +extern crate semver; pub mod interface; -pub use interface::{IpcInterface, IpcSocket, invoke}; +pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig}; diff --git a/ipc/tests/Cargo.toml b/ipc/tests/Cargo.toml index 1d2bc35b4..d6d066b93 100644 --- a/ipc/tests/Cargo.toml +++ b/ipc/tests/Cargo.toml @@ -12,6 +12,7 @@ path = "run.rs" bincode = "*" serde = "0.7.0" ethcore-devtools = { path = "../../devtools" } +semver = "0.2.0" [build-dependencies] syntex = "0.30.0" diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 52ac0cf22..32c8206ed 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -20,6 +20,7 @@ mod tests { use super::super::service::*; use ipc::*; use devtools::*; + use semver::Version; #[test] fn call_service() { @@ -57,4 +58,12 @@ mod tests { assert_eq!(vec![0, 1, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone()); assert_eq!(10, result); } + + #[test] + fn query_default_version() { + let ver = Service::protocol_version(); + assert_eq!(ver, Version::parse("1.0.0").unwrap()); + let ver = Service::api_version(); + assert_eq!(ver, Version::parse("1.0.0").unwrap()); + } } diff --git a/ipc/tests/run.rs b/ipc/tests/run.rs index 7aac809ed..e464d186f 100644 --- a/ipc/tests/run.rs +++ b/ipc/tests/run.rs @@ -18,6 +18,7 @@ extern crate bincode; extern crate ethcore_ipc as ipc; extern crate serde; extern crate ethcore_devtools as devtools; +extern crate semver; pub mod service; mod examples; diff --git a/ipc/tests/service.rs.in b/ipc/tests/service.rs.in index 2b529534a..14de716da 100644 --- a/ipc/tests/service.rs.in +++ b/ipc/tests/service.rs.in @@ -45,3 +45,5 @@ impl Service { } } } + +impl ::ipc::IpcConfig for Service {} From f836e07fd31afbeeebe2a82f4ce5981242b0ed3a Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 7 Apr 2016 23:32:33 +0300 Subject: [PATCH 03/15] reserved message ids and little endian for client --- ipc/tests/examples.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 32c8206ed..202139d54 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -25,7 +25,7 @@ mod tests { #[test] fn call_service() { // method_num = 0, f = 10 (method Service::commit) - let mut socket = TestSocket::new_ready(vec![0, 0, 0, 0, 0, 10]); + let mut socket = TestSocket::new_ready(vec![0, 16, 0, 0, 0, 10]); let service = Service::new(); assert_eq!(0, *service.commits.read().unwrap()); @@ -43,7 +43,7 @@ mod tests { let result = service_client.commit(5); - assert_eq!(vec![0, 0, 0, 0, 0, 5], service_client.socket().borrow().write_buffer.clone()); + assert_eq!(vec![0, 16, 0, 0, 0, 5], service_client.socket().borrow().write_buffer.clone()); assert_eq!(10, result); } @@ -55,7 +55,7 @@ mod tests { let result = service_client.rollback(Some(5), 10); - assert_eq!(vec![0, 1, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone()); + assert_eq!(vec![0, 17, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone()); assert_eq!(10, result); } From c5dc281934a8ca01ac6a41eb6c1fafcdb51d7038 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 7 Apr 2016 23:32:41 +0300 Subject: [PATCH 04/15] reserved message ids and little endian for client --- ipc/codegen/src/codegen.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 47fd9a1ba..08944196f 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -249,7 +249,7 @@ fn implement_dispatch_arm( buffer: bool, ) -> ast::Arm { - let index_ident = builder.id(format!("{}", index).as_str()); + let index_ident = builder.id(format!("{}", index + (RESERVED_MESSAGE_IDS as u32)).as_str()); let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer); quote_arm!(cx, $index_ident => { $invoke_expr } ) } @@ -387,7 +387,7 @@ fn implement_client_method_body( request_serialization_statements.push( quote_stmt!(cx, let serialized_payload = ::bincode::serde::serialize(&payload, ::bincode::SizeLimit::Infinite).unwrap())); - let index_ident = builder.id(format!("{}", index).as_str()); + let index_ident = builder.id(format!("{}", index + RESERVED_MESSAGE_IDS).as_str()); request_serialization_statements.push( quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut socket))); @@ -559,7 +559,8 @@ fn implement_interface( Err(e) => { panic!("ipc read error: {:?}, aborting", e); } _ => { } } - match method_num[0] as u16 + (method_num[1] as u16)*256 { + // method_num is a 16-bit little-endian unsigned number + match method_num[1] as u16 + (method_num[0] as u16)*256 { $dispatch_arms _ => vec![] } From a6d140616b6353844c939b0dc615b381cebdbd0c Mon Sep 17 00:00:00 2001 From: NikVolf Date: Fri, 8 Apr 2016 14:07:25 +0300 Subject: [PATCH 05/15] server handshake --- ipc/codegen/src/codegen.rs | 26 +++++++++++++++++++++++++- ipc/rpc/src/interface.rs | 16 ++++++++++++---- ipc/rpc/src/lib.rs | 2 +- ipc/tests/service.rs.in | 1 + 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 08944196f..63f225100 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -13,7 +13,6 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . - use aster; use syntax::ast::{ @@ -61,10 +60,24 @@ pub fn expand_ipc_implementation( }; push_client(cx, &builder, &item, &dispatches, push); + push_handshake_struct(cx, push); push(Annotatable::Item(impl_item)) } +fn push_handshake_struct(cx: &ExtCtxt, push: &mut FnMut(Annotatable)) { + let handshake_item = quote_item!(cx, + #[derive(Serialize, Deserialize)] + pub struct BinHandshake { + api_version: String, + protocol_version: String, + _reserved: Vec, + } + ).unwrap(); + + push(Annotatable::Item(handshake_item)); +} + fn field_name(builder: &aster::AstBuilder, arg: &Arg) -> ast::Ident { match arg.pat.node { PatKind::Ident(_, ref ident, _) => builder.id(ident.node), @@ -548,6 +561,14 @@ fn implement_interface( let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false); let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true); + let handshake_arm = quote_arm!(&cx, 0 => { + let handshake_payload = ::bincode::serde::deserialize_from::<_, BinHandshake>(r, ::bincode::SizeLimit::Infinite).unwrap(); + ::bincode::serde::serialize::(&Self::handshake(&::ipc::Handshake { + api_version: ::semver::Version::parse(&handshake_payload.api_version).unwrap(), + protocol_version: ::semver::Version::parse(&handshake_payload.protocol_version).unwrap(), + }), ::bincode::SizeLimit::Infinite).unwrap() + }); + Ok((quote_item!(cx, impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { fn dispatch(&self, r: &mut R) -> Vec @@ -561,6 +582,9 @@ fn implement_interface( } // method_num is a 16-bit little-endian unsigned number match method_num[1] as u16 + (method_num[0] as u16)*256 { + // handshake + $handshake_arm + // user methods $dispatch_arms _ => vec![] } diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 060da36a9..5b5a30b27 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -22,9 +22,8 @@ use std::sync::atomic::*; use semver::Version; pub struct Handshake { - protocol_version: Version, - api_version: Version, - reserved: [u8; 64], + pub protocol_version: Version, + pub api_version: Version, } pub trait IpcConfig { @@ -34,13 +33,22 @@ pub trait IpcConfig { fn protocol_version() -> Version { Version::parse("1.0.0").unwrap() } + fn handshake(handshake: &Handshake) -> bool { + handshake.protocol_version == Self::protocol_version() && + handshake.api_version == Self::api_version() + } +} + +pub enum Error { + UnkownSystemCall, + ClientUnsupported, } pub trait IpcInterface where T: IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; - /// deserialize the payload from buffer, dispatches invoke and returns serialized result + /// deserializes the payload from buffer, dispatches invoke and returns serialized result /// (for non-blocking io) fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec; } diff --git a/ipc/rpc/src/lib.rs b/ipc/rpc/src/lib.rs index a0d718871..f0083e66e 100644 --- a/ipc/rpc/src/lib.rs +++ b/ipc/rpc/src/lib.rs @@ -20,4 +20,4 @@ extern crate ethcore_devtools as devtools; extern crate semver; pub mod interface; -pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig}; +pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error}; diff --git a/ipc/tests/service.rs.in b/ipc/tests/service.rs.in index 14de716da..f48596e6c 100644 --- a/ipc/tests/service.rs.in +++ b/ipc/tests/service.rs.in @@ -16,6 +16,7 @@ use std::sync::RwLock; use std::ops::*; +use ipc::IpcConfig; pub struct Service { pub commits: RwLock, From 9adb79ed0e43ba6006d836abf5f0db747b18800a Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 06:43:45 +0300 Subject: [PATCH 06/15] handshake dispatch test --- ipc/tests/examples.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 202139d54..9a5158830 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -35,6 +35,30 @@ mod tests { assert_eq!(10, *service.commits.read().unwrap()); } + + #[test] + fn call_service_handshake() { + let mut socket = TestSocket::new_ready(vec![0, 0, + // protocol version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // api version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // reserved + 0, 0, 0, 0, 0, 0, 0, 64, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]); + + let service = Service::new(); + let result = service.dispatch(&mut socket); + + // single `true` + assert_eq!(vec![1], result); + } + + #[test] fn call_service_proxy() { let mut socket = TestSocket::new(); From fa47f1c28baf10a0efd57a6070d693d79d2b89a6 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 07:07:12 +0300 Subject: [PATCH 07/15] codegen for client handshake --- ipc/codegen/src/codegen.rs | 63 +++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 63f225100..1be753103 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -502,6 +502,7 @@ fn push_client_implementation( .collect::>>(); let client_ident = builder.id(format!("{}Client", item.ident.name.as_str())); + let item_ident = builder.id(format!("{}", item.ident.name.as_str())); let implement = quote_item!(cx, impl $client_ident where S: ::ipc::IpcSocket { pub fn new(socket: S) -> $client_ident { @@ -511,6 +512,27 @@ fn push_client_implementation( } } + pub fn handshake(&self) -> bool { + let payload = BinHandshake { + protocol_version: $item_ident::protocol_version().to_string(), + api_version: $item_ident::api_version().to_string(), + _reserved: vec![0u8, 64], + }; + + let mut socket_ref = self.socket.borrow_mut(); + let mut socket = socket_ref.deref_mut(); + ::ipc::invoke( + 0, + &Some(::bincode::serde::serialize(&payload, ::bincode::SizeLimit::Infinite).unwrap()), + &mut socket); + + let mut result = vec![0u8; 1]; + if socket.read(&mut result).unwrap() == 1 { + result[0] == 1 + } + else { false } + } + #[cfg(test)] pub fn socket(&self) -> &::std::cell::RefCell { &self.socket @@ -522,6 +544,38 @@ fn push_client_implementation( push(Annotatable::Item(implement)); } +/// implements dispatching of system handshake invocation (method_num 0) +fn implement_handshake_arm( + cx: &ExtCtxt, +) -> (ast::Arm, ast::Arm) +{ + let handshake_deserialize = quote_stmt!(&cx, + let handshake_payload = ::bincode::serde::deserialize_from::<_, BinHandshake>(r, ::bincode::SizeLimit::Infinite).unwrap(); + ); + + let handshake_deserialize_buf = quote_stmt!(&cx, + let handshake_payload = ::bincode::serde::deserialize::(buf).unwrap(); + ); + + let handshake_serialize = quote_expr!(&cx, + ::bincode::serde::serialize::(&Self::handshake(&::ipc::Handshake { + api_version: ::semver::Version::parse(&handshake_payload.api_version).unwrap(), + protocol_version: ::semver::Version::parse(&handshake_payload.protocol_version).unwrap(), + }), ::bincode::SizeLimit::Infinite).unwrap() + ); + + ( + quote_arm!(&cx, 0 => { + $handshake_deserialize + $handshake_serialize + }), + quote_arm!(&cx, 0 => { + $handshake_deserialize_buf + $handshake_serialize + }), + ) +} + /// implements `IpcInterface` for the given class `C` fn implement_interface( cx: &ExtCtxt, @@ -561,13 +615,7 @@ fn implement_interface( let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false); let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true); - let handshake_arm = quote_arm!(&cx, 0 => { - let handshake_payload = ::bincode::serde::deserialize_from::<_, BinHandshake>(r, ::bincode::SizeLimit::Infinite).unwrap(); - ::bincode::serde::serialize::(&Self::handshake(&::ipc::Handshake { - api_version: ::semver::Version::parse(&handshake_payload.api_version).unwrap(), - protocol_version: ::semver::Version::parse(&handshake_payload.protocol_version).unwrap(), - }), ::bincode::SizeLimit::Infinite).unwrap() - }); + let (handshake_arm, handshake_arm_buf) = implement_handshake_arm(cx); Ok((quote_item!(cx, impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { @@ -593,6 +641,7 @@ fn implement_interface( fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec { match method_num { + $handshake_arm_buf $dispatch_arms_buffered _ => vec![] } From 0c42126b8f0e86928265503864d213c4f40faacc Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 07:13:31 +0300 Subject: [PATCH 08/15] client handshake tests, errors --- ipc/codegen/src/codegen.rs | 11 +++++++---- ipc/rpc/src/interface.rs | 3 +++ ipc/tests/examples.rs | 15 +++++++++++++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 1be753103..7341cc220 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -512,7 +512,7 @@ fn push_client_implementation( } } - pub fn handshake(&self) -> bool { + pub fn handshake(&self) -> Result<(), ::ipc::Error> { let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), api_version: $item_ident::api_version().to_string(), @@ -527,10 +527,13 @@ fn push_client_implementation( &mut socket); let mut result = vec![0u8; 1]; - if socket.read(&mut result).unwrap() == 1 { - result[0] == 1 + if try!(socket.read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 { + match result[0] { + 1 => Ok(()), + _ => Err(::ipc::Error::RemoteServiceUnsupported), + } } - else { false } + else { Err(::ipc::Error::HandshakeFailed) } } #[cfg(test)] diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 5b5a30b27..a4e2b3c05 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -39,9 +39,12 @@ pub trait IpcConfig { } } +#[derive(Debug)] pub enum Error { UnkownSystemCall, ClientUnsupported, + RemoteServiceUnsupported, + HandshakeFailed, } pub trait IpcInterface where T: IpcConfig { diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 9a5158830..931021d73 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -60,7 +60,7 @@ mod tests { #[test] - fn call_service_proxy() { + fn call_service_client() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; let service_client = ServiceClient::new(socket); @@ -72,7 +72,7 @@ mod tests { } #[test] - fn call_service_proxy_optional() { + fn call_service_client_optional() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; let service_client = ServiceClient::new(socket); @@ -90,4 +90,15 @@ mod tests { let ver = Service::api_version(); assert_eq!(ver, Version::parse("1.0.0").unwrap()); } + + #[test] + fn call_service_client_handshake() { + let mut socket = TestSocket::new(); + socket.read_buffer = vec![1]; + let service_client = ServiceClient::new(socket); + + let result = service_client.handshake(); + + assert!(result.is_ok()); + } } From 5609b555d2a83dafc8bdd2c7ce6e8e4337ac00a6 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 07:17:57 +0300 Subject: [PATCH 09/15] removed ready func --- ipc/codegen/src/codegen.rs | 3 --- ipc/rpc/src/interface.rs | 4 ---- 2 files changed, 7 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 7341cc220..c73bed22f 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -412,21 +412,18 @@ fn implement_client_method_body( vec![] }; - let wait_result_stmt = quote_stmt!(cx, while !socket.ready().load(::std::sync::atomic::Ordering::Relaxed) { }); if let Some(ref return_ty) = dispatch.return_type_ty { let return_expr = quote_expr!(cx, ::bincode::serde::deserialize_from::<_, $return_ty>(&mut socket, ::bincode::SizeLimit::Infinite).unwrap() ); quote_expr!(cx, { $request - $wait_result_stmt $return_expr }) } else { quote_expr!(cx, { $request - $wait_result_stmt }) } } diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index a4e2b3c05..247b5339b 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -79,11 +79,7 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: /// IpcSocket pub trait IpcSocket: Read + Write + Sync { - fn ready(&self) -> AtomicBool; } impl IpcSocket for ::devtools::TestSocket { - fn ready(&self) -> AtomicBool { - AtomicBool::new(true) - } } From 40e0d370c2cf5b216ba01bb37c75ac8c19191bdb Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 10:18:39 +0300 Subject: [PATCH 10/15] client spawner --- ipc/codegen/src/codegen.rs | 27 ++++++++++++++++++++------- ipc/nano/src/lib.rs | 37 +++++++++++++++++++++++++++++++++++-- ipc/rpc/Cargo.toml | 1 + ipc/rpc/src/interface.rs | 12 +++++++++--- ipc/rpc/src/lib.rs | 3 ++- ipc/tests/Cargo.toml | 1 + ipc/tests/examples.rs | 6 +++--- ipc/tests/run.rs | 1 + 8 files changed, 72 insertions(+), 16 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index c73bed22f..871771d1d 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -303,6 +303,7 @@ fn push_client( { push_client_struct(cx, builder, item, push); push_client_implementation(cx, builder, dispatches, item, push); + push_with_socket_client_implementation(cx, builder, item, push); } /// returns an expression with the body for single operation that is being sent to server @@ -485,6 +486,25 @@ fn implement_client_method( signature.unwrap() } +fn push_with_socket_client_implementation( + cx: &ExtCtxt, + builder: &aster::AstBuilder, + item: &Item, + push: &mut FnMut(Annotatable)) +{ + let client_ident = builder.id(format!("{}Client", item.ident.name.as_str())); + let implement = quote_item!(cx, + impl ::ipc::WithSocket for $client_ident where S: ::ipc::IpcSocket { + fn init(socket: S) -> $client_ident { + $client_ident { + socket: ::std::cell::RefCell::new(socket), + phantom: ::std::marker::PhantomData, + } + } + }).unwrap(); + push(Annotatable::Item(implement)); +} + /// pushes full client side code for the original class exposed via ipc fn push_client_implementation( cx: &ExtCtxt, @@ -502,13 +522,6 @@ fn push_client_implementation( let item_ident = builder.id(format!("{}", item.ident.name.as_str())); let implement = quote_item!(cx, impl $client_ident where S: ::ipc::IpcSocket { - pub fn new(socket: S) -> $client_ident { - $client_ident { - socket: ::std::cell::RefCell::new(socket), - phantom: ::std::marker::PhantomData, - } - } - pub fn handshake(&self) -> Result<(), ::ipc::Error> { let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 6a0a3d4bf..4362448d5 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -20,10 +20,11 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; -pub use ipc::*; +pub use ipc::{WithSocket, IpcInterface, IpcConfig}; use std::sync::*; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; +use std::ops::Deref; const POLL_TIMEOUT: isize = 100; @@ -34,6 +35,36 @@ pub struct Worker where S: IpcInterface { buf: Vec, } +pub struct GuardedSocket where S: WithSocket { + client: Arc, + _endpoint: Endpoint, +} + +impl Deref for GuardedSocket where S: WithSocket { + type Target = S; + + fn deref(&self) -> &S { + &self.client + } +} + +pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { + warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); + SocketError::DuplexLink + })); + + let endpoint = try!(socket.bind(socket_addr).map_err(|e| { + warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); + SocketError::DuplexLink + })); + + Ok(GuardedSocket { + client: Arc::new(S::init(socket)), + _endpoint: endpoint, + }) +} + #[derive(Debug)] pub enum SocketError { DuplexLink @@ -113,7 +144,7 @@ impl Worker where S: IpcInterface { } #[cfg(test)] -mod tests { +mod service_tests { use super::Worker; use ipc::*; @@ -150,6 +181,8 @@ mod tests { } } + impl IpcConfig for DummyService {} + fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { let mut socket = Socket::new(Protocol::Pair).unwrap(); let endpoint = socket.connect(addr).unwrap(); diff --git a/ipc/rpc/Cargo.toml b/ipc/rpc/Cargo.toml index b5177db41..a6346bbf9 100644 --- a/ipc/rpc/Cargo.toml +++ b/ipc/rpc/Cargo.toml @@ -9,3 +9,4 @@ license = "GPL-3.0" [dependencies] ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 247b5339b..1c9b78703 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -18,7 +18,6 @@ use std::io::{Read, Write}; use std::marker::Sync; -use std::sync::atomic::*; use semver::Version; pub struct Handshake { @@ -47,7 +46,7 @@ pub enum Error { HandshakeFailed, } -pub trait IpcInterface where T: IpcConfig { +pub trait IpcInterface:IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; @@ -81,5 +80,12 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: pub trait IpcSocket: Read + Write + Sync { } -impl IpcSocket for ::devtools::TestSocket { + +pub trait WithSocket { + fn init(socket: S) -> Self; } + + +impl IpcSocket for ::devtools::TestSocket {} + +impl IpcSocket for ::nanomsg::Socket {} diff --git a/ipc/rpc/src/lib.rs b/ipc/rpc/src/lib.rs index f0083e66e..c2165b6e5 100644 --- a/ipc/rpc/src/lib.rs +++ b/ipc/rpc/src/lib.rs @@ -18,6 +18,7 @@ extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; pub mod interface; -pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error}; +pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error, WithSocket}; diff --git a/ipc/tests/Cargo.toml b/ipc/tests/Cargo.toml index d6d066b93..931bf8061 100644 --- a/ipc/tests/Cargo.toml +++ b/ipc/tests/Cargo.toml @@ -13,6 +13,7 @@ bincode = "*" serde = "0.7.0" ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" +nanomsg = "0.5.0" [build-dependencies] syntex = "0.30.0" diff --git a/ipc/tests/examples.rs b/ipc/tests/examples.rs index 931021d73..9d5eb37c4 100644 --- a/ipc/tests/examples.rs +++ b/ipc/tests/examples.rs @@ -63,7 +63,7 @@ mod tests { fn call_service_client() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.commit(5); @@ -75,7 +75,7 @@ mod tests { fn call_service_client_optional() { let mut socket = TestSocket::new(); socket.read_buffer = vec![0, 0, 0, 10]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.rollback(Some(5), 10); @@ -95,7 +95,7 @@ mod tests { fn call_service_client_handshake() { let mut socket = TestSocket::new(); socket.read_buffer = vec![1]; - let service_client = ServiceClient::new(socket); + let service_client = ServiceClient::init(socket); let result = service_client.handshake(); diff --git a/ipc/tests/run.rs b/ipc/tests/run.rs index e464d186f..0b86fe368 100644 --- a/ipc/tests/run.rs +++ b/ipc/tests/run.rs @@ -19,6 +19,7 @@ extern crate ethcore_ipc as ipc; extern crate serde; extern crate ethcore_devtools as devtools; extern crate semver; +extern crate nanomsg; pub mod service; mod examples; From 9a82607385ab7b2817a544e15958de689795e609 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 10:41:17 +0300 Subject: [PATCH 11/15] client & server dual tests (not working) --- ipc/tests/Cargo.toml | 2 ++ ipc/tests/over_nano.rs | 54 ++++++++++++++++++++++++++++++++++++++++++ ipc/tests/run.rs | 2 ++ 3 files changed, 58 insertions(+) create mode 100644 ipc/tests/over_nano.rs diff --git a/ipc/tests/Cargo.toml b/ipc/tests/Cargo.toml index 931bf8061..035de644c 100644 --- a/ipc/tests/Cargo.toml +++ b/ipc/tests/Cargo.toml @@ -14,6 +14,8 @@ serde = "0.7.0" ethcore-devtools = { path = "../../devtools" } semver = "0.2.0" nanomsg = "0.5.0" +ethcore-ipc-nano = { path = "../nano" } + [build-dependencies] syntex = "0.30.0" diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs new file mode 100644 index 000000000..944f12c2e --- /dev/null +++ b/ipc/tests/over_nano.rs @@ -0,0 +1,54 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#[cfg(test)] +mod tests { + + use super::super::service::*; + use nanoipc; + use std::sync::Arc; + + fn init_worker(addr: &str) -> nanoipc::Worker { + let mut worker = nanoipc::Worker::::new(Arc::new(Service::new())); + worker.add_duplex(addr).unwrap(); + worker + } + + #[test] + fn can_create_client() { + let client = nanoipc::init_client::>("ipc:///tmp/parity-examples-test10.ipc"); + assert!(client.is_ok()); + } + + #[test] + fn can_call_handshake() { + let exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let url = "ipc:///tmp/parity-test-examples-20.ipc"; + + let worker_exit = exit.clone(); + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !worker_exit.load(::std::sync::atomic::Ordering::Relaxed) { worker.poll() } + }); + let client = nanoipc::init_client::>(url).unwrap(); + + let hs = client.handshake(); + + exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + assert!(hs.is_ok()); + } + +} diff --git a/ipc/tests/run.rs b/ipc/tests/run.rs index 0b86fe368..d3eb15287 100644 --- a/ipc/tests/run.rs +++ b/ipc/tests/run.rs @@ -20,6 +20,8 @@ extern crate serde; extern crate ethcore_devtools as devtools; extern crate semver; extern crate nanomsg; +extern crate ethcore_ipc_nano as nanoipc; pub mod service; mod examples; +mod over_nano; From cb1096d1e1adcb3a7ba15c7735e9f6d05804abbc Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 11:13:27 +0300 Subject: [PATCH 12/15] adding init wait --- ipc/nano/src/lib.rs | 2 +- ipc/tests/over_nano.rs | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 4362448d5..41f17018e 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -91,6 +91,7 @@ impl Worker where S: IpcInterface { match socket.nb_read_to_end(&mut self.buf) { Ok(method_sign_len) => { if method_sign_len >= 2 { + // method_num let method_num = self.buf[1] as u16 * 256 + self.buf[0] as u16; // payload @@ -186,7 +187,6 @@ mod service_tests { fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) { let mut socket = Socket::new(Protocol::Pair).unwrap(); let endpoint = socket.connect(addr).unwrap(); - //thread::sleep_ms(10); socket.write(buf).unwrap(); (socket, endpoint) } diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs index 944f12c2e..56ea94d61 100644 --- a/ipc/tests/over_nano.rs +++ b/ipc/tests/over_nano.rs @@ -29,25 +29,33 @@ mod tests { #[test] fn can_create_client() { - let client = nanoipc::init_client::>("ipc:///tmp/parity-examples-test10.ipc"); + let client = nanoipc::init_client::>("ipc:///tmp/parity-nano-test10.ipc"); assert!(client.is_ok()); } #[test] fn can_call_handshake() { - let exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); - let url = "ipc:///tmp/parity-test-examples-20.ipc"; + let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + let url = "ipc:///tmp/parity-test-nano-20.ipc"; - let worker_exit = exit.clone(); ::std::thread::spawn(move || { let mut worker = init_worker(url); - while !worker_exit.load(::std::sync::atomic::Ordering::Relaxed) { worker.poll() } + while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed); + } }); + + while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { } let client = nanoipc::init_client::>(url).unwrap(); let hs = client.handshake(); - exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed); assert!(hs.is_ok()); } From 806f5b90642323fa79518c72d681d2e7cee2d049 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 11:34:56 +0300 Subject: [PATCH 13/15] working client spawn --- ipc/codegen/src/codegen.rs | 2 +- ipc/nano/src/lib.rs | 2 +- ipc/rpc/src/interface.rs | 1 + ipc/tests/over_nano.rs | 50 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 871771d1d..671e067b3 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -526,7 +526,7 @@ fn push_client_implementation( let payload = BinHandshake { protocol_version: $item_ident::protocol_version().to_string(), api_version: $item_ident::api_version().to_string(), - _reserved: vec![0u8, 64], + _reserved: vec![0u8; 64], }; let mut socket_ref = self.socket.borrow_mut(); diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 41f17018e..9d74325fa 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -54,7 +54,7 @@ pub fn init_client(socket_addr: &str) -> Result, SocketError SocketError::DuplexLink })); - let endpoint = try!(socket.bind(socket_addr).map_err(|e| { + let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::DuplexLink })); diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 1c9b78703..5c0d57612 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -69,6 +69,7 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: if params.is_some() { buf[2..buf_len].clone_from_slice(params.as_ref().unwrap()); } + if w.write(&buf).unwrap() != buf_len { // if write was inconsistent diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs index 56ea94d61..731163638 100644 --- a/ipc/tests/over_nano.rs +++ b/ipc/tests/over_nano.rs @@ -20,6 +20,15 @@ mod tests { use super::super::service::*; use nanoipc; use std::sync::Arc; + use std::io::{Write, Read}; + + fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) { + let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap(); + let endpoint = socket.connect(addr).unwrap(); + socket.write(buf).unwrap(); + (socket, endpoint) + } + fn init_worker(addr: &str) -> nanoipc::Worker { let mut worker = nanoipc::Worker::::new(Arc::new(Service::new())); @@ -35,13 +44,12 @@ mod tests { #[test] fn can_call_handshake() { + let url = "ipc:///tmp/parity-test-nano-20.ipc"; let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); let c_worker_should_exit = worker_should_exit.clone(); let c_worker_is_ready = worker_is_ready.clone(); - let url = "ipc:///tmp/parity-test-nano-20.ipc"; - ::std::thread::spawn(move || { let mut worker = init_worker(url); while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { @@ -59,4 +67,42 @@ mod tests { assert!(hs.is_ok()); } + #[test] + fn can_receive_dummy_writes_in_thread() { + let url = "ipc:///tmp/parity-test-nano-30.ipc"; + let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed); + } + }); + while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { } + + let (mut _s, _e) = dummy_write(url, &vec![0, 0, + // protocol version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // api version + 0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0', + // reserved + 0, 0, 0, 0, 0, 0, 0, 64, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]); + + let mut buf = vec![0u8;1]; + let result = _s.read(&mut buf); + assert_eq!(1, buf.len()); + assert_eq!(1, buf[0]); + + worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + } + } From b1330b9375a9bd047fbd6f7353c05a7c4dabd481 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 11:41:26 +0300 Subject: [PATCH 14/15] removed global paths and fix warn --- ipc/tests/over_nano.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/ipc/tests/over_nano.rs b/ipc/tests/over_nano.rs index 731163638..720dff81d 100644 --- a/ipc/tests/over_nano.rs +++ b/ipc/tests/over_nano.rs @@ -21,6 +21,7 @@ mod tests { use nanoipc; use std::sync::Arc; use std::io::{Write, Read}; + use std::sync::atomic::{Ordering, AtomicBool}; fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) { let mut socket = ::nanomsg::Socket::new(::nanomsg::Protocol::Pair).unwrap(); @@ -45,44 +46,44 @@ mod tests { #[test] fn can_call_handshake() { let url = "ipc:///tmp/parity-test-nano-20.ipc"; - let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); - let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); let c_worker_should_exit = worker_should_exit.clone(); let c_worker_is_ready = worker_is_ready.clone(); ::std::thread::spawn(move || { let mut worker = init_worker(url); - while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { + while !c_worker_should_exit.load(Ordering::Relaxed) { worker.poll(); - c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed); + c_worker_is_ready.store(true, Ordering::Relaxed); } }); - while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { } + while !worker_is_ready.load(Ordering::Relaxed) { } let client = nanoipc::init_client::>(url).unwrap(); let hs = client.handshake(); - worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + worker_should_exit.store(true, Ordering::Relaxed); assert!(hs.is_ok()); } #[test] fn can_receive_dummy_writes_in_thread() { let url = "ipc:///tmp/parity-test-nano-30.ipc"; - let worker_should_exit = Arc::new(::std::sync::atomic::AtomicBool::new(false)); - let worker_is_ready = Arc::new(::std::sync::atomic::AtomicBool::new(false)); + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); let c_worker_should_exit = worker_should_exit.clone(); let c_worker_is_ready = worker_is_ready.clone(); ::std::thread::spawn(move || { let mut worker = init_worker(url); - while !c_worker_should_exit.load(::std::sync::atomic::Ordering::Relaxed) { + while !c_worker_should_exit.load(Ordering::Relaxed) { worker.poll(); - c_worker_is_ready.store(true, ::std::sync::atomic::Ordering::Relaxed); + c_worker_is_ready.store(true, Ordering::Relaxed); } }); - while !worker_is_ready.load(::std::sync::atomic::Ordering::Relaxed) { } + while !worker_is_ready.load(Ordering::Relaxed) { } let (mut _s, _e) = dummy_write(url, &vec![0, 0, // protocol version @@ -98,11 +99,11 @@ mod tests { ]); let mut buf = vec![0u8;1]; - let result = _s.read(&mut buf); + _s.read(&mut buf).unwrap(); assert_eq!(1, buf.len()); assert_eq!(1, buf[0]); - worker_should_exit.store(true, ::std::sync::atomic::Ordering::Relaxed); + worker_should_exit.store(true, Ordering::Relaxed); } } From 8af86aae8477d34c162265c65d1ca3344f4a6266 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 12 Apr 2016 11:53:41 +0300 Subject: [PATCH 15/15] some docs --- ipc/nano/src/lib.rs | 13 +++++++++++++ ipc/rpc/src/interface.rs | 15 +++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 9d74325fa..18a5b0ffe 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -28,6 +28,7 @@ use std::ops::Deref; const POLL_TIMEOUT: isize = 100; +/// Generic worker to handle service (binded) sockets pub struct Worker where S: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, @@ -35,6 +36,8 @@ pub struct Worker where S: IpcInterface { buf: Vec, } +/// struct for guarding `_endpoint` (so that it wont drop) +/// derefs to client `S` pub struct GuardedSocket where S: WithSocket { client: Arc, _endpoint: Endpoint, @@ -48,6 +51,9 @@ impl Deref for GuardedSocket where S: WithSocket { } } +/// Spawns client <`S`> over specified address +/// creates socket and connects endpoint to it +/// for duplex (paired) connections with the service pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); @@ -65,12 +71,15 @@ pub fn init_client(socket_addr: &str) -> Result, SocketError }) } +/// Error occured while establising socket or endpoint #[derive(Debug)] pub enum SocketError { + /// Error establising duplex (paired) socket and/or endpoint DuplexLink } impl Worker where S: IpcInterface { + /// New worker over specified `service` pub fn new(service: Arc) -> Worker { Worker:: { service: service.clone(), @@ -80,6 +89,7 @@ impl Worker where S: IpcInterface { } } + /// Polls all sockets, reads and dispatches method invocations pub fn poll(&mut self) { let mut request = PollRequest::new(&mut self.polls[..]); let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); @@ -119,12 +129,15 @@ impl Worker where S: IpcInterface { } } + /// Stores nanomsg poll request for reuse fn rebuild_poll_request(&mut self) { self.polls = self.sockets.iter() .map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In)) .collect::>(); } + /// Add exclusive socket for paired client + /// Only one connection over this address is allowed pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> { let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 5c0d57612..077683d5b 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -20,24 +20,33 @@ use std::io::{Read, Write}; use std::marker::Sync; use semver::Version; +/// Handshake for client and server to negotiate api/protocol version pub struct Handshake { pub protocol_version: Version, pub api_version: Version, } +/// Allows to configure custom version and custom handshake response for +/// ipc host pub trait IpcConfig { + /// Current service api version + /// Should be increased if any of the methods changes signature fn api_version() -> Version { Version::parse("1.0.0").unwrap() } + /// Current ipc protocol version + /// Should be increased only if signature of system methods changes fn protocol_version() -> Version { Version::parse("1.0.0").unwrap() } + /// Default handshake requires exact versions match fn handshake(handshake: &Handshake) -> bool { handshake.protocol_version == Self::protocol_version() && handshake.api_version == Self::api_version() } } +/// Error in dispatching or invoking methods via IPC #[derive(Debug)] pub enum Error { UnkownSystemCall, @@ -46,6 +55,8 @@ pub enum Error { HandshakeFailed, } +/// Allows implementor to be attached to generic worker and dispatch rpc requests +/// over IPC pub trait IpcInterface:IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; @@ -77,11 +88,11 @@ pub fn invoke(method_num: u16, params: &Option>, w: &mut W) where W: } } -/// IpcSocket +/// IpcSocket, read/write generalization pub trait IpcSocket: Read + Write + Sync { } - +/// Basically something that needs only socket to be spawned pub trait WithSocket { fn init(socket: S) -> Self; }