Add Ws Json rpc client and command line utils

This commit is contained in:
Kristoffer Ström
2016-09-20 12:19:07 +02:00
committed by arkpar
parent 2226324495
commit 4e3f8bab10
14 changed files with 796 additions and 7 deletions

27
rpc_client/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
authors = ["Ethcore <admin@ethcore.io>"]
description = "Parity Rpc Client"
homepage = "http://ethcore.io"
license = "GPL-3.0"
name = "parity-rpc-client"
version = "1.4.0"
[dependencies]
futures = "0.1"
lazy_static = "0.2.1"
matches = "0.1.2"
rand = "0.3.14"
serde = "0.8"
serde_json = "0.8"
tempdir = "0.3.5"
url = "1.2.0"
ws = "0.5.3"
[dependencies.ethcore-rpc]
path = "../rpc"
[dependencies.ethcore-signer]
path = "../signer"
[dependencies.ethcore-util]
path = "../util"

332
rpc_client/src/client.rs Normal file
View File

@@ -0,0 +1,332 @@
use std::fmt::{Debug, Formatter, Error as FmtError};
use std::io::{BufReader, BufRead};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::BTreeMap;
use std::thread;
use std::mem;
use std::time;
use std::path::PathBuf;
use util::Hashable;
use url::Url;
use std::fs::File;
use ws::{connect,
Request,
Handler,
Sender,
Handshake,
Error as WsError,
ErrorKind as WsErrorKind,
Message,
Result as WsResult};
use serde::Serialize;
use serde::Deserialize;
use serde::ser::Serializer;
use serde_json::{from_str,
to_string,
from_value,
Value as JsonValue,
Error as JsonError};
use futures::{BoxFuture, Canceled, Complete, Future, oneshot, done};
/// The actual websocket connection handler, passed into the
/// event loop of ws-rs
struct RpcHandler {
pending: Pending,
// Option is used here as
// temporary storage until
// connection is setup
// and the values are moved into
// the new `Rpc`
complete: Option<Complete<Result<Rpc, RpcError>>>,
auth_code: String,
out: Option<Sender>,
}
impl RpcHandler {
fn new(out: Sender,
auth_code: String,
complete: Complete<Result<Rpc, RpcError>>)
-> Self {
RpcHandler {
out: Some(out),
auth_code: auth_code,
pending: Pending::new(),
complete: Some(complete),
}
}
}
impl Handler for RpcHandler {
fn build_request(&mut self, url: &Url) -> WsResult<Request> {
match Request::from_url(url) {
Ok(mut r) => {
let timestamp = time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let hashed = format!("{}:{}", self.auth_code, timestamp).sha3();
let proto = format!("{:?}_{}", hashed, timestamp);
r.add_protocol(&proto);
Ok(r)
},
Err(e) => Err(WsError::new(WsErrorKind::Internal, format!("{}", e))),
}
}
fn on_error(&mut self, err: WsError) {
match mem::replace(&mut self.complete, None) {
Some(c) => c.complete(Err(RpcError::WsError(err))),
None => println!("warning: unexpected error"),
}
}
fn on_open(&mut self, _: Handshake) -> WsResult<()> {
match mem::replace(&mut self.complete, None) {
Some(c) => c.complete(Ok(Rpc {
out: mem::replace(&mut self.out, None).unwrap(),
auth_code: self.auth_code.clone(),
counter: AtomicUsize::new(0),
pending: self.pending.clone(),
})),
// Should not be reachable
None => (),
}
Ok(())
}
fn on_message(&mut self, msg: Message) -> WsResult<()> {
match parse_response(&msg.to_string()) {
(Some(id), response) => {
match self.pending.remove(id) {
Some(c) => c.complete(response),
None => println!("warning: unexpected id: {}", id),
}
}
(None, response) => println!("warning: error: {:?}, {}", response, msg.to_string()),
}
Ok(())
}
}
/// Keeping track of issued requests to be matched up with responses
#[derive(Clone)]
struct Pending(Arc<Mutex<BTreeMap<usize, Complete<Result<JsonValue, RpcError>>>>>);
impl Pending {
fn new() -> Self {
Pending(Arc::new(Mutex::new(BTreeMap::new())))
}
fn insert(&mut self, k: usize, v: Complete<Result<JsonValue, RpcError>>) {
self.0.lock().unwrap().insert(k, v);
}
fn remove(&mut self, k: usize) -> Option<Complete<Result<JsonValue, RpcError>>> {
self.0.lock().unwrap().remove(&k)
}
}
fn get_authcode(path: &PathBuf) -> Result<String, RpcError> {
match File::open(path) {
Ok(fd) => match BufReader::new(fd).lines().next() {
Some(Ok(code)) => Ok(code),
_ => Err(RpcError::NoAuthCode),
},
Err(_) => Err(RpcError::NoAuthCode)
}
}
/// The handle to the connection
pub struct Rpc {
out: Sender,
counter: AtomicUsize,
pending: Pending,
auth_code: String,
}
impl Rpc {
/// Blocking, returns a new initialized connection or RpcError
pub fn new(url: &str, authpath: &PathBuf) -> Result<Self, RpcError> {
let rpc = try!(Self::connect(url, authpath).map(|rpc| rpc).wait());
rpc
}
/// Non-blocking, returns a future
pub fn connect(url: &str, authpath: &PathBuf)
-> BoxFuture<Result<Self, RpcError>, Canceled> {
let (c, p) = oneshot::<Result<Self, RpcError>>();
match get_authcode(authpath) {
Err(e) => return done(Ok(Err(e))).boxed(),
Ok(code) => {
let url = String::from(url);
thread::spawn(move || {
// mem:replace Option hack to move `c` out
// of the FnMut closure
let mut swap = Some(c);
match connect(url, |out| {
let c = mem::replace(&mut swap, None).unwrap();
RpcHandler::new(out, code.clone(), c)
}) {
Err(err) => {
let c = mem::replace(&mut swap, None).unwrap();
c.complete(Err(RpcError::WsError(err)));
},
// c will complete on the `on_open` event in the Handler
_ => ()
}
});
p.boxed()
}
}
}
/// Non-blocking, returns a future of the request response
pub fn request<T>(&mut self, method: &'static str, params: Vec<JsonValue>)
-> BoxFuture<Result<T, RpcError>, Canceled>
where T: Deserialize + Send + Sized {
let (c, p) = oneshot::<Result<JsonValue, RpcError>>();
let id = self.counter.fetch_add(1, Ordering::Relaxed);
self.pending.insert(id, c);
let serialized = to_string(&RpcRequest::new(id, method, params)).unwrap();
let _ = self.out.send(serialized);
p.map(|result| {
match result {
Ok(json) => {
let t: T = try!(from_value(json));
Ok(t)
},
Err(err) => Err(err)
}
}).boxed()
}
}
struct RpcRequest {
method: &'static str,
params: Vec<JsonValue>,
id: usize,
}
impl RpcRequest {
fn new(id: usize, method: &'static str, params: Vec<JsonValue>) -> Self {
RpcRequest {
method: method,
id: id,
params: params,
}
}
}
impl Serialize for RpcRequest {
fn serialize<S>(&self, s: &mut S)
-> Result<(), S::Error>
where S: Serializer {
let mut state = try!(s.serialize_struct("RpcRequest" , 3));
try!(s.serialize_struct_elt(&mut state ,"jsonrpc", "2.0"));
try!(s.serialize_struct_elt(&mut state ,"id" , &self.id));
try!(s.serialize_struct_elt(&mut state ,"method" , &self.method));
try!(s.serialize_struct_elt(&mut state ,"params" , &self.params));
s.serialize_struct_end(state)
}
}
pub enum RpcError {
WrongVersion(String),
ParseError(JsonError),
MalformedResponse(String),
Remote(String),
WsError(WsError),
Canceled(Canceled),
UnexpectedId,
NoAuthCode,
}
impl Debug for RpcError {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match self {
&RpcError::WrongVersion(ref s)
=> write!(f, "Expected version 2.0, got {}", s),
&RpcError::ParseError(ref err)
=> write!(f, "ParseError: {}", err),
&RpcError::MalformedResponse(ref s)
=> write!(f, "Malformed response: {}", s),
&RpcError::Remote(ref s)
=> write!(f, "Remote error: {}", s),
&RpcError::WsError(ref s)
=> write!(f, "Websocket error: {}", s),
&RpcError::Canceled(ref s)
=> write!(f, "Futures error: {:?}", s),
&RpcError::UnexpectedId
=> write!(f, "Unexpected response id"),
&RpcError::NoAuthCode
=> write!(f, "No authcodes available"),
}
}
}
impl From<JsonError> for RpcError {
fn from(err: JsonError) -> RpcError {
RpcError::ParseError(err)
}
}
impl From<WsError> for RpcError {
fn from(err: WsError) -> RpcError {
RpcError::WsError(err)
}
}
impl From<Canceled> for RpcError {
fn from(err: Canceled) -> RpcError {
RpcError::Canceled(err)
}
}
fn parse_response(s: &str) -> (Option<usize>, Result<JsonValue, RpcError>) {
let mut json: JsonValue = match from_str(s) {
Err(e) => return (None, Err(RpcError::ParseError(e))),
Ok(json) => json,
};
let obj = match json.as_object_mut() {
Some(o) => o,
None => return
(None,
Err(RpcError::MalformedResponse("Not a JSON object".to_string()))),
};
let id;
match obj.get("id") {
Some(&JsonValue::U64(u)) => {
id = u as usize;
},
_ => return (None,
Err(RpcError::MalformedResponse("Missing id".to_string()))),
}
match obj.get("jsonrpc") {
Some(&JsonValue::String(ref s)) => {
if *s != "2.0".to_string() {
return (Some(id),
Err(RpcError::WrongVersion(s.clone())))
}
},
_ => return
(Some(id),
Err(RpcError::MalformedResponse("Not a jsonrpc object".to_string()))),
}
match obj.get("error") {
Some(err) => return
(Some(id),
Err(RpcError::Remote(format!("{}", err)))),
None => (),
};
match obj.remove("result") {
None => (Some(id),
Err(RpcError::MalformedResponse("No result".to_string()))),
Some(result) => (Some(id),
Ok(result)),
}
}

