Util & ipc clenup (#1807)

* removed frombytes stuff

* removed jsonrpc handler from nano
This commit is contained in:
Nikolay Volf 2016-08-02 17:02:47 +03:00 committed by Gav Wood
parent b165059327
commit 1b507e0147
4 changed files with 1 additions and 426 deletions

1
Cargo.lock generated
View File

@ -352,7 +352,6 @@ name = "ethcore-ipc-nano"
version = "1.3.0"
dependencies = [
"ethcore-ipc 1.3.0",
"jsonrpc-core 2.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
]

View File

@ -7,7 +7,6 @@ license = "GPL-3.0"
[features]
[dependencies]
jsonrpc-core = "2.0"
ethcore-ipc = { path = "../rpc" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
log = "0.3"

View File

@ -19,14 +19,11 @@
extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;
extern crate jsonrpc_core;
use jsonrpc_core::IoHandler;
pub use ipc::{WithSocket, IpcInterface, IpcConfig};
pub use nanomsg::Socket as NanoSocket;
use std::sync::*;
use std::sync::atomic::*;
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
use std::ops::Deref;
@ -218,149 +215,14 @@ impl<S: ?Sized> Worker<S> where S: IpcInterface {
}
}
/// Error in handling JSON RPC request
pub enum IoHandlerError {
BadRequest,
HandlerError,
}
/// Worker to handle JSON RPC requests
pub struct IoHandlerWorker {
handler: Arc<IoHandler>,
socket: Socket,
_endpoint: Endpoint,
poll: Vec<PollFd>,
buf: Vec<u8>,
}
/// IPC server for json-rpc handler (single thread)
pub struct IoHandlerServer {
is_stopping: Arc<AtomicBool>,
is_stopped: Arc<AtomicBool>,
handler: Arc<IoHandler>,
socket_addr: String,
}
impl IoHandlerServer {
/// New IPC server for JSON RPC `handler` and ipc socket address `socket_addr`
pub fn new(handler: &Arc<IoHandler>, socket_addr: &str) -> IoHandlerServer {
IoHandlerServer {
handler: handler.clone(),
is_stopping: Arc::new(AtomicBool::new(false)),
is_stopped: Arc::new(AtomicBool::new(true)),
socket_addr: socket_addr.to_owned(),
}
}
/// IPC Server starts (non-blocking, in seprate thread)
pub fn start(&self) -> Result<(), SocketError> {
let mut worker = try!(IoHandlerWorker::new(&self.handler, &self.socket_addr));
self.is_stopping.store(false, Ordering::Relaxed);
let worker_is_stopping = self.is_stopping.clone();
let worker_is_stopped = self.is_stopped.clone();
::std::thread::spawn(move || {
worker_is_stopped.store(false, Ordering::Relaxed);
while !worker_is_stopping.load(Ordering::Relaxed) {
worker.poll()
}
worker_is_stopped.store(true, Ordering::Relaxed);
});
Ok(())
}
/// IPC server stop (func will wait until effective stop)
pub fn stop(&self) {
self.is_stopping.store(true, Ordering::Relaxed);
while !self.is_stopped.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
}
impl Drop for IoHandlerServer {
fn drop(&mut self) {
self.stop()
}
}
impl IoHandlerWorker {
pub fn new(handler: &Arc<IoHandler>, socket_addr: &str) -> Result<IoHandlerWorker, SocketError> {
let mut socket = try!(Socket::new(Protocol::Rep).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::RequestLink
}));
let endpoint = try!(socket.bind(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink
}));
let poll = vec![socket.new_pollfd(PollInOut::In)];
Ok(IoHandlerWorker {
handler: handler.clone(),
socket: socket,
_endpoint: endpoint,
poll: poll,
buf: Vec::with_capacity(1024),
})
}
pub fn poll(&mut self) {
let mut request = PollRequest::new(&mut self.poll[..]);
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
let fd = request.get_fds()[0]; // guaranteed to exist and be the only one
// because contains only immutable socket field as a member
if !fd.can_read() {
return;
}
unsafe { self.buf.set_len(0); }
match self.socket.nb_read_to_end(&mut self.buf) {
Ok(0) => {
warn!(target: "ipc", "RPC empty message received");
return;
},
Ok(_) => {
let rpc_msg = match String::from_utf8(self.buf.clone()) {
Ok(val) => val,
Err(e) => {
warn!(target: "ipc", "RPC decoding error (utf-8): {:?}", e);
return;
}
};
let response: Option<String> = self.handler.handle_request(&rpc_msg);
if let Some(response_str) = response {
let response_bytes = response_str.into_bytes();
if let Err(e) = self.socket.nb_write(&response_bytes) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
}
}
},
Err(Error::TryAgain) => {
// no data
},
Err(x) => {
warn!(target: "ipc", "Error polling connections {:?}", x);
panic!("IPC RPC fatal error");
},
}
}
}
#[cfg(test)]
mod service_tests {
use super::{Worker, IoHandlerServer};
use super::Worker;
use ipc::*;
use std::io::{Read, Write};
use std::sync::{Arc, RwLock};
use nanomsg::{Socket, Protocol, Endpoint};
use jsonrpc_core;
use jsonrpc_core::{IoHandler, Value, Params, MethodCommand};
struct TestInvoke {
method_num: u16,
@ -400,15 +262,6 @@ mod service_tests {
(socket, endpoint)
}
fn dummy_request(addr: &str, buf: &[u8]) -> Vec<u8> {
let mut socket = Socket::new(Protocol::Req).unwrap();
let _endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
let mut buf = Vec::new();
socket.read_to_end(&mut buf).unwrap();
buf
}
#[test]
fn can_create_worker() {
let worker = Worker::<DummyService>::new(&Arc::new(DummyService::new()));
@ -462,29 +315,4 @@ mod service_tests {
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params);
}
#[test]
fn test_jsonrpc_handler() {
let url = "ipc:///tmp/parity-test50.ipc";
struct SayHello;
impl MethodCommand for SayHello {
fn execute(&self, _params: Params) -> Result<Value, jsonrpc_core::Error> {
Ok(Value::String("hello".to_string()))
}
}
let io = Arc::new(IoHandler::new());
io.add_method("say_hello", SayHello);
let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
let server = IoHandlerServer::new(&io, url);
server.start().unwrap();
assert_eq!(String::from_utf8(dummy_request(url, request.as_bytes())).unwrap(), response.to_string());
server.stop();
}
}

View File

@ -35,10 +35,7 @@
use std::fmt;
use std::slice;
use std::ops::{Deref, DerefMut};
use hash::FixedHash;
use elastic_array::*;
use std::mem;
use std::cmp::Ordering;
/// Vector like object
pub trait VecLike<T> {
@ -230,213 +227,6 @@ impl<T> Populatable for [T] where T: Sized {
}
}
#[derive(Debug)]
/// Bytes array deserialization error
pub enum FromBytesError {
/// Not enough bytes for the requested type
NotLongEnough,
/// Too many bytes for the requested type
TooLong,
/// Invalid marker for (enums)
UnknownMarker,
}
/// Value that can be serialized from bytes array
pub trait FromRawBytes: Sized {
/// function that will instantiate and initialize object from slice
fn from_bytes(d: &[u8]) -> Result<Self, FromBytesError>;
}
impl<T> FromRawBytes for T where T: FixedHash {
fn from_bytes(bytes: &[u8]) -> Result<Self, FromBytesError> {
match bytes.len().cmp(&mem::size_of::<T>()) {
Ordering::Less => return Err(FromBytesError::NotLongEnough),
Ordering::Greater => return Err(FromBytesError::TooLong),
Ordering::Equal => ()
};
let mut res = T::zero();
res.copy_raw(bytes);
Ok(res)
}
}
#[macro_export]
macro_rules! sized_binary_map {
($target_ty: ident) => {
impl FromRawBytes for $target_ty {
fn from_bytes(bytes: &[u8]) -> Result<Self, FromBytesError> {
match bytes.len().cmp(&::std::mem::size_of::<$target_ty>()) {
::std::cmp::Ordering::Less => return Err(FromBytesError::NotLongEnough),
::std::cmp::Ordering::Greater => return Err(FromBytesError::TooLong),
::std::cmp::Ordering::Equal => ()
};
let mut res: Self = 0;
res.copy_raw(bytes);
Ok(res)
}
}
impl ToBytesWithMap for $target_ty {
fn to_bytes_map(&self) -> Vec<u8> {
let sz = ::std::mem::size_of::<$target_ty>();
let mut res = Vec::<u8>::with_capacity(sz);
let ip: *const $target_ty = self;
let ptr: *const u8 = ip as *const _;
unsafe {
res.set_len(sz);
::std::ptr::copy(ptr, res.as_mut_ptr(), sz);
}
res
}
}
}
}
sized_binary_map!(u16);
sized_binary_map!(u32);
sized_binary_map!(u64);
/// Value that can be serialized from variable-length byte array
pub trait FromRawBytesVariable: Sized {
/// Create value from slice
fn from_bytes_variable(bytes: &[u8]) -> Result<Self, FromBytesError>;
}
impl<T> FromRawBytesVariable for T where T: FromRawBytes {
fn from_bytes_variable(bytes: &[u8]) -> Result<Self, FromBytesError> {
match bytes.len().cmp(&mem::size_of::<T>()) {
Ordering::Less => return Err(FromBytesError::NotLongEnough),
Ordering::Greater => return Err(FromBytesError::TooLong),
Ordering::Equal => ()
};
T::from_bytes(bytes)
}
}
impl FromRawBytesVariable for String {
fn from_bytes_variable(bytes: &[u8]) -> Result<String, FromBytesError> {
Ok(::std::str::from_utf8(bytes).unwrap().to_owned())
}
}
impl<T> FromRawBytesVariable for Vec<T> where T: FromRawBytes {
fn from_bytes_variable(bytes: &[u8]) -> Result<Self, FromBytesError> {
let size_of_t = mem::size_of::<T>();
let length_in_chunks = bytes.len() / size_of_t;
let mut result = Vec::with_capacity(length_in_chunks);
unsafe { result.set_len(length_in_chunks) };
for i in 0..length_in_chunks {
*result.get_mut(i).unwrap() = try!(T::from_bytes(
&bytes[size_of_t * i..size_of_t * (i+1)]))
}
Ok(result)
}
}
impl<V1, T2> FromRawBytes for (V1, T2) where V1: FromRawBytesVariable, T2: FromRawBytes {
fn from_bytes(bytes: &[u8]) -> Result<Self, FromBytesError> {
let header = 8usize;
let mut map: (u64, ) = (0,);
if bytes.len() < header { return Err(FromBytesError::NotLongEnough); }
map.copy_raw(&bytes[0..header]);
Ok((
try!(V1::from_bytes_variable(&bytes[header..header + (map.0 as usize)])),
try!(T2::from_bytes(&bytes[header + (map.0 as usize)..bytes.len()])),
))
}
}
impl<V1, V2, T3> FromRawBytes for (V1, V2, T3)
where V1: FromRawBytesVariable,
V2: FromRawBytesVariable,
T3: FromRawBytes
{
fn from_bytes(bytes: &[u8]) -> Result<Self, FromBytesError> {
let header = 16usize;
let mut map: (u64, u64, ) = (0, 0,);
if bytes.len() < header { return Err(FromBytesError::NotLongEnough); }
map.copy_raw(&bytes[0..header]);
let map_1 = (header, header + map.0 as usize);
let map_2 = (map_1.1 as usize, map_1.1 as usize + map.1 as usize);
Ok((
try!(V1::from_bytes_variable(&bytes[map_1.0..map_1.1])),
try!(V2::from_bytes_variable(&bytes[map_2.0..map_2.1])),
try!(T3::from_bytes(&bytes[map_2.1..bytes.len()])),
))
}
}
impl<'a, V1, X1, T2> ToBytesWithMap for (X1, &'a T2) where V1: ToBytesWithMap, X1: Deref<Target=[V1]>, T2: ToBytesWithMap {
fn to_bytes_map(&self) -> Vec<u8> {
let header = 8usize;
let v1_size = mem::size_of::<V1>();
let mut result = Vec::with_capacity(header + self.0.len() * v1_size + mem::size_of::<T2>());
result.extend(((self.0.len() * v1_size) as u64).to_bytes_map());
for i in 0..self.0.len() {
result.extend(self.0[i].to_bytes_map());
}
result.extend(self.1.to_bytes_map());
result
}
}
impl<'a, V1, X1, V2, X2, T3> ToBytesWithMap for (X1, X2, &'a T3)
where V1: ToBytesWithMap, X1: Deref<Target=[V1]>,
V2: ToBytesWithMap, X2: Deref<Target=[V2]>,
T3: ToBytesWithMap
{
fn to_bytes_map(&self) -> Vec<u8> {
let header = 16usize;
let v1_size = mem::size_of::<V1>();
let v2_size = mem::size_of::<V2>();
let mut result = Vec::with_capacity(
header +
self.0.len() * v1_size +
self.1.len() * v2_size +
mem::size_of::<T3>()
);
result.extend(((self.0.len() * v1_size) as u64).to_bytes_map());
result.extend(((self.1.len() * v2_size) as u64).to_bytes_map());
for i in 0..self.0.len() {
result.extend(self.0[i].to_bytes_map());
}
for i in 0..self.1.len() {
result.extend(self.1[i].to_bytes_map());
}
result.extend(self.2.to_bytes_map());
result
}
}
impl FromRawBytesVariable for Vec<u8> {
fn from_bytes_variable(bytes: &[u8]) -> Result<Vec<u8>, FromBytesError> {
Ok(bytes.to_vec())
}
}
/// Value that serializes directly to variable-sized byte array and stores map
pub trait ToBytesWithMap {
/// serialize to variable-sized byte array and store map
fn to_bytes_map(&self) -> Vec<u8>;
}
impl<T> ToBytesWithMap for T where T: FixedHash {
fn to_bytes_map(&self) -> Vec<u8> {
self.as_slice().to_owned()
}
}
#[test]
fn fax_raw() {
let mut x = [255u8; 4];
@ -488,44 +278,3 @@ fn populate_big_types() {
h.copy_raw_from(&a);
assert_eq!(h, h256_from_hex("ffffffffffffffffffffffffffffffffffffffff000000000000000000000069"));
}
#[test]
fn raw_bytes_from_tuple() {
type Tup = (Vec<u16>, u16);
let tup: (&[u16], u16) = (&[1; 4], 10);
let bytes = vec![
// map
8u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
// four 1u16
1u8, 0u8,
1u8, 0u8,
1u8, 0u8,
1u8, 0u8,
// 10u16
10u8, 0u8];
let (v, x) = Tup::from_bytes(&bytes).unwrap();
assert_eq!(tup, (&v[..], x));
let tup_from = (v, x);
let tup_to = (tup_from.0, &tup_from.1);
let bytes_to = tup_to.to_bytes_map();
assert_eq!(bytes_to, bytes);
}
#[test]
fn bytes_map_from_triple() {
let data: (&[u16], &[u32], u64) = (&[2; 6], &[6; 3], 12u64);
let bytes_map = (data.0, data.1, &data.2).to_bytes_map();
assert_eq!(bytes_map, vec![
// data map 2 x u64
12, 0, 0, 0, 0, 0, 0, 0,
12, 0, 0, 0, 0, 0, 0, 0,
// vec![2u16; 6]
2, 0, 2, 0, 2, 0, 2, 0, 2, 0, 2, 0,
// vec![6u32; 3]
6, 0, 0, 0, 6, 0, 0, 0, 6, 0, 0, 0,
// 12u64
12, 0, 0, 0, 0, 0, 0, 0]);
}