IPC JSON RPC (for external interface) (#1009)

* initial

* rpc file

* compiling nano part

* remove from rpc lib so far

* drop & stop improved

* ok(0)
This commit is contained in:
Nikolay Volf 2016-04-28 17:58:18 +03:00 committed by Gav Wood
parent 0260e9322a
commit a86c39f7fa
3 changed files with 176 additions and 2 deletions

View File

@ -7,6 +7,8 @@ license = "GPL-3.0"
[features]
[dependencies]
jsonrpc-core = "2.0"
"ethcore-ipc" = { path = "../rpc" }
nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
log = "0.3"

View File

@ -19,10 +19,13 @@
extern crate ethcore_ipc as ipc;
extern crate nanomsg;
#[macro_use] extern crate log;
extern crate jsonrpc_core;
use jsonrpc_core::IoHandler;
pub use ipc::{WithSocket, IpcInterface, IpcConfig};
use std::sync::*;
use std::sync::atomic::*;
use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut};
use std::ops::Deref;
@ -199,14 +202,149 @@ impl<S> Worker<S> where S: IpcInterface<S> {
}
}
/// Error in handling JSON RPC request
pub enum IoHandlerError {
BadRequest,
HandlerError,
}
/// Worker to handle JSON RPC requests
pub struct IoHandlerWorker {
handler: Arc<IoHandler>,
socket: Socket,
_endpoint: Endpoint,
poll: Vec<PollFd>,
buf: Vec<u8>,
}
/// IPC server for json-rpc handler (single thread)
pub struct IoHandlerServer {
is_stopping: Arc<AtomicBool>,
is_stopped: Arc<AtomicBool>,
handler: Arc<IoHandler>,
socket_addr: String,
}
impl IoHandlerServer {
/// New IPC server for JSON RPC `handler` and ipc socket address `socket_addr`
pub fn new(handler: &Arc<IoHandler>, socket_addr: &str) -> IoHandlerServer {
IoHandlerServer {
handler: handler.clone(),
is_stopping: Arc::new(AtomicBool::new(false)),
is_stopped: Arc::new(AtomicBool::new(true)),
socket_addr: socket_addr.to_owned(),
}
}
/// IPC Server starts (non-blocking, in seprate thread)
pub fn start(&self) -> Result<(), SocketError> {
let mut worker = try!(IoHandlerWorker::new(&self.handler, &self.socket_addr));
self.is_stopping.store(false, Ordering::Relaxed);
let worker_is_stopping = self.is_stopping.clone();
let worker_is_stopped = self.is_stopped.clone();
::std::thread::spawn(move || {
worker_is_stopped.store(false, Ordering::Relaxed);
while !worker_is_stopping.load(Ordering::Relaxed) {
worker.poll()
}
worker_is_stopped.store(true, Ordering::Relaxed);
});
Ok(())
}
/// IPC server stop (func will wait until effective stop)
pub fn stop(&self) {
self.is_stopping.store(true, Ordering::Relaxed);
while !self.is_stopped.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
}
impl Drop for IoHandlerServer {
fn drop(&mut self) {
self.stop()
}
}
impl IoHandlerWorker {
pub fn new(handler: &Arc<IoHandler>, socket_addr: &str) -> Result<IoHandlerWorker, SocketError> {
let mut socket = try!(Socket::new(Protocol::Rep).map_err(|e| {
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
SocketError::RequestLink
}));
let endpoint = try!(socket.bind(socket_addr).map_err(|e| {
warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e);
SocketError::RequestLink
}));
let poll = vec![socket.new_pollfd(PollInOut::In)];
Ok(IoHandlerWorker {
handler: handler.clone(),
socket: socket,
_endpoint: endpoint,
poll: poll,
buf: Vec::with_capacity(1024),
})
}
pub fn poll(&mut self) {
let mut request = PollRequest::new(&mut self.poll[..]);
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
let fd = request.get_fds()[0]; // guaranteed to exist and be the only one
// because contains only immutable socket field as a member
if !fd.can_read() {
return;
}
unsafe { self.buf.set_len(0); }
match self.socket.nb_read_to_end(&mut self.buf) {
Ok(0) => {
warn!(target: "ipc", "RPC empty message received");
return;
},
Ok(_) => {
let rpc_msg = match String::from_utf8(self.buf.clone()) {
Ok(val) => val,
Err(e) => {
warn!(target: "ipc", "RPC decoding error (utf-8): {:?}", e);
return;
}
};
let response: Option<String> = self.handler.handle_request(&rpc_msg);
if let Some(response_str) = response {
let response_bytes = response_str.into_bytes();
if let Err(e) = self.socket.nb_write(&response_bytes) {
warn!(target: "ipc", "Failed to write response: {:?}", e);
}
}
},
Err(Error::TryAgain) => {
// no data
},
Err(x) => {
warn!(target: "ipc", "Error polling connections {:?}", x);
panic!("IPC RPC fatal error");
},
}
}
}
#[cfg(test)]
mod service_tests {
use super::Worker;
use super::{Worker, IoHandlerServer};
use ipc::*;
use std::io::{Read, Write};
use std::sync::{Arc, RwLock};
use nanomsg::{Socket, Protocol, Endpoint};
use jsonrpc_core;
use jsonrpc_core::{IoHandler, Value, Params, MethodCommand};
struct TestInvoke {
method_num: u16,
@ -246,6 +384,15 @@ mod service_tests {
(socket, endpoint)
}
fn dummy_request(addr: &str, buf: &[u8]) -> Vec<u8> {
let mut socket = Socket::new(Protocol::Req).unwrap();
let _endpoint = socket.connect(addr).unwrap();
socket.write(buf).unwrap();
let mut buf = Vec::new();
socket.read_to_end(&mut buf).unwrap();
buf
}
#[test]
fn can_create_worker() {
let worker = Worker::<DummyService>::new(&Arc::new(DummyService::new()));
@ -299,4 +446,29 @@ mod service_tests {
assert_eq!(0, worker.service.methods_stack.read().unwrap()[0].method_num);
assert_eq!(vec![0u8; 1024*1024-2], worker.service.methods_stack.read().unwrap()[0].params);
}
#[test]
fn test_jsonrpc_handler() {
let url = "ipc:///tmp/parity-test50.ipc";
struct SayHello;
impl MethodCommand for SayHello {
fn execute(&self, _params: Params) -> Result<Value, jsonrpc_core::Error> {
Ok(Value::String("hello".to_string()))
}
}
let io = Arc::new(IoHandler::new());
io.add_method("say_hello", SayHello);
let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
let server = IoHandlerServer::new(&io, url);
server.start().unwrap();
assert_eq!(String::from_utf8(dummy_request(url, request.as_bytes())).unwrap(), response.to_string());
server.stop();
}
}

View File

@ -57,7 +57,7 @@ impl RpcServer {
self.handler.add_delegate(delegate);
}
/// Start server asynchronously and returns result with `Server` handle on success or an error.
/// Start http server asynchronously and returns result with `Server` handle on success or an error.
pub fn start_http(&self, addr: &SocketAddr, cors_domain: Option<String>) -> Result<Server, RpcServerError> {
let cors_domain = cors_domain.to_owned();
Server::start(addr, self.handler.clone(), cors_domain.map(jsonrpc_http_server::AccessControlAllowOrigin::Value))