dispatch buf and proper polling

This commit is contained in:
NikVolf 2016-04-05 12:08:42 +03:00
parent 4cde01d81a
commit 0d7e52ac6f
3 changed files with 139 additions and 89 deletions

View File

@ -145,14 +145,20 @@ struct Dispatch {
return_type_ty: Option<P<Ty>>, return_type_ty: Option<P<Ty>>,
} }
fn implement_dispatch_arm_invoke( // This is the expanded version of this:
//
// let invoke_serialize_stmt = quote_stmt!(cx, {
// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap()
// });
//
// But the above does not allow comma-separated expressions for arbitrary number
// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n
fn implement_dispatch_arm_invoke_stmt(
cx: &ExtCtxt, cx: &ExtCtxt,
builder: &aster::AstBuilder, builder: &aster::AstBuilder,
dispatch: &Dispatch, dispatch: &Dispatch,
) -> P<ast::Expr> ) -> ast::Stmt
{ {
let deserialize_expr = quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"));
let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
let function_name = builder.id(dispatch.function_name.as_str()); let function_name = builder.id(dispatch.function_name.as_str());
let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str()); let output_type_id = builder.id(dispatch.return_type_name.clone().unwrap().as_str());
@ -161,63 +167,71 @@ fn implement_dispatch_arm_invoke(
quote_expr!(cx, input. $arg_ident) quote_expr!(cx, input. $arg_ident)
}).collect::<Vec<P<ast::Expr>>>(); }).collect::<Vec<P<ast::Expr>>>();
// This is the expanded version of this: let ext_cx = &*cx;
// ::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts(
// let invoke_serialize_stmt = quote_stmt!(cx, { ext_cx.parse_sess(),
// ::bincode::serde::serialize(& $output_type_id { payload: self. $function_name ($hand_param_a, $hand_param_b) }, ::bincode::SizeLimit::Infinite).unwrap() ext_cx.cfg(),
// }); {
// let _sp = ext_cx.call_site();
// But the above does not allow comma-separated expressions for arbitrary number let mut tt = ::std::vec::Vec::new();
// of parameters ...$hand_param_a, $hand_param_b, ... $hand_param_n tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
let invoke_serialize_stmt = { tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
let ext_cx = &*cx; tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
::quasi::parse_stmt_panic(&mut ::syntax::parse::new_parser_from_tts( tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
ext_cx.parse_sess(), tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName)));
ext_cx.cfg(), tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
{ tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain)));
let _sp = ext_cx.call_site(); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
let mut tt = ::std::vec::Vec::new(); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace))); tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serde"), ::syntax::parse::token::ModName))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("serialize"), ::syntax::parse::token::Plain))); tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::BinOp(::syntax::parse::token::And)));
tt.extend(::quasi::ToTokens::to_tokens(&output_type_id, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Brace)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("payload"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Colon));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("self"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
tt.extend(::quasi::ToTokens::to_tokens(&function_name, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
for arg_expr in input_args_exprs { for arg_expr in input_args_exprs {
tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter()); tt.extend(::quasi::ToTokens::to_tokens(&arg_expr, ext_cx).into_iter());
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
}
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); }
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Comma));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("bincode"), ::syntax::parse::token::ModName)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot)); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("SizeLimit"), ::syntax::parse::token::ModName)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::ModSep));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("Infinite"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace))); tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
tt tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Dot));
})) tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::Ident(ext_cx.ident_of("unwrap"), ::syntax::parse::token::Plain)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::OpenDelim(::syntax::parse::token::Paren)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Paren)));
tt.push(::syntax::ast::TokenTree::Token(_sp, ::syntax::parse::token::CloseDelim(::syntax::parse::token::Brace)));
tt
})).unwrap()
}
fn implement_dispatch_arm_invoke(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
dispatch: &Dispatch,
buffer: bool,
) -> P<ast::Expr>
{
let deserialize_expr = if buffer {
quote_expr!(cx, ::bincode::serde::deserialize(buf).expect("ipc deserialization error, aborting"))
} else {
quote_expr!(cx, ::bincode::serde::deserialize_from(r, ::bincode::SizeLimit::Infinite).expect("ipc deserialization error, aborting"))
}; };
let input_type_id = builder.id(dispatch.input_type_name.clone().unwrap().as_str());
let invoke_serialize_stmt = implement_dispatch_arm_invoke_stmt(cx, builder, dispatch);
quote_expr!(cx, { quote_expr!(cx, {
let input: $input_type_id = $deserialize_expr; let input: $input_type_id = $deserialize_expr;
$invoke_serialize_stmt $invoke_serialize_stmt
@ -225,14 +239,31 @@ fn implement_dispatch_arm_invoke(
} }
/// generates dispatch match for method id /// generates dispatch match for method id
fn implement_dispatch_arm(cx: &ExtCtxt, builder: &aster::AstBuilder, index: u32, dispatch: &Dispatch) fn implement_dispatch_arm(
-> ast::Arm cx: &ExtCtxt,
builder: &aster::AstBuilder,
index: u32,
dispatch: &Dispatch,
buffer: bool,
) -> ast::Arm
{ {
let index_ident = builder.id(format!("{}", index).as_str()); let index_ident = builder.id(format!("{}", index).as_str());
let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch); let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer);
quote_arm!(cx, $index_ident => { $invoke_expr } ) quote_arm!(cx, $index_ident => { $invoke_expr } )
} }
fn implement_dispatch_arms(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
dispatches: &[Dispatch],
buffer: bool,
) -> Vec<ast::Arm>
{
let mut index = -1;
dispatches.iter()
.map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch, buffer) }).collect()
}
/// generates client type for specified server type /// generates client type for specified server type
/// for say `Service` it generates `ServiceClient` /// for say `Service` it generates `ServiceClient`
fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) { fn push_client_struct(cx: &ExtCtxt, builder: &aster::AstBuilder, item: &Item, push: &mut FnMut(Annotatable)) {
@ -511,9 +542,9 @@ fn implement_interface(
dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push)); dispatch_table.push(push_invoke_signature_aster(builder, &impl_item, signature, push));
} }
} }
let mut index = -1;
let dispatch_arms: Vec<_> = dispatch_table.iter() let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false);
.map(|dispatch| { index = index + 1; implement_dispatch_arm(cx, builder, index as u32, dispatch) }).collect(); let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true);
Ok((quote_item!(cx, Ok((quote_item!(cx,
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
@ -532,11 +563,10 @@ fn implement_interface(
} }
} }
fn dispatch_buf<R>(&self, method_num: u16, r: &mut R) -> Vec<u8> fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>
where R: ::std::io::Read
{ {
match method_num { match method_num {
$dispatch_arms $dispatch_arms_buffered
_ => vec![] _ => vec![]
} }
} }

View File

@ -23,13 +23,11 @@ extern crate nanomsg;
pub use ipc::*; pub use ipc::*;
use std::sync::*; use std::sync::*;
use std::io::{Write, Read};
use nanomsg::{Socket, Protocol, Error, Endpoint}; use nanomsg::{Socket, Protocol, Error, Endpoint};
pub struct Worker<S> where S: IpcInterface<S> { pub struct Worker<S> where S: IpcInterface<S> {
service: Arc<S>, service: Arc<S>,
sockets: Vec<(Socket, Endpoint)>, sockets: Vec<(Socket, Endpoint)>,
method_buf: [u8;2],
} }
#[derive(Debug)] #[derive(Debug)]
@ -42,21 +40,26 @@ impl<S> Worker<S> where S: IpcInterface<S> {
Worker::<S> { Worker::<S> {
service: service.clone(), service: service.clone(),
sockets: Vec::new(), sockets: Vec::new(),
method_buf: [0,0]
} }
} }
pub fn poll(&mut self) { pub fn poll(&mut self) {
for item in self.sockets.iter_mut() { for item in self.sockets.iter_mut() {
let socket = &mut item.0; let socket = &mut item.0;
let mut buf = Vec::new();
// non-blocking read only ok if there is something to read from socket // non-blocking read only ok if there is something to read from socket
match socket.nb_read(&mut self.method_buf) { match socket.nb_read_to_end(&mut buf) {
Ok(method_sign_len) => { Ok(method_sign_len) => {
if method_sign_len == 2 { if method_sign_len >= 2 {
let result = self.service.dispatch_buf( // method_num
self.method_buf[1] as u16 * 256 + self.method_buf[0] as u16, let method_num = buf[1] as u16 * 256 + buf[0] as u16;
socket); // payload
if let Err(e) = socket.write(&result) { let payload = &buf[2..];
// dispatching for ipc interface
let result = self.service.dispatch_buf(method_num, payload);
if let Err(e) = socket.nb_write(&result) {
warn!(target: "ipc", "Failed to write response: {:?}", e); warn!(target: "ipc", "Failed to write response: {:?}", e);
} }
} }
@ -67,7 +70,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
Err(Error::TryAgain) => { Err(Error::TryAgain) => {
}, },
Err(x) => { Err(x) => {
warn!(target: "ipc", "Error polling connection {:?}", x); warn!(target: "ipc", "Error polling connections {:?}", x);
panic!(); panic!();
} }
} }
@ -97,8 +100,7 @@ mod tests {
use ipc::*; use ipc::*;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use nanomsg::{Socket, Protocol}; use nanomsg::{Socket, Protocol, Endpoint};
use std::thread;
struct TestInvoke { struct TestInvoke {
method_num: u16, method_num: u16,
@ -119,23 +121,22 @@ mod tests {
fn dispatch<R>(&self, _r: &mut R) -> Vec<u8> where R: Read { fn dispatch<R>(&self, _r: &mut R) -> Vec<u8> where R: Read {
vec![] vec![]
} }
fn dispatch_buf<R>(&self, method_num: u16, r: &mut R) -> Vec<u8> where R: Read { fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8> {
let mut buf = vec![0u8; 4096];
let size = r.read_to_end(&mut buf).unwrap();
self.methods_stack.write().unwrap().push( self.methods_stack.write().unwrap().push(
TestInvoke { TestInvoke {
method_num: method_num, method_num: method_num,
params: unsafe { Vec::from_raw_parts(buf.as_mut_ptr(), size, size) } params: buf.to_vec(),
}); });
vec![] vec![]
} }
} }
fn dummy_write(addr: &str, buf: &[u8]) { fn dummy_write(addr: &str, buf: &[u8]) -> (Socket, Endpoint) {
let mut socket = Socket::new(Protocol::Pair).unwrap(); let mut socket = Socket::new(Protocol::Pair).unwrap();
let endpoint = socket.connect(addr).unwrap(); let endpoint = socket.connect(addr).unwrap();
thread::sleep_ms(10); //thread::sleep_ms(10);
socket.write_all(buf).unwrap(); socket.write(buf).unwrap();
(socket, endpoint)
} }
#[test] #[test]
@ -167,9 +168,28 @@ mod tests {
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new())); let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
worker.add_duplex(url).unwrap(); worker.add_duplex(url).unwrap();
dummy_write(url, &vec![0, 0, 7, 7, 6, 6]); let (_socket, _endpoint) = dummy_write(url, &vec![0, 0, 7, 7, 6, 6]);
worker.poll(); for _ in 0..1000 { worker.poll(); }
assert_eq!(1, worker.service.methods_stack.read().unwrap().len()); assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
assert_eq!([7, 7, 6, 6], worker.service.methods_stack.read().unwrap()[0].params[..]);
}
#[test]
fn worker_can_poll_long() {
let url = "ipc:///tmp/parity-test30.ipc";
let mut worker = Worker::<DummyService>::new(Arc::new(DummyService::new()));
worker.add_duplex(url).unwrap();
let message = [0u8; 1024*1024];
let (_socket, _endpoint) = dummy_write(url, &message);
for _ in 0..10000 { worker.poll(); }
assert_eq!(1, worker.service.methods_stack.read().unwrap().len());
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params);
} }
} }

View File

@ -26,7 +26,7 @@ pub trait IpcInterface<T> {
/// deserialize the payload from the io `r` and invokes method specified by `method_num` /// deserialize the payload from the io `r` and invokes method specified by `method_num`
/// (for non-blocking io) /// (for non-blocking io)
fn dispatch_buf<R>(&self, method_num: u16, r: &mut R) -> Vec<u8> where R: Read; fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>;
} }
/// serializes method invocation (method_num and parameters) to the stream specified by `w` /// serializes method invocation (method_num and parameters) to the stream specified by `w`