Merge pull request #928 from ethcore/ipc-handshake

IPC handshake (negotiating protocol/api version)
This commit is contained in:
Nikolay Volf 2016-04-12 14:19:35 +03:00
commit 41f15929b9
8 changed files with 171 additions and 19 deletions

View File

@ -13,7 +13,6 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use aster; use aster;
use syntax::ast::{ use syntax::ast::{
@ -36,6 +35,8 @@ use syntax::ptr::P;
pub struct Error; pub struct Error;
const RESERVED_MESSAGE_IDS: u16 = 16;
pub fn expand_ipc_implementation( pub fn expand_ipc_implementation(
cx: &mut ExtCtxt, cx: &mut ExtCtxt,
span: Span, span: Span,
@ -59,10 +60,24 @@ pub fn expand_ipc_implementation(
}; };
push_client(cx, &builder, &item, &dispatches, push); push_client(cx, &builder, &item, &dispatches, push);
push_handshake_struct(cx, push);
push(Annotatable::Item(impl_item)) push(Annotatable::Item(impl_item))
} }
fn push_handshake_struct(cx: &ExtCtxt, push: &mut FnMut(Annotatable)) {
let handshake_item = quote_item!(cx,
#[derive(Serialize, Deserialize)]
pub struct BinHandshake {
api_version: String,
protocol_version: String,
_reserved: Vec<u8>,
}
).unwrap();
push(Annotatable::Item(handshake_item));
}
fn field_name(builder: &aster::AstBuilder, arg: &Arg) -> ast::Ident { fn field_name(builder: &aster::AstBuilder, arg: &Arg) -> ast::Ident {
match arg.pat.node { match arg.pat.node {
PatKind::Ident(_, ref ident, _) => builder.id(ident.node), PatKind::Ident(_, ref ident, _) => builder.id(ident.node),
@ -247,7 +262,7 @@ fn implement_dispatch_arm(
buffer: bool, buffer: bool,
) -> ast::Arm ) -> ast::Arm
{ {
let index_ident = builder.id(format!("{}", index).as_str()); let index_ident = builder.id(format!("{}", index + (RESERVED_MESSAGE_IDS as u32)).as_str());
let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer); let invoke_expr = implement_dispatch_arm_invoke(cx, builder, dispatch, buffer);
quote_arm!(cx, $index_ident => { $invoke_expr } ) quote_arm!(cx, $index_ident => { $invoke_expr } )
} }
@ -385,7 +400,7 @@ fn implement_client_method_body(
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, let serialized_payload = ::bincode::serde::serialize(&payload, ::bincode::SizeLimit::Infinite).unwrap())); quote_stmt!(cx, let serialized_payload = ::bincode::serde::serialize(&payload, ::bincode::SizeLimit::Infinite).unwrap()));
let index_ident = builder.id(format!("{}", index).as_str()); let index_ident = builder.id(format!("{}", index + RESERVED_MESSAGE_IDS).as_str());
request_serialization_statements.push( request_serialization_statements.push(
quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut socket))); quote_stmt!(cx, ::ipc::invoke($index_ident, &Some(serialized_payload), &mut socket)));
@ -397,21 +412,18 @@ fn implement_client_method_body(
vec![] vec![]
}; };
let wait_result_stmt = quote_stmt!(cx, while !socket.ready().load(::std::sync::atomic::Ordering::Relaxed) { });
if let Some(ref return_ty) = dispatch.return_type_ty { if let Some(ref return_ty) = dispatch.return_type_ty {
let return_expr = quote_expr!(cx, let return_expr = quote_expr!(cx,
::bincode::serde::deserialize_from::<_, $return_ty>(&mut socket, ::bincode::SizeLimit::Infinite).unwrap() ::bincode::serde::deserialize_from::<_, $return_ty>(&mut socket, ::bincode::SizeLimit::Infinite).unwrap()
); );
quote_expr!(cx, { quote_expr!(cx, {
$request $request
$wait_result_stmt
$return_expr $return_expr
}) })
} }
else { else {
quote_expr!(cx, { quote_expr!(cx, {
$request $request
$wait_result_stmt
}) })
} }
} }
@ -487,6 +499,7 @@ fn push_client_implementation(
.collect::<Vec<P<ast::ImplItem>>>(); .collect::<Vec<P<ast::ImplItem>>>();
let client_ident = builder.id(format!("{}Client", item.ident.name.as_str())); let client_ident = builder.id(format!("{}Client", item.ident.name.as_str()));
let item_ident = builder.id(format!("{}", item.ident.name.as_str()));
let implement = quote_item!(cx, let implement = quote_item!(cx,
impl<S> $client_ident<S> where S: ::ipc::IpcSocket { impl<S> $client_ident<S> where S: ::ipc::IpcSocket {
pub fn new(socket: S) -> $client_ident<S> { pub fn new(socket: S) -> $client_ident<S> {
@ -496,6 +509,30 @@ fn push_client_implementation(
} }
} }
pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = BinHandshake {
protocol_version: $item_ident::protocol_version().to_string(),
api_version: $item_ident::api_version().to_string(),
_reserved: vec![0u8, 64],
};
let mut socket_ref = self.socket.borrow_mut();
let mut socket = socket_ref.deref_mut();
::ipc::invoke(
0,
&Some(::bincode::serde::serialize(&payload, ::bincode::SizeLimit::Infinite).unwrap()),
&mut socket);
let mut result = vec![0u8; 1];
if try!(socket.read(&mut result).map_err(|_| ::ipc::Error::HandshakeFailed)) == 1 {
match result[0] {
1 => Ok(()),
_ => Err(::ipc::Error::RemoteServiceUnsupported),
}
}
else { Err(::ipc::Error::HandshakeFailed) }
}
#[cfg(test)] #[cfg(test)]
pub fn socket(&self) -> &::std::cell::RefCell<S> { pub fn socket(&self) -> &::std::cell::RefCell<S> {
&self.socket &self.socket
@ -507,6 +544,38 @@ fn push_client_implementation(
push(Annotatable::Item(implement)); push(Annotatable::Item(implement));
} }
/// implements dispatching of system handshake invocation (method_num 0)
fn implement_handshake_arm(
cx: &ExtCtxt,
) -> (ast::Arm, ast::Arm)
{
let handshake_deserialize = quote_stmt!(&cx,
let handshake_payload = ::bincode::serde::deserialize_from::<_, BinHandshake>(r, ::bincode::SizeLimit::Infinite).unwrap();
);
let handshake_deserialize_buf = quote_stmt!(&cx,
let handshake_payload = ::bincode::serde::deserialize::<BinHandshake>(buf).unwrap();
);
let handshake_serialize = quote_expr!(&cx,
::bincode::serde::serialize::<bool>(&Self::handshake(&::ipc::Handshake {
api_version: ::semver::Version::parse(&handshake_payload.api_version).unwrap(),
protocol_version: ::semver::Version::parse(&handshake_payload.protocol_version).unwrap(),
}), ::bincode::SizeLimit::Infinite).unwrap()
);
(
quote_arm!(&cx, 0 => {
$handshake_deserialize
$handshake_serialize
}),
quote_arm!(&cx, 0 => {
$handshake_deserialize_buf
$handshake_serialize
}),
)
}
/// implements `IpcInterface<C>` for the given class `C` /// implements `IpcInterface<C>` for the given class `C`
fn implement_interface( fn implement_interface(
cx: &ExtCtxt, cx: &ExtCtxt,
@ -546,6 +615,8 @@ fn implement_interface(
let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false); let dispatch_arms = implement_dispatch_arms(cx, builder, &dispatch_table, false);
let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true); let dispatch_arms_buffered = implement_dispatch_arms(cx, builder, &dispatch_table, true);
let (handshake_arm, handshake_arm_buf) = implement_handshake_arm(cx);
Ok((quote_item!(cx, Ok((quote_item!(cx,
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> fn dispatch<R>(&self, r: &mut R) -> Vec<u8>
@ -557,7 +628,11 @@ fn implement_interface(
Err(e) => { panic!("ipc read error: {:?}, aborting", e); } Err(e) => { panic!("ipc read error: {:?}, aborting", e); }
_ => { } _ => { }
} }
match method_num[0] as u16 + (method_num[1] as u16)*256 { // method_num is a 16-bit little-endian unsigned number
match method_num[1] as u16 + (method_num[0] as u16)*256 {
// handshake
$handshake_arm
// user methods
$dispatch_arms $dispatch_arms
_ => vec![] _ => vec![]
} }
@ -566,6 +641,7 @@ fn implement_interface(
fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8> fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>
{ {
match method_num { match method_num {
$handshake_arm_buf
$dispatch_arms_buffered $dispatch_arms_buffered
_ => vec![] _ => vec![]
} }

View File

@ -8,3 +8,4 @@ license = "GPL-3.0"
[dependencies] [dependencies]
ethcore-devtools = { path = "../../devtools" } ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"

View File

@ -19,12 +19,41 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::marker::Sync; use std::marker::Sync;
use std::sync::atomic::*; use std::sync::atomic::*;
use semver::Version;
pub trait IpcInterface<T> { pub struct Handshake {
pub protocol_version: Version,
pub api_version: Version,
}
pub trait IpcConfig {
fn api_version() -> Version {
Version::parse("1.0.0").unwrap()
}
fn protocol_version() -> Version {
Version::parse("1.0.0").unwrap()
}
fn handshake(handshake: &Handshake) -> bool {
handshake.protocol_version == Self::protocol_version() &&
handshake.api_version == Self::api_version()
}
}
#[derive(Debug)]
pub enum Error {
UnkownSystemCall,
ClientUnsupported,
RemoteServiceUnsupported,
HandshakeFailed,
}
pub trait IpcInterface<T> where T: IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result /// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read; fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
/// deserialize the payload from buffer, dispatches invoke and returns serialized result /// deserializes the payload from buffer, dispatches invoke and returns serialized result
/// (for non-blocking io) /// (for non-blocking io)
fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>; fn dispatch_buf(&self, method_num: u16, buf: &[u8]) -> Vec<u8>;
} }
@ -52,11 +81,7 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
/// IpcSocket /// IpcSocket
pub trait IpcSocket: Read + Write + Sync { pub trait IpcSocket: Read + Write + Sync {
fn ready(&self) -> AtomicBool;
} }
impl IpcSocket for ::devtools::TestSocket { impl IpcSocket for ::devtools::TestSocket {
fn ready(&self) -> AtomicBool {
AtomicBool::new(true)
}
} }

View File

@ -17,6 +17,7 @@
//! IPC RPC interface //! IPC RPC interface
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate semver;
pub mod interface; pub mod interface;
pub use interface::{IpcInterface, IpcSocket, invoke}; pub use interface::{IpcInterface, IpcSocket, invoke, IpcConfig, Handshake, Error};

View File

@ -12,6 +12,7 @@ path = "run.rs"
bincode = "*" bincode = "*"
serde = "0.7.0" serde = "0.7.0"
ethcore-devtools = { path = "../../devtools" } ethcore-devtools = { path = "../../devtools" }
semver = "0.2.0"
[build-dependencies] [build-dependencies]
syntex = "0.30.0" syntex = "0.30.0"

View File

@ -20,11 +20,12 @@ mod tests {
use super::super::service::*; use super::super::service::*;
use ipc::*; use ipc::*;
use devtools::*; use devtools::*;
use semver::Version;
#[test] #[test]
fn call_service() { fn call_service() {
// method_num = 0, f = 10 (method Service::commit) // method_num = 0, f = 10 (method Service::commit)
let mut socket = TestSocket::new_ready(vec![0, 0, 0, 0, 0, 10]); let mut socket = TestSocket::new_ready(vec![0, 16, 0, 0, 0, 10]);
let service = Service::new(); let service = Service::new();
assert_eq!(0, *service.commits.read().unwrap()); assert_eq!(0, *service.commits.read().unwrap());
@ -34,27 +35,70 @@ mod tests {
assert_eq!(10, *service.commits.read().unwrap()); assert_eq!(10, *service.commits.read().unwrap());
} }
#[test] #[test]
fn call_service_proxy() { fn call_service_handshake() {
let mut socket = TestSocket::new_ready(vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);
let service = Service::new();
let result = service.dispatch(&mut socket);
// single `true`
assert_eq!(vec![1], result);
}
#[test]
fn call_service_client() {
let mut socket = TestSocket::new(); let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10]; socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket); let service_client = ServiceClient::new(socket);
let result = service_client.commit(5); let result = service_client.commit(5);
assert_eq!(vec![0, 0, 0, 0, 0, 5], service_client.socket().borrow().write_buffer.clone()); assert_eq!(vec![0, 16, 0, 0, 0, 5], service_client.socket().borrow().write_buffer.clone());
assert_eq!(10, result); assert_eq!(10, result);
} }
#[test] #[test]
fn call_service_proxy_optional() { fn call_service_client_optional() {
let mut socket = TestSocket::new(); let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10]; socket.read_buffer = vec![0, 0, 0, 10];
let service_client = ServiceClient::new(socket); let service_client = ServiceClient::new(socket);
let result = service_client.rollback(Some(5), 10); let result = service_client.rollback(Some(5), 10);
assert_eq!(vec![0, 1, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone()); assert_eq!(vec![0, 17, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone());
assert_eq!(10, result); assert_eq!(10, result);
} }
#[test]
fn query_default_version() {
let ver = Service::protocol_version();
assert_eq!(ver, Version::parse("1.0.0").unwrap());
let ver = Service::api_version();
assert_eq!(ver, Version::parse("1.0.0").unwrap());
}
#[test]
fn call_service_client_handshake() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![1];
let service_client = ServiceClient::new(socket);
let result = service_client.handshake();
assert!(result.is_ok());
}
} }

View File

@ -18,6 +18,7 @@ extern crate bincode;
extern crate ethcore_ipc as ipc; extern crate ethcore_ipc as ipc;
extern crate serde; extern crate serde;
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate semver;
pub mod service; pub mod service;
mod examples; mod examples;

View File

@ -16,6 +16,7 @@
use std::sync::RwLock; use std::sync::RwLock;
use std::ops::*; use std::ops::*;
use ipc::IpcConfig;
pub struct Service { pub struct Service {
pub commits: RwLock<usize>, pub commits: RwLock<usize>,
@ -45,3 +46,5 @@ impl Service {
} }
} }
} }
impl ::ipc::IpcConfig for Service {}