Switch to jsonrpc request/response types

This commit is contained in:
Kristoffer Ström 2016-10-17 14:59:41 +02:00 committed by arkpar
parent 91b7780eb5
commit f700aa39fe
3 changed files with 68 additions and 112 deletions

View File

@ -10,6 +10,7 @@ version = "1.4.0"
futures = "0.1" futures = "0.1"
jsonrpc-core = "3.0.2" jsonrpc-core = "3.0.2"
lazy_static = "0.2.1" lazy_static = "0.2.1"
log = "0.3.6"
matches = "0.1.2" matches = "0.1.2"
rand = "0.3.14" rand = "0.3.14"
serde = "0.8" serde = "0.8"
@ -17,6 +18,12 @@ serde_json = "0.8"
tempdir = "0.3.5" tempdir = "0.3.5"
url = "1.2.0" url = "1.2.0"
ws = "0.5.3" ws = "0.5.3"
ethcore-rpc = { path = "../rpc" }
ethcore-signer = { path = "../signer" } [dependencies.ethcore-rpc]
ethcore-util = { path = "../util" } path = "../rpc"
[dependencies.ethcore-signer]
path = "../signer"
[dependencies.ethcore-util]
path = "../util"

View File

@ -1,3 +1,5 @@
extern crate jsonrpc_core;
use std::fmt::{Debug, Formatter, Error as FmtError}; use std::fmt::{Debug, Formatter, Error as FmtError};
use std::io::{BufReader, BufRead}; use std::io::{BufReader, BufRead};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -21,26 +23,23 @@ use ws::{self,
Message, Message,
Result as WsResult}; Result as WsResult};
use serde::Serialize;
use serde::Deserialize; use serde::Deserialize;
use serde::ser::Serializer;
use serde_json::{self as json, use serde_json::{self as json,
Value as JsonValue, Value as JsonValue,
Error as JsonError}; Error as JsonError};
//use jsonrpc_core::
use futures::{BoxFuture, Canceled, Complete, Future, oneshot, done}; use futures::{BoxFuture, Canceled, Complete, Future, oneshot, done};
use jsonrpc_core::{Id, Version, Params, Error as JsonRpcError};
use jsonrpc_core::request::MethodCall;
use jsonrpc_core::response::{SyncOutput, Success, Failure};
/// The actual websocket connection handler, passed into the /// The actual websocket connection handler, passed into the
/// event loop of ws-rs /// event loop of ws-rs
struct RpcHandler { struct RpcHandler {
pending: Pending, pending: Pending,
// Option is used here as // Option is used here as temporary storage until connection
// temporary storage until // is setup and the values are moved into the new `Rpc`
// connection is setup
// and the values are moved into
// the new `Rpc`
complete: Option<Complete<Result<Rpc, RpcError>>>, complete: Option<Complete<Result<Rpc, RpcError>>>,
auth_code: String, auth_code: String,
out: Option<Sender>, out: Option<Sender>,
@ -98,14 +97,33 @@ impl Handler for RpcHandler {
} }
} }
fn on_message(&mut self, msg: Message) -> WsResult<()> { fn on_message(&mut self, msg: Message) -> WsResult<()> {
match parse_response(&msg.to_string()) { let ret: Result<JsonValue, JsonRpcError>;
(Some(id), response) => { let response_id;
match self.pending.remove(id) { let string = &msg.to_string();
Some(c) => c.complete(response), match json::from_str::<SyncOutput>(&string) {
None => println!("warning: unexpected id: {}", id), Ok(SyncOutput::Success(Success { result, id: Id::Num(id), .. })) => {
ret = Ok(result);
response_id = id as usize;
}
Ok(SyncOutput::Failure(Failure { error, id: Id::Num(id), .. })) => {
ret = Err(error);
response_id = id as usize;
}
Err(e) => {
warn!(target: "rpc-client", "recieved invalid message: {}\n {:?}", string, e);
return Ok(())
},
_ => {
warn!(target: "rpc-client", "recieved invalid message: {}", string);
return Ok(())
} }
} }
(None, response) => println!("warning: error: {:?}, {}", response, msg.to_string()),
match self.pending.remove(response_id) {
Some(c) => c.complete(ret.map_err(|err| {
RpcError::JsonRpc(err)
})),
None => warn!(target: "rpc-client", "warning: unexpected id: {}", response_id),
} }
Ok(()) Ok(())
} }
@ -120,10 +138,10 @@ impl Pending {
Pending(Arc::new(Mutex::new(BTreeMap::new()))) Pending(Arc::new(Mutex::new(BTreeMap::new())))
} }
fn insert(&mut self, k: usize, v: Complete<Result<JsonValue, RpcError>>) { fn insert(&mut self, k: usize, v: Complete<Result<JsonValue, RpcError>>) {
self.0.lock().unwrap().insert(k, v); self.0.lock().expect("no panics in mutex guard").insert(k, v);
} }
fn remove(&mut self, k: usize) -> Option<Complete<Result<JsonValue, RpcError>>> { fn remove(&mut self, k: usize) -> Option<Complete<Result<JsonValue, RpcError>>> {
self.0.lock().unwrap().remove(&k) self.0.lock().expect("no panics in mutex guard").remove(&k)
} }
} }
@ -194,7 +212,14 @@ impl Rpc {
let id = self.counter.fetch_add(1, Ordering::Relaxed); let id = self.counter.fetch_add(1, Ordering::Relaxed);
self.pending.insert(id, c); self.pending.insert(id, c);
let serialized = json::to_string(&RpcRequest::new(id, method, params)).unwrap(); let request = MethodCall {
jsonrpc: Version::V2,
method: method.to_owned(),
params: Some(Params::Array(params)),
id: Id::Num(id as u64),
};
let serialized = json::to_string(&request).expect("request is serializable");
let _ = self.out.send(serialized); let _ = self.out.send(serialized);
p.map(|result| { p.map(|result| {
@ -209,41 +234,11 @@ impl Rpc {
} }
} }
struct RpcRequest {
method: &'static str,
params: Vec<JsonValue>,
id: usize,
}
impl RpcRequest {
fn new(id: usize, method: &'static str, params: Vec<JsonValue>) -> Self {
RpcRequest {
method: method,
id: id,
params: params,
}
}
}
impl Serialize for RpcRequest {
fn serialize<S>(&self, s: &mut S)
-> Result<(), S::Error>
where S: Serializer {
let mut state = try!(s.serialize_struct("RpcRequest" , 3));
try!(s.serialize_struct_elt(&mut state ,"jsonrpc", "2.0"));
try!(s.serialize_struct_elt(&mut state ,"id" , &self.id));
try!(s.serialize_struct_elt(&mut state ,"method" , &self.method));
try!(s.serialize_struct_elt(&mut state ,"params" , &self.params));
s.serialize_struct_end(state)
}
}
pub enum RpcError { pub enum RpcError {
WrongVersion(String), WrongVersion(String),
ParseError(JsonError), ParseError(JsonError),
MalformedResponse(String), MalformedResponse(String),
Remote(String), JsonRpc(JsonRpcError),
WsError(WsError), WsError(WsError),
Canceled(Canceled), Canceled(Canceled),
UnexpectedId, UnexpectedId,
@ -252,22 +247,22 @@ pub enum RpcError {
impl Debug for RpcError { impl Debug for RpcError {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match self { match *self {
&RpcError::WrongVersion(ref s) RpcError::WrongVersion(ref s)
=> write!(f, "Expected version 2.0, got {}", s), => write!(f, "Expected version 2.0, got {}", s),
&RpcError::ParseError(ref err) RpcError::ParseError(ref err)
=> write!(f, "ParseError: {}", err), => write!(f, "ParseError: {}", err),
&RpcError::MalformedResponse(ref s) RpcError::MalformedResponse(ref s)
=> write!(f, "Malformed response: {}", s), => write!(f, "Malformed response: {}", s),
&RpcError::Remote(ref s) RpcError::JsonRpc(ref json)
=> write!(f, "Remote error: {}", s), => write!(f, "JsonRpc error: {:?}", json),
&RpcError::WsError(ref s) RpcError::WsError(ref s)
=> write!(f, "Websocket error: {}", s), => write!(f, "Websocket error: {}", s),
&RpcError::Canceled(ref s) RpcError::Canceled(ref s)
=> write!(f, "Futures error: {:?}", s), => write!(f, "Futures error: {:?}", s),
&RpcError::UnexpectedId RpcError::UnexpectedId
=> write!(f, "Unexpected response id"), => write!(f, "Unexpected response id"),
&RpcError::NoAuthCode RpcError::NoAuthCode
=> write!(f, "No authcodes available"), => write!(f, "No authcodes available"),
} }
} }
@ -290,52 +285,3 @@ impl From<Canceled> for RpcError {
RpcError::Canceled(err) RpcError::Canceled(err)
} }
} }
fn parse_response(s: &str) -> (Option<usize>, Result<JsonValue, RpcError>) {
let mut json: JsonValue = match json::from_str(s) {
Err(e) => return (None, Err(RpcError::ParseError(e))),
Ok(json) => json,
};
let obj = match json.as_object_mut() {
Some(o) => o,
None => return
(None,
Err(RpcError::MalformedResponse("Not a JSON object".to_string()))),
};
let id;
match obj.get("id") {
Some(&JsonValue::U64(u)) => {
id = u as usize;
},
_ => return (None,
Err(RpcError::MalformedResponse("Missing id".to_string()))),
}
match obj.get("jsonrpc") {
Some(&JsonValue::String(ref s)) => {
if *s != "2.0".to_string() {
return (Some(id),
Err(RpcError::WrongVersion(s.clone())))
}
},
_ => return
(Some(id),
Err(RpcError::MalformedResponse("Not a jsonrpc object".to_string()))),
}
match obj.get("error") {
Some(err) => return
(Some(id),
Err(RpcError::Remote(format!("{}", err)))),
None => (),
};
match obj.remove("result") {
None => (Some(id),
Err(RpcError::MalformedResponse("No result".to_string()))),
Some(result) => (Some(id),
Ok(result)),
}
}

View File

@ -19,6 +19,9 @@ extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate matches; extern crate matches;
#[macro_use]
extern crate log;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::Future; use futures::Future;