diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml index e941b1afc..87387f659 100644 --- a/ipc/nano/Cargo.toml +++ b/ipc/nano/Cargo.toml @@ -7,6 +7,8 @@ license = "GPL-3.0" [features] [dependencies] +jsonrpc-core = "2.0" "ethcore-ipc" = { path = "../rpc" } nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } log = "0.3" + diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 624de776b..fdcf1f1f2 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -19,10 +19,13 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; +extern crate jsonrpc_core; +use jsonrpc_core::IoHandler; pub use ipc::{WithSocket, IpcInterface, IpcConfig}; use std::sync::*; +use std::sync::atomic::*; use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut}; use std::ops::Deref; @@ -199,14 +202,149 @@ impl Worker where S: IpcInterface { } } +/// Error in handling JSON RPC request +pub enum IoHandlerError { + BadRequest, + HandlerError, +} + +/// Worker to handle JSON RPC requests +pub struct IoHandlerWorker { + handler: Arc, + socket: Socket, + _endpoint: Endpoint, + poll: Vec, + buf: Vec, +} + +/// IPC server for json-rpc handler (single thread) +pub struct IoHandlerServer { + is_stopping: Arc, + is_stopped: Arc, + handler: Arc, + socket_addr: String, +} + +impl IoHandlerServer { + /// New IPC server for JSON RPC `handler` and ipc socket address `socket_addr` + pub fn new(handler: &Arc, socket_addr: &str) -> IoHandlerServer { + IoHandlerServer { + handler: handler.clone(), + is_stopping: Arc::new(AtomicBool::new(false)), + is_stopped: Arc::new(AtomicBool::new(true)), + socket_addr: socket_addr.to_owned(), + } + } + + /// IPC Server starts (non-blocking, in seprate thread) + pub fn start(&self) -> Result<(), SocketError> { + let mut worker = try!(IoHandlerWorker::new(&self.handler, &self.socket_addr)); + self.is_stopping.store(false, Ordering::Relaxed); + let worker_is_stopping = self.is_stopping.clone(); + let worker_is_stopped = self.is_stopped.clone(); + + ::std::thread::spawn(move || { + worker_is_stopped.store(false, Ordering::Relaxed); + while !worker_is_stopping.load(Ordering::Relaxed) { + worker.poll() + } + worker_is_stopped.store(true, Ordering::Relaxed); + }); + + Ok(()) + } + + /// IPC server stop (func will wait until effective stop) + pub fn stop(&self) { + self.is_stopping.store(true, Ordering::Relaxed); + while !self.is_stopped.load(Ordering::Relaxed) { + std::thread::sleep(std::time::Duration::from_millis(50)); + } + } +} + +impl Drop for IoHandlerServer { + fn drop(&mut self) { + self.stop() + } +} + +impl IoHandlerWorker { + pub fn new(handler: &Arc, socket_addr: &str) -> Result { + let mut socket = try!(Socket::new(Protocol::Rep).map_err(|e| { + warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); + SocketError::RequestLink + })); + + let endpoint = try!(socket.bind(socket_addr).map_err(|e| { + warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); + SocketError::RequestLink + })); + + let poll = vec![socket.new_pollfd(PollInOut::In)]; + + Ok(IoHandlerWorker { + handler: handler.clone(), + socket: socket, + _endpoint: endpoint, + poll: poll, + buf: Vec::with_capacity(1024), + }) + } + + pub fn poll(&mut self) { + let mut request = PollRequest::new(&mut self.poll[..]); + let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); + let fd = request.get_fds()[0]; // guaranteed to exist and be the only one + // because contains only immutable socket field as a member + if !fd.can_read() { + return; + } + + unsafe { self.buf.set_len(0); } + match self.socket.nb_read_to_end(&mut self.buf) { + Ok(0) => { + warn!(target: "ipc", "RPC empty message received"); + return; + }, + Ok(_) => { + let rpc_msg = match String::from_utf8(self.buf.clone()) { + Ok(val) => val, + Err(e) => { + warn!(target: "ipc", "RPC decoding error (utf-8): {:?}", e); + return; + } + }; + let response: Option = self.handler.handle_request(&rpc_msg); + if let Some(response_str) = response { + let response_bytes = response_str.into_bytes(); + if let Err(e) = self.socket.nb_write(&response_bytes) { + warn!(target: "ipc", "Failed to write response: {:?}", e); + } + } + }, + Err(Error::TryAgain) => { + // no data + }, + Err(x) => { + warn!(target: "ipc", "Error polling connections {:?}", x); + panic!("IPC RPC fatal error"); + }, + } + } + +} + #[cfg(test)] mod service_tests { - use super::Worker; + use super::{Worker, IoHandlerServer}; use ipc::*; use std::io::{Read, Write}; use std::sync::{Arc, RwLock}; use nanomsg::{Socket, Protocol, Endpoint}; + use jsonrpc_core; + use jsonrpc_core::{IoHandler, Value, Params, MethodCommand}; struct TestInvoke { method_num: u16, @@ -246,6 +384,15 @@ mod service_tests { (socket, endpoint) } + fn dummy_request(addr: &str, buf: &[u8]) -> Vec { + let mut socket = Socket::new(Protocol::Req).unwrap(); + let _endpoint = socket.connect(addr).unwrap(); + socket.write(buf).unwrap(); + let mut buf = Vec::new(); + socket.read_to_end(&mut buf).unwrap(); + buf + } + #[test] fn can_create_worker() { let worker = Worker::::new(&Arc::new(DummyService::new())); @@ -299,4 +446,29 @@ mod service_tests { assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num); assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params); } + + #[test] + fn test_jsonrpc_handler() { + let url = "ipc:///tmp/parity-test50.ipc"; + + struct SayHello; + impl MethodCommand for SayHello { + fn execute(&self, _params: Params) -> Result { + Ok(Value::String("hello".to_string())) + } + } + + let io = Arc::new(IoHandler::new()); + io.add_method("say_hello", SayHello); + + let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#; + + let server = IoHandlerServer::new(&io, url); + server.start().unwrap(); + + assert_eq!(String::from_utf8(dummy_request(url, request.as_bytes())).unwrap(), response.to_string()); + + server.stop(); + } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index f79cbe828..d9f6af65c 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -57,7 +57,7 @@ impl RpcServer { self.handler.add_delegate(delegate); } - /// Start server asynchronously and returns result with `Server` handle on success or an error. + /// Start http server asynchronously and returns result with `Server` handle on success or an error. pub fn start_http(&self, addr: &SocketAddr, cors_domain: Option) -> Result { let cors_domain = cors_domain.to_owned(); Server::start(addr, self.handler.clone(), cors_domain.map(jsonrpc_http_server::AccessControlAllowOrigin::Value))