Ipc serialization & protocol fixes (#1188)

* serialization and codegen fixes from branch

* nano lib fixes

* fixes error encoding & comment

* another comment fix

* client timeout -> const
This commit is contained in:
Nikolay Volf 2016-06-02 19:04:42 +02:00 committed by Gav Wood
parent 0318bb9fe9
commit 81d8dafd9e
4 changed files with 71 additions and 26 deletions

View File

@ -392,16 +392,13 @@ fn implement_client_method_body(
}); });
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, let mut socket_ref = self.socket.borrow_mut())); quote_stmt!(cx, let mut socket = self.socket.write().unwrap(); ));
request_serialization_statements.push(
quote_stmt!(cx, let mut socket = socket_ref.deref_mut()));
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, let serialized_payload = ::ipc::binary::serialize(&payload).unwrap())); quote_stmt!(cx, let serialized_payload = ::ipc::binary::serialize(&payload).unwrap()));
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut socket))); quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut *socket)));
request_serialization_statements request_serialization_statements
@ -409,17 +406,15 @@ fn implement_client_method_body(
else { else {
let mut request_serialization_statements = Vec::new(); let mut request_serialization_statements = Vec::new();
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, let mut socket_ref = self.socket.borrow_mut())); quote_stmt!(cx, let mut socket = self.socket.write().unwrap(); ));
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, let mut socket = socket_ref.deref_mut())); quote_stmt!(cx, ::ipc::invoke($index_ident, &None, &mut *socket)));
request_serialization_statements.push(
quote_stmt!(cx, ::ipc::invoke($index_ident, &None, &mut socket)));
request_serialization_statements request_serialization_statements
}; };
if let Some(ref return_ty) = dispatch.return_type_ty { if let Some(ref return_ty) = dispatch.return_type_ty {
let return_expr = quote_expr!(cx, let return_expr = quote_expr!(cx,
::ipc::binary::deserialize_from::<$return_ty, _>(&mut socket).unwrap() ::ipc::binary::deserialize_from::<$return_ty, _>(&mut *socket).unwrap()
); );
quote_expr!(cx, { quote_expr!(cx, {
$request $request
@ -525,7 +520,7 @@ fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, interface_map:
let client_struct_item = quote_item!(cx, let client_struct_item = quote_item!(cx,
pub struct $client_short_ident $generics { pub struct $client_short_ident $generics {
socket: ::std::cell::RefCell<S>, socket: ::std::sync::RwLock<S>,
phantom: $phantom, phantom: $phantom,
}); });
@ -560,7 +555,7 @@ fn push_with_socket_client_implementation(
impl $generics ::ipc::WithSocket<S> for $client_ident $where_clause { impl $generics ::ipc::WithSocket<S> for $client_ident $where_clause {
fn init(socket: S) -> $client_ident { fn init(socket: S) -> $client_ident {
$client_short_ident { $client_short_ident {
socket: ::std::cell::RefCell::new(socket), socket: ::std::sync::RwLock::new(socket),
phantom: ::std::marker::PhantomData, phantom: ::std::marker::PhantomData,
} }
} }
@ -594,15 +589,13 @@ fn push_client_implementation(
reserved: vec![0u8; 64], reserved: vec![0u8; 64],
}; };
let mut socket_ref = self.socket.borrow_mut();
let mut socket = socket_ref.deref_mut();
::ipc::invoke( ::ipc::invoke(
0, 0,
&Some(::ipc::binary::serialize(&payload).unwrap()), &Some(::ipc::binary::serialize(&payload).unwrap()),
&mut socket); &mut *self.socket.write().unwrap());
let mut result = vec![0u8; 1]; let mut result = vec![0u8; 1];
if try!(socket.read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 { if try!(self.socket.write().unwrap().read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 {
match result[0] { match result[0] {
1 => Ok(()), 1 => Ok(()),
_ => Err(::ipc::Error::RemoteServiceUnsupported), _ => Err(::ipc::Error::RemoteServiceUnsupported),
@ -613,7 +606,7 @@ fn push_client_implementation(
let socket_item = quote_impl_item!(cx, let socket_item = quote_impl_item!(cx,
#[cfg(test)] #[cfg(test)]
pub fn socket(&self) -> &::std::cell::RefCell<S> { pub fn socket(&self) -> &::std::sync::RwLock<S> {
&self.socket &self.socket
}).unwrap(); }).unwrap();

View File

@ -230,6 +230,9 @@ fn binary_expr_struct(
let field_amount = builder.id(&format!("{}",fields.len())); let field_amount = builder.id(&format!("{}",fields.len()));
map_stmts.push(quote_stmt!(cx, let mut map = vec![0usize; $field_amount];).unwrap()); map_stmts.push(quote_stmt!(cx, let mut map = vec![0usize; $field_amount];).unwrap());
map_stmts.push(quote_stmt!(cx, let mut total = 0usize;).unwrap()); map_stmts.push(quote_stmt!(cx, let mut total = 0usize;).unwrap());
let mut post_write_stmts = Vec::<ast::Stmt>::new();
for (index, field) in fields.iter().enumerate() { for (index, field) in fields.iter().enumerate() {
let field_type_ident = builder.id( let field_type_ident = builder.id(
&::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty))); &::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)));
@ -249,6 +252,7 @@ fn binary_expr_struct(
}; };
let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)); let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty));
let range_ident = builder.id(format!("r{}", index));
match raw_ident.as_ref() { match raw_ident.as_ref() {
"u8" => { "u8" => {
write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap()); write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap());
@ -258,15 +262,17 @@ fn binary_expr_struct(
write_stmts.push(quote_stmt!(cx, let size = $member_expr .len();).unwrap()); write_stmts.push(quote_stmt!(cx, let size = $member_expr .len();).unwrap());
write_stmts.push(quote_stmt!(cx, let next_line = offset + size;).unwrap()); write_stmts.push(quote_stmt!(cx, let next_line = offset + size;).unwrap());
write_stmts.push(quote_stmt!(cx, length_stack.push_back(size);).unwrap()); write_stmts.push(quote_stmt!(cx, length_stack.push_back(size);).unwrap());
write_stmts.push(quote_stmt!(cx, buffer[offset..next_line].clone_from_slice($member_expr); ).unwrap()); write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap());
post_write_stmts.push(quote_stmt!(cx, buffer[$range_ident].clone_from_slice($member_expr); ).unwrap());
} }
_ => { _ => {
write_stmts.push(quote_stmt!(cx, let next_line = offset + match $field_type_ident_qualified::len_params() { write_stmts.push(quote_stmt!(cx, let next_line = offset + match $field_type_ident_qualified::len_params() {
0 => mem::size_of::<$field_type_ident>(), 0 => mem::size_of::<$field_type_ident>(),
_ => { let size = $member_expr .size(); length_stack.push_back(size); size }, _ => { let size = $member_expr .size(); length_stack.push_back(size); size },
}).unwrap()); }).unwrap());
write_stmts.push(quote_stmt!(cx, write_stmts.push(quote_stmt!(cx, let $range_ident = offset..next_line; ).unwrap());
if let Err(e) = $member_expr .to_bytes(&mut buffer[offset..next_line], length_stack) { return Err(e) };).unwrap()); post_write_stmts.push(quote_stmt!(cx,
if let Err(e) = $member_expr .to_bytes(&mut buffer[$range_ident], length_stack) { return Err(e) };).unwrap());
} }
} }
@ -312,7 +318,7 @@ fn binary_expr_struct(
Ok(BinaryExpressions { Ok(BinaryExpressions {
size: total_size_expr, size: total_size_expr,
write: quote_expr!(cx, { $write_stmts; Ok(()) } ), write: quote_expr!(cx, { $write_stmts; $post_write_stmts; Ok(()) } ),
read: read_expr, read: read_expr,
}) })
} }

View File

@ -30,6 +30,7 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}
use std::ops::Deref; use std::ops::Deref;
const POLL_TIMEOUT: isize = 100; const POLL_TIMEOUT: isize = 100;
const CLIENT_CONNECTION_TIMEOUT: isize = 2500;
/// Generic worker to handle service (binded) sockets /// Generic worker to handle service (binded) sockets
pub struct Worker<S> where S: IpcInterface<S> { pub struct Worker<S> where S: IpcInterface<S> {
@ -46,6 +47,12 @@ pub struct GuardedSocket<S> where S: WithSocket<Socket> {
_endpoint: Endpoint, _endpoint: Endpoint,
} }
impl<S> GuardedSocket<S> where S: WithSocket<Socket> {
pub fn service(&self) -> Arc<S> {
self.client.clone()
}
}
impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> { impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
type Target = S; type Target = S;
@ -63,6 +70,9 @@ pub fn init_duplex_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, Sock
SocketError::DuplexLink SocketError::DuplexLink
})); }));
// 2500 ms default timeout
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
let endpoint = try!(socket.connect(socket_addr).map_err(|e| { let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::DuplexLink SocketError::DuplexLink
@ -83,6 +93,9 @@ pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError
SocketError::RequestLink SocketError::RequestLink
})); }));
// 2500 ms default timeout
socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap();
let endpoint = try!(socket.connect(socket_addr).map_err(|e| { let endpoint = try!(socket.connect(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink SocketError::RequestLink

View File

@ -104,14 +104,31 @@ impl<R: BinaryConvertable, E: BinaryConvertable> BinaryConvertable for Result<R,
fn to_bytes(&self, buffer: &mut [u8], length_stack: &mut VecDeque<usize>) -> Result<(), BinaryConvertError> { fn to_bytes(&self, buffer: &mut [u8], length_stack: &mut VecDeque<usize>) -> Result<(), BinaryConvertError> {
match *self { match *self {
Ok(ref r) => { buffer[0] = 0; Ok(try!(r.to_bytes(&mut buffer[1..], length_stack))) }, Ok(ref r) => {
Err(ref e) => { buffer[1] = 1; Ok(try!(e.to_bytes(&mut buffer[1..], length_stack))) }, buffer[0] = 0;
if r.size() > 0 {
Ok(try!(r.to_bytes(&mut buffer[1..], length_stack)))
}
else { Ok(()) }
},
Err(ref e) => {
buffer[0] = 1;
if e.size() > 0 {
Ok(try!(e.to_bytes(&mut buffer[1..], length_stack)))
}
else { Ok(()) }
},
} }
} }
fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque<usize>) -> Result<Self, BinaryConvertError> { fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque<usize>) -> Result<Self, BinaryConvertError> {
match buffer[0] { match buffer[0] {
0 => Ok(Ok(try!(R::from_bytes(&buffer[1..], length_stack)))), 0 => {
match buffer.len() {
1 => Ok(Ok(try!(R::from_empty_bytes()))),
_ => Ok(Ok(try!(R::from_bytes(&buffer[1..], length_stack)))),
}
}
1 => Ok(Err(try!(E::from_bytes(&buffer[1..], length_stack)))), 1 => Ok(Err(try!(E::from_bytes(&buffer[1..], length_stack)))),
_ => Err(BinaryConvertError) _ => Err(BinaryConvertError)
} }
@ -154,6 +171,8 @@ impl<T> BinaryConvertable for Vec<T> where T: BinaryConvertable {
_ => 128, _ => 128,
}); });
if buffer.len() == 0 { return Ok(result); }
loop { loop {
let next_size = match T::len_params() { let next_size = match T::len_params() {
0 => mem::size_of::<T>(), 0 => mem::size_of::<T>(),
@ -300,8 +319,8 @@ pub fn deserialize_from<T, R>(r: &mut R) -> Result<T, BinaryConvertError>
let mut payload = Vec::new(); let mut payload = Vec::new();
try!(r.read_to_end(&mut payload).map_err(|_| BinaryConvertError)); try!(r.read_to_end(&mut payload).map_err(|_| BinaryConvertError));
let mut length_stack = VecDeque::<usize>::new();
let stack_len = try!(u64::from_bytes(&payload[0..8], &mut fake_stack)) as usize; let stack_len = try!(u64::from_bytes(&payload[0..8], &mut fake_stack)) as usize;
let mut length_stack = VecDeque::<usize>::with_capacity(stack_len);
if stack_len > 0 { if stack_len > 0 {
for idx in 0..stack_len { for idx in 0..stack_len {
@ -607,7 +626,7 @@ fn deserialize_simple_err() {
} }
#[test] #[test]
fn deserialize_opt_vec_in_out() { fn serialize_opt_vec_in_out() {
use std::io::{Cursor, SeekFrom, Seek}; use std::io::{Cursor, SeekFrom, Seek};
let mut buff = Cursor::new(Vec::new()); let mut buff = Cursor::new(Vec::new());
@ -619,3 +638,17 @@ fn deserialize_opt_vec_in_out() {
assert!(vec.is_none()); assert!(vec.is_none());
} }
#[test]
fn serialize_err_opt_vec_in_out() {
use std::io::{Cursor, SeekFrom, Seek};
let mut buff = Cursor::new(Vec::new());
let optional_vec: Result<Option<Vec<u8>>, u32> = Ok(None);
serialize_into(&optional_vec, &mut buff).unwrap();
buff.seek(SeekFrom::Start(0)).unwrap();
let vec = deserialize_from::<Result<Option<Vec<u8>>, u32>, _>(&mut buff).unwrap();
assert!(vec.is_ok());
}