From 1b507e0147c9cae55595a0f710257da1f6110716 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Tue, 2 Aug 2016 17:02:47 +0300 Subject: [PATCH] Util & ipc clenup (#1807) * removed frombytes stuff * removed jsonrpc handler from nano --- Cargo.lock | 1 - ipc/nano/Cargo.toml | 1 - ipc/nano/src/lib.rs | 174 +----------------------------- util/src/bytes.rs | 251 -------------------------------------------- 4 files changed, 1 insertion(+), 426 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94aabf26b..a346f1ab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,7 +352,6 @@ name = "ethcore-ipc-nano" version = "1.3.0" dependencies = [ "ethcore-ipc 1.3.0", - "jsonrpc-core 2.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", ] diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml index 4096ed443..a0e915767 100644 --- a/ipc/nano/Cargo.toml +++ b/ipc/nano/Cargo.toml @@ -7,7 +7,6 @@ license = "GPL-3.0" [features] [dependencies] -jsonrpc-core = "2.0" ethcore-ipc = { path = "../rpc" } nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } log = "0.3" diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index ee5dd500d..4759217b0 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -19,14 +19,11 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; -extern crate jsonrpc_core; -use jsonrpc_core::IoHandler; pub use ipc::{WithSocket, IpcInterface, IpcConfig}; pub use nanomsg::Socket as NanoSocket; use std::sync::*; -use std::sync::atomic::*; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; use std::ops::Deref; @@ -218,149 +215,14 @@ impl Worker where S: IpcInterface { } } -/// Error in handling JSON RPC request -pub enum IoHandlerError { - BadRequest, - HandlerError, -} - -/// Worker to handle JSON RPC requests -pub struct IoHandlerWorker { - handler: Arc, - socket: Socket, - _endpoint: Endpoint, - poll: Vec, - buf: Vec, -} - -/// IPC server for json-rpc handler (single thread) -pub struct IoHandlerServer { - is_stopping: Arc, - is_stopped: Arc, - handler: Arc, - socket_addr: String, -} - -impl IoHandlerServer { - /// New IPC server for JSON RPC `handler` and ipc socket address `socket_addr` - pub fn new(handler: &Arc, socket_addr: &str) -> IoHandlerServer { - IoHandlerServer { - handler: handler.clone(), - is_stopping: Arc::new(AtomicBool::new(false)), - is_stopped: Arc::new(AtomicBool::new(true)), - socket_addr: socket_addr.to_owned(), - } - } - - /// IPC Server starts (non-blocking, in seprate thread) - pub fn start(&self) -> Result<(), SocketError> { - let mut worker = try!(IoHandlerWorker::new(&self.handler, &self.socket_addr)); - self.is_stopping.store(false, Ordering::Relaxed); - let worker_is_stopping = self.is_stopping.clone(); - let worker_is_stopped = self.is_stopped.clone(); - - ::std::thread::spawn(move || { - worker_is_stopped.store(false, Ordering::Relaxed); - while !worker_is_stopping.load(Ordering::Relaxed) { - worker.poll() - } - worker_is_stopped.store(true, Ordering::Relaxed); - }); - - Ok(()) - } - - /// IPC server stop (func will wait until effective stop) - pub fn stop(&self) { - self.is_stopping.store(true, Ordering::Relaxed); - while !self.is_stopped.load(Ordering::Relaxed) { - std::thread::sleep(std::time::Duration::from_millis(50)); - } - } -} - -impl Drop for IoHandlerServer { - fn drop(&mut self) { - self.stop() - } -} - -impl IoHandlerWorker { - pub fn new(handler: &Arc, socket_addr: &str) -> Result { - let mut socket = try!(Socket::new(Protocol::Rep).map_err(|e| { - warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); - SocketError::RequestLink - })); - - let endpoint = try!(socket.bind(socket_addr).map_err(|e| { - warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); - SocketError::RequestLink - })); - - let poll = vec![socket.new_pollfd(PollInOut::In)]; - - Ok(IoHandlerWorker { - handler: handler.clone(), - socket: socket, - _endpoint: endpoint, - poll: poll, - buf: Vec::with_capacity(1024), - }) - } - - pub fn poll(&mut self) { - let mut request = PollRequest::new(&mut self.poll[..]); - let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); - let fd = request.get_fds()[0]; // guaranteed to exist and be the only one - // because contains only immutable socket field as a member - if !fd.can_read() { - return; - } - - unsafe { self.buf.set_len(0); } - match self.socket.nb_read_to_end(&mut self.buf) { - Ok(0) => { - warn!(target: "ipc", "RPC empty message received"); - return; - }, - Ok(_) => { - let rpc_msg = match String::from_utf8(self.buf.clone()) { - Ok(val) => val, - Err(e) => { - warn!(target: "ipc", "RPC decoding error (utf-8): {:?}", e); - return; - } - }; - let response: Option = self.handler.handle_request(&rpc_msg); - if let Some(response_str) = response { - let response_bytes = response_str.into_bytes(); - if let Err(e) = self.socket.nb_write(&response_bytes) { - warn!(target: "ipc", "Failed to write response: {:?}", e); - } - } - }, - Err(Error::TryAgain) => { - // no data - }, - Err(x) => { - warn!(target: "ipc", "Error polling connections {:?}", x); - panic!("IPC RPC fatal error"); - }, - } - } - -} - #[cfg(test)] mod service_tests { - use super::{Worker, IoHandlerServer}; + use super::Worker; use ipc::*; use std::io::{Read, Write}; use std::sync::{Arc, RwLock}; use nanomsg::{Socket, Protocol, Endpoint}; - use jsonrpc_core; - use jsonrpc_core::{IoHandler, Value, Params, MethodCommand}; struct TestInvoke { method_num: u16, @@ -400,15 +262,6 @@ mod service_tests { (socket, endpoint) } - fn dummy_request(addr: &str, buf: &[u8]) -> Vec { - let mut socket = Socket::new(Protocol::Req).unwrap(); - let _endpoint = socket.connect(addr).unwrap(); - socket.write(buf).unwrap(); - let mut buf = Vec::new(); - socket.read_to_end(&mut buf).unwrap(); - buf - } - #[test] fn can_create_worker() { let worker = Worker::::new(&Arc::new(DummyService::new())); @@ -462,29 +315,4 @@ mod service_tests { assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params); } - - #[test] - fn test_jsonrpc_handler() { - let url = "ipc:///tmp/parity-test50.ipc"; - - struct SayHello; - impl MethodCommand for SayHello { - fn execute(&self, _params: Params) -> Result { - Ok(Value::String("hello".to_string())) - } - } - - let io = Arc::new(IoHandler::new()); - io.add_method("say_hello", SayHello); - - let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#; - - let server = IoHandlerServer::new(&io, url); - server.start().unwrap(); - - assert_eq!(String::from_utf8(dummy_request(url, request.as_bytes())).unwrap(), response.to_string()); - - server.stop(); - } } diff --git a/util/src/bytes.rs b/util/src/bytes.rs index dfd21fd90..ef2156b1a 100644 --- a/util/src/bytes.rs +++ b/util/src/bytes.rs @@ -35,10 +35,7 @@ use std::fmt; use std::slice; use std::ops::{Deref, DerefMut}; -use hash::FixedHash; use elastic_array::*; -use std::mem; -use std::cmp::Ordering; /// Vector like object pub trait VecLike { @@ -230,213 +227,6 @@ impl Populatable for [T] where T: Sized { } } -#[derive(Debug)] -/// Bytes array deserialization error -pub enum FromBytesError { - /// Not enough bytes for the requested type - NotLongEnough, - /// Too many bytes for the requested type - TooLong, - /// Invalid marker for (enums) - UnknownMarker, -} - -/// Value that can be serialized from bytes array -pub trait FromRawBytes: Sized { - /// function that will instantiate and initialize object from slice - fn from_bytes(d: &[u8]) -> Result; -} - -impl FromRawBytes for T where T: FixedHash { - fn from_bytes(bytes: &[u8]) -> Result { - match bytes.len().cmp(&mem::size_of::()) { - Ordering::Less => return Err(FromBytesError::NotLongEnough), - Ordering::Greater => return Err(FromBytesError::TooLong), - Ordering::Equal => () - }; - - let mut res = T::zero(); - res.copy_raw(bytes); - Ok(res) - } -} - -#[macro_export] -macro_rules! sized_binary_map { - ($target_ty: ident) => { - impl FromRawBytes for $target_ty { - fn from_bytes(bytes: &[u8]) -> Result { - match bytes.len().cmp(&::std::mem::size_of::<$target_ty>()) { - ::std::cmp::Ordering::Less => return Err(FromBytesError::NotLongEnough), - ::std::cmp::Ordering::Greater => return Err(FromBytesError::TooLong), - ::std::cmp::Ordering::Equal => () - }; - let mut res: Self = 0; - res.copy_raw(bytes); - Ok(res) - } - } - impl ToBytesWithMap for $target_ty { - fn to_bytes_map(&self) -> Vec { - let sz = ::std::mem::size_of::<$target_ty>(); - let mut res = Vec::::with_capacity(sz); - - let ip: *const $target_ty = self; - let ptr: *const u8 = ip as *const _; - unsafe { - res.set_len(sz); - ::std::ptr::copy(ptr, res.as_mut_ptr(), sz); - } - res - } - } - } -} - -sized_binary_map!(u16); -sized_binary_map!(u32); -sized_binary_map!(u64); - -/// Value that can be serialized from variable-length byte array -pub trait FromRawBytesVariable: Sized { - /// Create value from slice - fn from_bytes_variable(bytes: &[u8]) -> Result; -} - -impl FromRawBytesVariable for T where T: FromRawBytes { - fn from_bytes_variable(bytes: &[u8]) -> Result { - match bytes.len().cmp(&mem::size_of::()) { - Ordering::Less => return Err(FromBytesError::NotLongEnough), - Ordering::Greater => return Err(FromBytesError::TooLong), - Ordering::Equal => () - }; - - T::from_bytes(bytes) - } -} - -impl FromRawBytesVariable for String { - fn from_bytes_variable(bytes: &[u8]) -> Result { - Ok(::std::str::from_utf8(bytes).unwrap().to_owned()) - } -} - -impl FromRawBytesVariable for Vec where T: FromRawBytes { - fn from_bytes_variable(bytes: &[u8]) -> Result { - let size_of_t = mem::size_of::(); - let length_in_chunks = bytes.len() / size_of_t; - - let mut result = Vec::with_capacity(length_in_chunks); - unsafe { result.set_len(length_in_chunks) }; - for i in 0..length_in_chunks { - *result.get_mut(i).unwrap() = try!(T::from_bytes( - &bytes[size_of_t * i..size_of_t * (i+1)])) - } - Ok(result) - } -} - -impl FromRawBytes for (V1, T2) where V1: FromRawBytesVariable, T2: FromRawBytes { - fn from_bytes(bytes: &[u8]) -> Result { - let header = 8usize; - let mut map: (u64, ) = (0,); - - if bytes.len() < header { return Err(FromBytesError::NotLongEnough); } - map.copy_raw(&bytes[0..header]); - - Ok(( - try!(V1::from_bytes_variable(&bytes[header..header + (map.0 as usize)])), - try!(T2::from_bytes(&bytes[header + (map.0 as usize)..bytes.len()])), - )) - } -} - -impl FromRawBytes for (V1, V2, T3) - where V1: FromRawBytesVariable, - V2: FromRawBytesVariable, - T3: FromRawBytes -{ - fn from_bytes(bytes: &[u8]) -> Result { - let header = 16usize; - let mut map: (u64, u64, ) = (0, 0,); - - if bytes.len() < header { return Err(FromBytesError::NotLongEnough); } - map.copy_raw(&bytes[0..header]); - - let map_1 = (header, header + map.0 as usize); - let map_2 = (map_1.1 as usize, map_1.1 as usize + map.1 as usize); - Ok(( - try!(V1::from_bytes_variable(&bytes[map_1.0..map_1.1])), - try!(V2::from_bytes_variable(&bytes[map_2.0..map_2.1])), - try!(T3::from_bytes(&bytes[map_2.1..bytes.len()])), - )) - } -} - -impl<'a, V1, X1, T2> ToBytesWithMap for (X1, &'a T2) where V1: ToBytesWithMap, X1: Deref, T2: ToBytesWithMap { - fn to_bytes_map(&self) -> Vec { - let header = 8usize; - let v1_size = mem::size_of::(); - let mut result = Vec::with_capacity(header + self.0.len() * v1_size + mem::size_of::()); - result.extend(((self.0.len() * v1_size) as u64).to_bytes_map()); - - for i in 0..self.0.len() { - result.extend(self.0[i].to_bytes_map()); - } - result.extend(self.1.to_bytes_map()); - - result - } - -} - -impl<'a, V1, X1, V2, X2, T3> ToBytesWithMap for (X1, X2, &'a T3) - where V1: ToBytesWithMap, X1: Deref, - V2: ToBytesWithMap, X2: Deref, - T3: ToBytesWithMap -{ - fn to_bytes_map(&self) -> Vec { - let header = 16usize; - let v1_size = mem::size_of::(); - let v2_size = mem::size_of::(); - let mut result = Vec::with_capacity( - header + - self.0.len() * v1_size + - self.1.len() * v2_size + - mem::size_of::() - ); - result.extend(((self.0.len() * v1_size) as u64).to_bytes_map()); - result.extend(((self.1.len() * v2_size) as u64).to_bytes_map()); - for i in 0..self.0.len() { - result.extend(self.0[i].to_bytes_map()); - } - for i in 0..self.1.len() { - result.extend(self.1[i].to_bytes_map()); - } - result.extend(self.2.to_bytes_map()); - - result - } -} - -impl FromRawBytesVariable for Vec { - fn from_bytes_variable(bytes: &[u8]) -> Result, FromBytesError> { - Ok(bytes.to_vec()) - } -} - -/// Value that serializes directly to variable-sized byte array and stores map -pub trait ToBytesWithMap { - /// serialize to variable-sized byte array and store map - fn to_bytes_map(&self) -> Vec; -} - -impl ToBytesWithMap for T where T: FixedHash { - fn to_bytes_map(&self) -> Vec { - self.as_slice().to_owned() - } -} - #[test] fn fax_raw() { let mut x = [255u8; 4]; @@ -488,44 +278,3 @@ fn populate_big_types() { h.copy_raw_from(&a); assert_eq!(h, h256_from_hex("ffffffffffffffffffffffffffffffffffffffff000000000000000000000069")); } - -#[test] -fn raw_bytes_from_tuple() { - type Tup = (Vec, u16); - - let tup: (&[u16], u16) = (&[1; 4], 10); - let bytes = vec![ - // map - 8u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, - // four 1u16 - 1u8, 0u8, - 1u8, 0u8, - 1u8, 0u8, - 1u8, 0u8, - // 10u16 - 10u8, 0u8]; - - let (v, x) = Tup::from_bytes(&bytes).unwrap(); - assert_eq!(tup, (&v[..], x)); - let tup_from = (v, x); - - let tup_to = (tup_from.0, &tup_from.1); - let bytes_to = tup_to.to_bytes_map(); - assert_eq!(bytes_to, bytes); -} - -#[test] -fn bytes_map_from_triple() { - let data: (&[u16], &[u32], u64) = (&[2; 6], &[6; 3], 12u64); - let bytes_map = (data.0, data.1, &data.2).to_bytes_map(); - assert_eq!(bytes_map, vec![ - // data map 2 x u64 - 12, 0, 0, 0, 0, 0, 0, 0, - 12, 0, 0, 0, 0, 0, 0, 0, - // vec![2u16; 6] - 2, 0, 2, 0, 2, 0, 2, 0, 2, 0, 2, 0, - // vec![6u32; 3] - 6, 0, 0, 0, 6, 0, 0, 0, 6, 0, 0, 0, - // 12u64 - 12, 0, 0, 0, 0, 0, 0, 0]); -}