74
rpc_client/src/lib.rs Normal file
View File

@@ -0,0 +1,74 @@
pub mod client;
pub mod signer;
mod mock;
extern crate ws;
extern crate ethcore_signer;
extern crate url;
extern crate futures;
extern crate ethcore_util as util;
extern crate ethcore_rpc as rpc;
extern crate serde;
extern crate serde_json;
extern crate rand;
extern crate tempdir;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate matches;
mod test {
use futures::Future;
use url::Url;
use std::path::PathBuf;
use client::{Rpc, RpcError};
use mock;
#[test]
fn test_connection_refused() {
let (srv, port, tmpdir, _) = mock::serve();
let mut path = PathBuf::from(tmpdir.path());
path.push("authcodes");
let connect = Rpc::connect(&format!("ws://127.0.0.1:{}", port - 1), &path);
connect.map(|conn| {
assert!(matches!(&conn, &Err(RpcError::WsError(_))));
}).wait();
drop(srv);
}
#[test]
fn test_authcode_fail() {
let (srv, port, _, _) = mock::serve();
let path = PathBuf::from("nonexist");
let connect = Rpc::connect(&format!("ws://127.0.0.1:{}", port), &path);
connect.map(|conn| {
assert!(matches!(&conn, &Err(RpcError::NoAuthCode)));
}).wait();
drop(srv);
}
#[test]
fn test_authcode_correct() {
let (srv, port, tmpdir, _) = mock::serve();
let mut path = PathBuf::from(tmpdir.path());
path.push("authcodes");
let connect = Rpc::connect(&format!("ws://127.0.0.1:{}", port), &path);
connect.map(|conn| {
assert!(conn.is_ok())
}).wait();
drop(srv);
}
}

