From 81d8dafd9e3459dcd76a416fd3f901f8426a4ec7 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 2 Jun 2016 19:04:42 +0200 Subject: [PATCH] Ipc serialization & protocol fixes (#1188) * serialization and codegen fixes from branch * nano lib fixes * fixes error encoding & comment * another comment fix * client timeout -> const --- ipc/codegen/src/codegen.rs | 27 ++++++++------------ ipc/codegen/src/serialization.rs | 14 ++++++++--- ipc/nano/src/lib.rs | 13 ++++++++++ ipc/rpc/src/binary.rs | 43 ++++++++++++++++++++++++++++---- 4 files changed, 71 insertions(+), 26 deletions(-) diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 57db7d261..9dd6a7b32 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -392,16 +392,13 @@ fn implement_client_method_body( }); request_serialization_statements.push( - quote_stmt!(cx, let mut socket_ref = self.socket.borrow_mut())); - - request_serialization_statements.push( - quote_stmt!(cx, let mut socket = socket_ref.deref_mut())); + quote_stmt!(cx, let mut socket = self.socket.write().unwrap(); )); request_serialization_statements.push( quote_stmt!(cx, let serialized_payload = ::ipc::binary::serialize(&payload).unwrap())); request_serialization_statements.push( - quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut socket))); + quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut *socket))); request_serialization_statements @@ -409,17 +406,15 @@ fn implement_client_method_body( else { let mut request_serialization_statements = Vec::new(); request_serialization_statements.push( - quote_stmt!(cx, let mut socket_ref = self.socket.borrow_mut())); + quote_stmt!(cx, let mut socket = self.socket.write().unwrap(); )); request_serialization_statements.push( - quote_stmt!(cx, let mut socket = socket_ref.deref_mut())); - request_serialization_statements.push( - quote_stmt!(cx, ::ipc::invoke($index_ident, &None, &mut socket))); + quote_stmt!(cx, ::ipc::invoke($index_ident, &None, &mut *socket))); request_serialization_statements }; if let Some(ref return_ty) = dispatch.return_type_ty { let return_expr = quote_expr!(cx, - ::ipc::binary::deserialize_from::<$return_ty, _>(&mut socket).unwrap() + ::ipc::binary::deserialize_from::<$return_ty, _>(&mut *socket).unwrap() ); quote_expr!(cx, { $request @@ -525,7 +520,7 @@ fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, interface_map: let client_struct_item = quote_item!(cx, pub struct $client_short_ident $generics { - socket: ::std::cell::RefCell, + socket: ::std::sync::RwLock, phantom: $phantom, }); @@ -560,7 +555,7 @@ fn push_with_socket_client_implementation( impl $generics ::ipc::WithSocket for $client_ident $where_clause { fn init(socket: S) -> $client_ident { $client_short_ident { - socket: ::std::cell::RefCell::new(socket), + socket: ::std::sync::RwLock::new(socket), phantom: ::std::marker::PhantomData, } } @@ -594,15 +589,13 @@ fn push_client_implementation( reserved: vec![0u8; 64], }; - let mut socket_ref = self.socket.borrow_mut(); - let mut socket = socket_ref.deref_mut(); ::ipc::invoke( 0, &Some(::ipc::binary::serialize(&payload).unwrap()), - &mut socket); + &mut *self.socket.write().unwrap()); let mut result = vec![0u8; 1]; - if try!(socket.read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 { + if try!(self.socket.write().unwrap().read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 { match result[0] { 1 => Ok(()), _ => Err(::ipc::Error::RemoteServiceUnsupported), @@ -613,7 +606,7 @@ fn push_client_implementation( let socket_item = quote_impl_item!(cx, #[cfg(test)] - pub fn socket(&self) -> &::std::cell::RefCell { + pub fn socket(&self) -> &::std::sync::RwLock { &self.socket }).unwrap(); diff --git a/ipc/codegen/src/serialization.rs b/ipc/codegen/src/serialization.rs index c2e39ea33..b32c88b6d 100644 --- a/ipc/codegen/src/serialization.rs +++ b/ipc/codegen/src/serialization.rs @@ -230,6 +230,9 @@ fn binary_expr_struct( let field_amount = builder.id(&format!("{}",fields.len())); map_stmts.push(quote_stmt!(cx, let mut map = vec![0usize; $field_amount];).unwrap()); map_stmts.push(quote_stmt!(cx, let mut total = 0usize;).unwrap()); + + let mut post_write_stmts = Vec::::new(); + for (index, field) in fields.iter().enumerate() { let field_type_ident = builder.id( &::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty))); @@ -249,6 +252,7 @@ fn binary_expr_struct( }; let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)); + let range_ident = builder.id(format!("r{}", index)); match raw_ident.as_ref() { "u8" => { write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap()); @@ -258,15 +262,17 @@ fn binary_expr_struct( write_stmts.push(quote_stmt!(cx, let size = $member_expr .len();).unwrap()); write_stmts.push(quote_stmt!(cx, let next_line = offset + size;).unwrap()); write_stmts.push(quote_stmt!(cx, length_stack.push_back(size);).unwrap()); - write_stmts.push(quote_stmt!(cx, buffer[offset..next_line].clone_from_slice($member_expr); ).unwrap()); + write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap()); + post_write_stmts.push(quote_stmt!(cx, buffer[$range_ident].clone_from_slice($member_expr); ).unwrap()); } _ => { write_stmts.push(quote_stmt!(cx, let next_line = offset + match $field_type_ident_qualified::len_params() { 0 => mem::size_of::<$field_type_ident>(), _ => { let size = $member_expr .size(); length_stack.push_back(size); size }, }).unwrap()); - write_stmts.push(quote_stmt!(cx, - if let Err(e) = $member_expr .to_bytes(&mut buffer[offset..next_line], length_stack) { return Err(e) };).unwrap()); + write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap()); + post_write_stmts.push(quote_stmt!(cx, + if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) { return Err(e) };).unwrap()); } } @@ -312,7 +318,7 @@ fn binary_expr_struct( Ok(BinaryExpressions { size: total_size_expr, - write: quote_expr!(cx, { $write_stmts; Ok(()) } ), + write: quote_expr!(cx, { $write_stmts; $post_write_stmts; Ok(()) } ), read: read_expr, }) } diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 964e52c68..38ff05c5b 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -30,6 +30,7 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut} use std::ops::Deref; const POLL_TIMEOUT: isize = 100; +const CLIENT_CONNECTION_TIMEOUT: isize = 2500; /// Generic worker to handle service (binded) sockets pub struct Worker where S: IpcInterface { @@ -46,6 +47,12 @@ pub struct GuardedSocket where S: WithSocket { _endpoint: Endpoint, } +impl GuardedSocket where S: WithSocket { + pub fn service(&self) -> Arc { + self.client.clone() + } +} + impl Deref for GuardedSocket where S: WithSocket { type Target = S; @@ -63,6 +70,9 @@ pub fn init_duplex_client(socket_addr: &str) -> Result, Sock SocketError::DuplexLink })); + // 2500 ms default timeout + socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::DuplexLink @@ -83,6 +93,9 @@ pub fn init_client(socket_addr: &str) -> Result, SocketError SocketError::RequestLink })); + // 2500 ms default timeout + socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::RequestLink diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index a985a2de9..62a3c43b0 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -104,14 +104,31 @@ impl BinaryConvertable for Result) -> Result<(), BinaryConvertError> { match *self { - Ok(ref r) => { buffer[0] = 0; Ok(try!(r.to_bytes(&mut buffer[1..], length_stack))) }, - Err(ref e) => { buffer[1] = 1; Ok(try!(e.to_bytes(&mut buffer[1..], length_stack))) }, + Ok(ref r) => { + buffer[0] = 0; + if r.size() > 0 { + Ok(try!(r.to_bytes(&mut buffer[1..], length_stack))) + } + else { Ok(()) } + }, + Err(ref e) => { + buffer[0] = 1; + if e.size() > 0 { + Ok(try!(e.to_bytes(&mut buffer[1..], length_stack))) + } + else { Ok(()) } + }, } } fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque) -> Result { match buffer[0] { - 0 => Ok(Ok(try!(R::from_bytes(&buffer[1..], length_stack)))), + 0 => { + match buffer.len() { + 1 => Ok(Ok(try!(R::from_empty_bytes()))), + _ => Ok(Ok(try!(R::from_bytes(&buffer[1..], length_stack)))), + } + } 1 => Ok(Err(try!(E::from_bytes(&buffer[1..], length_stack)))), _ => Err(BinaryConvertError) } @@ -154,6 +171,8 @@ impl BinaryConvertable for Vec where T: BinaryConvertable { _ => 128, }); + if buffer.len() == 0 { return Ok(result); } + loop { let next_size = match T::len_params() { 0 => mem::size_of::(), @@ -300,8 +319,8 @@ pub fn deserialize_from(r: &mut R) -> Result let mut payload = Vec::new(); try!(r.read_to_end(&mut payload).map_err(|_| BinaryConvertError)); - let mut length_stack = VecDeque::::new(); let stack_len = try!(u64::from_bytes(&payload[0..8], &mut fake_stack)) as usize; + let mut length_stack = VecDeque::::with_capacity(stack_len); if stack_len > 0 { for idx in 0..stack_len { @@ -607,7 +626,7 @@ fn deserialize_simple_err() { } #[test] -fn deserialize_opt_vec_in_out() { +fn serialize_opt_vec_in_out() { use std::io::{Cursor, SeekFrom, Seek}; let mut buff = Cursor::new(Vec::new()); @@ -619,3 +638,17 @@ fn deserialize_opt_vec_in_out() { assert!(vec.is_none()); } + +#[test] +fn serialize_err_opt_vec_in_out() { + use std::io::{Cursor, SeekFrom, Seek}; + + let mut buff = Cursor::new(Vec::new()); + let optional_vec: Result>, u32> = Ok(None); + serialize_into(&optional_vec, &mut buff).unwrap(); + + buff.seek(SeekFrom::Start(0)).unwrap(); + let vec = deserialize_from::>, u32>, _>(&mut buff).unwrap(); + + assert!(vec.is_ok()); +}