30
rpc_client/src/mock.rs Normal file
View File

@@ -0,0 +1,30 @@
use ethcore_signer::ServerBuilder;
use ethcore_signer::Server;
use rpc::ConfirmationsQueue;
use std::sync::Arc;
use std::time::{Duration};
use std::thread;
use rand;
use tempdir::TempDir;
use std::path::PathBuf;
use std::fs::{File, create_dir_all};
use std::io::Write;
// mock server
pub fn serve() -> (Server, usize, TempDir, Arc<ConfirmationsQueue>) {
let queue = Arc::new(ConfirmationsQueue::default());
let dir = TempDir::new("auth").unwrap();
let mut authpath = PathBuf::from(dir.path());
create_dir_all(&authpath).unwrap();
authpath.push("authcodes");
let mut authfile = File::create(&authpath).unwrap();
authfile.write_all(b"zzzRo0IzGi04mzzz\n").unwrap();
let builder = ServerBuilder::new(queue.clone(), authpath);
let port = 35000 + rand::random::<usize>() % 10000;
let res = builder.start(format!("127.0.0.1:{}", port).parse().unwrap()).unwrap();
thread::sleep(Duration::from_millis(25));
(res, port, dir, queue)
}

47
rpc_client/src/signer.rs Normal file
View File

@@ -0,0 +1,47 @@
use client::{Rpc, RpcError};
use rpc::v1::types::{ConfirmationRequest,
ConfirmationPayload,
TransactionModification,
U256};
use serde_json::{Value as JsonValue, to_value};
use std::path::PathBuf;
use futures::{BoxFuture, Canceled};
pub struct SignerRpc {
rpc: Rpc,
}
impl SignerRpc {
pub fn new(url: &str, authfile: &PathBuf) -> Result<Self, RpcError> {
match Rpc::new(&url, authfile) {
Ok(rpc) => Ok(SignerRpc { rpc: rpc }),
Err(e) => Err(e),
}
}
pub fn requests_to_confirm(&mut self) ->
BoxFuture<Result<Vec<ConfirmationRequest>, RpcError>, Canceled>
{
self.rpc.request::<Vec<ConfirmationRequest>>
("personal_requestsToConfirm", vec![])
}
pub fn confirm_request(&mut self,
id: U256,
new_gas_price: Option<U256>,
pwd: &str) ->
BoxFuture<Result<U256, RpcError>, Canceled>
{
self.rpc.request::<U256>("personal_confirmRequest", vec![
to_value(&format!("{:#x}", id)),
to_value(&TransactionModification { gas_price: new_gas_price }),
to_value(&pwd),
])
}
pub fn reject_request(&mut self, id: U256) ->
BoxFuture<Result<bool, RpcError>, Canceled>
{
self.rpc.request::<bool>("personal_rejectRequest", vec![
JsonValue::String(format!("{:#x}", id))
])
}
}