2020-09-22 14:53:52 +02:00
|
|
|
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
|
|
|
|
// This file is part of OpenEthereum.
|
2018-06-04 10:19:50 +02:00
|
|
|
|
2020-09-22 14:53:52 +02:00
|
|
|
// OpenEthereum is free software: you can redistribute it and/or modify
|
2018-06-04 10:19:50 +02:00
|
|
|
// it under the terms of the GNU General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
|
2020-09-22 14:53:52 +02:00
|
|
|
// OpenEthereum is distributed in the hope that it will be useful,
|
2018-06-04 10:19:50 +02:00
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU General Public License for more details.
|
|
|
|
|
|
|
|
// You should have received a copy of the GNU General Public License
|
2020-09-22 14:53:52 +02:00
|
|
|
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
|
2018-06-04 10:19:50 +02:00
|
|
|
|
2020-08-05 06:08:03 +02:00
|
|
|
use std::{
|
|
|
|
collections::BTreeMap,
|
|
|
|
fmt::{Debug, Error as FmtError, Formatter},
|
|
|
|
io::{BufRead, BufReader},
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicUsize, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
thread, time,
|
|
|
|
};
|
2016-09-20 12:19:07 +02:00
|
|
|
|
2017-08-31 11:35:41 +02:00
|
|
|
use hash::keccak;
|
2017-09-02 20:09:13 +02:00
|
|
|
use parking_lot::Mutex;
|
2020-08-05 06:08:03 +02:00
|
|
|
use std::{fs::File, path::PathBuf};
|
2016-09-20 12:19:07 +02:00
|
|
|
use url::Url;
|
|
|
|
|
2017-05-24 12:24:07 +02:00
|
|
|
use ws::ws::{
|
2020-08-05 06:08:03 +02:00
|
|
|
self, Error as WsError, ErrorKind as WsErrorKind, Handler, Handshake, Message, Request,
|
|
|
|
Result as WsResult, Sender,
|
2016-11-02 17:14:05 +01:00
|
|
|
};
|
2016-09-20 12:19:07 +02:00
|
|
|
|
2017-07-06 11:36:15 +02:00
|
|
|
use serde::de::DeserializeOwned;
|
2020-08-05 06:08:03 +02:00
|
|
|
use serde_json::{self as json, Error as JsonError, Value as JsonValue};
|
2016-09-20 12:19:07 +02:00
|
|
|
|
2020-08-05 06:08:03 +02:00
|
|
|
use futures::{done, oneshot, Canceled, Complete, Future};
|
2016-09-20 12:19:07 +02:00
|
|
|
|
2020-08-05 06:08:03 +02:00
|
|
|
use jsonrpc_core::{
|
|
|
|
request::MethodCall,
|
|
|
|
response::{Failure, Output, Success},
|
|
|
|
Error as JsonRpcError, Id, Params, Version,
|
|
|
|
};
|
2016-10-17 14:59:41 +02:00
|
|
|
|
2017-11-14 11:38:17 +01:00
|
|
|
use BoxFuture;
|
|
|
|
|
2016-09-20 12:19:07 +02:00
|
|
|
/// The actual websocket connection handler, passed into the
|
|
|
|
/// event loop of ws-rs
|
|
|
|
struct RpcHandler {
|
2020-08-05 06:08:03 +02:00
|
|
|
pending: Pending,
|
|
|
|
// Option is used here as temporary storage until connection
|
|
|
|
// is setup and the values are moved into the new `Rpc`
|
|
|
|
complete: Option<Complete<Result<Rpc, RpcError>>>,
|
|
|
|
auth_code: String,
|
|
|
|
out: Option<Sender>,
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl RpcHandler {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn new(out: Sender, auth_code: String, complete: Complete<Result<Rpc, RpcError>>) -> Self {
|
|
|
|
RpcHandler {
|
|
|
|
out: Some(out),
|
|
|
|
auth_code: auth_code,
|
|
|
|
pending: Pending::new(),
|
|
|
|
complete: Some(complete),
|
|
|
|
}
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Handler for RpcHandler {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn build_request(&mut self, url: &Url) -> WsResult<Request> {
|
|
|
|
match Request::from_url(url) {
|
|
|
|
Ok(mut r) => {
|
|
|
|
let timestamp = time::UNIX_EPOCH
|
|
|
|
.elapsed()
|
|
|
|
.map_err(|err| WsError::new(WsErrorKind::Internal, format!("{}", err)))?;
|
|
|
|
let secs = timestamp.as_secs();
|
|
|
|
let hashed = keccak(format!("{}:{}", self.auth_code, secs));
|
|
|
|
let proto = format!("{:x}_{}", hashed, secs);
|
|
|
|
r.add_protocol(&proto);
|
|
|
|
Ok(r)
|
|
|
|
}
|
|
|
|
Err(e) => Err(WsError::new(WsErrorKind::Internal, format!("{}", e))),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn on_error(&mut self, err: WsError) {
|
|
|
|
match self.complete.take() {
|
|
|
|
Some(c) => match c.send(Err(RpcError::WsError(err))) {
|
|
|
|
Ok(_) => {}
|
|
|
|
Err(_) => warn!(target: "rpc-client", "Unable to notify about error."),
|
|
|
|
},
|
|
|
|
None => warn!(target: "rpc-client", "unexpected error: {}", err),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn on_open(&mut self, _: Handshake) -> WsResult<()> {
|
|
|
|
match (self.complete.take(), self.out.take()) {
|
|
|
|
(Some(c), Some(out)) => {
|
|
|
|
let res = c.send(Ok(Rpc {
|
|
|
|
out: out,
|
|
|
|
counter: AtomicUsize::new(0),
|
|
|
|
pending: self.pending.clone(),
|
|
|
|
}));
|
|
|
|
if let Err(_) = res {
|
|
|
|
warn!(target: "rpc-client", "Unable to open a connection.")
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
let msg = format!("on_open called twice");
|
|
|
|
Err(WsError::new(WsErrorKind::Internal, msg))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn on_message(&mut self, msg: Message) -> WsResult<()> {
|
|
|
|
let ret: Result<JsonValue, JsonRpcError>;
|
|
|
|
let response_id;
|
|
|
|
let string = &msg.to_string();
|
|
|
|
match json::from_str::<Output>(&string) {
|
|
|
|
Ok(Output::Success(Success {
|
|
|
|
result,
|
|
|
|
id: Id::Num(id),
|
|
|
|
..
|
|
|
|
})) => {
|
|
|
|
ret = Ok(result);
|
|
|
|
response_id = id as usize;
|
|
|
|
}
|
|
|
|
Ok(Output::Failure(Failure {
|
|
|
|
error,
|
|
|
|
id: Id::Num(id),
|
|
|
|
..
|
|
|
|
})) => {
|
|
|
|
ret = Err(error);
|
|
|
|
response_id = id as usize;
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
warn!(
|
|
|
|
target: "rpc-client",
|
|
|
|
"recieved invalid message: {}\n {:?}",
|
|
|
|
string,
|
|
|
|
e
|
|
|
|
);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
warn!(
|
|
|
|
target: "rpc-client",
|
|
|
|
"recieved invalid message: {}",
|
|
|
|
string
|
|
|
|
);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match self.pending.remove(response_id) {
|
|
|
|
Some(c) => {
|
|
|
|
if let Err(_) = c.send(ret.map_err(|err| RpcError::JsonRpc(err))) {
|
|
|
|
warn!(target: "rpc-client", "Unable to send response.")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None => warn!(
|
|
|
|
target: "rpc-client",
|
|
|
|
"warning: unexpected id: {}",
|
|
|
|
response_id
|
|
|
|
),
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Keeping track of issued requests to be matched up with responses
|
|
|
|
#[derive(Clone)]
|
2020-08-05 06:08:03 +02:00
|
|
|
struct Pending(Arc<Mutex<BTreeMap<usize, Complete<Result<JsonValue, RpcError>>>>>);
|
2016-09-20 12:19:07 +02:00
|
|
|
|
|
|
|
impl Pending {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn new() -> Self {
|
|
|
|
Pending(Arc::new(Mutex::new(BTreeMap::new())))
|
|
|
|
}
|
|
|
|
fn insert(&mut self, k: usize, v: Complete<Result<JsonValue, RpcError>>) {
|
|
|
|
self.0.lock().insert(k, v);
|
|
|
|
}
|
|
|
|
fn remove(&mut self, k: usize) -> Option<Complete<Result<JsonValue, RpcError>>> {
|
|
|
|
self.0.lock().remove(&k)
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
fn get_authcode(path: &PathBuf) -> Result<String, RpcError> {
|
2020-08-05 06:08:03 +02:00
|
|
|
if let Ok(fd) = File::open(path) {
|
|
|
|
if let Some(Ok(line)) = BufReader::new(fd).lines().next() {
|
|
|
|
let mut parts = line.split(';');
|
|
|
|
let token = parts.next();
|
|
|
|
|
|
|
|
if let Some(code) = token {
|
|
|
|
return Ok(code.into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(RpcError::NoAuthCode)
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// The handle to the connection
|
|
|
|
pub struct Rpc {
|
2020-08-05 06:08:03 +02:00
|
|
|
out: Sender,
|
|
|
|
counter: AtomicUsize,
|
|
|
|
pending: Pending,
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Rpc {
|
2020-08-05 06:08:03 +02:00
|
|
|
/// Blocking, returns a new initialized connection or RpcError
|
|
|
|
pub fn new(url: &str, authpath: &PathBuf) -> Result<Self, RpcError> {
|
|
|
|
let rpc = Self::connect(url, authpath).map(|rpc| rpc).wait()?;
|
|
|
|
rpc
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Non-blocking, returns a future
|
|
|
|
pub fn connect(url: &str, authpath: &PathBuf) -> BoxFuture<Result<Self, RpcError>, Canceled> {
|
|
|
|
let (c, p) = oneshot::<Result<Self, RpcError>>();
|
|
|
|
match get_authcode(authpath) {
|
|
|
|
Err(e) => return Box::new(done(Ok(Err(e)))),
|
|
|
|
Ok(code) => {
|
|
|
|
let url = String::from(url);
|
|
|
|
// The ws::connect takes a FnMut closure, which means c cannot
|
|
|
|
// be moved into it, since it's consumed on complete.
|
|
|
|
// Therefore we wrap it in an option and pick it out once.
|
|
|
|
let mut once = Some(c);
|
|
|
|
thread::spawn(move || {
|
|
|
|
let conn = ws::connect(url, |out| {
|
|
|
|
// this will panic if the closure is called twice,
|
|
|
|
// which it should never be.
|
|
|
|
let c = once.take().expect("connection closure called only once");
|
|
|
|
RpcHandler::new(out, code.clone(), c)
|
|
|
|
});
|
|
|
|
match conn {
|
|
|
|
Err(err) => {
|
|
|
|
// since ws::connect is only called once, it cannot
|
|
|
|
// both fail and succeed.
|
|
|
|
let c = once.take().expect("connection closure called only once");
|
|
|
|
let _ = c.send(Err(RpcError::WsError(err)));
|
|
|
|
}
|
|
|
|
// c will complete on the `on_open` event in the Handler
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
});
|
|
|
|
Box::new(p)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Non-blocking, returns a future of the request response
|
|
|
|
pub fn request<T>(
|
|
|
|
&mut self,
|
|
|
|
method: &'static str,
|
|
|
|
params: Vec<JsonValue>,
|
|
|
|
) -> BoxFuture<Result<T, RpcError>, Canceled>
|
|
|
|
where
|
|
|
|
T: DeserializeOwned + Send + Sized,
|
|
|
|
{
|
|
|
|
let (c, p) = oneshot::<Result<JsonValue, RpcError>>();
|
|
|
|
|
|
|
|
let id = self.counter.fetch_add(1, Ordering::Relaxed);
|
|
|
|
self.pending.insert(id, c);
|
|
|
|
|
|
|
|
let request = MethodCall {
|
|
|
|
jsonrpc: Some(Version::V2),
|
|
|
|
method: method.to_owned(),
|
|
|
|
params: Params::Array(params),
|
|
|
|
id: Id::Num(id as u64),
|
|
|
|
};
|
|
|
|
|
|
|
|
let serialized = json::to_string(&request).expect("request is serializable");
|
|
|
|
let _ = self.out.send(serialized);
|
|
|
|
|
|
|
|
Box::new(p.map(|result| match result {
|
|
|
|
Ok(json) => {
|
|
|
|
let t: T = json::from_value(json)?;
|
|
|
|
Ok(t)
|
|
|
|
}
|
|
|
|
Err(err) => Err(err),
|
|
|
|
}))
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
pub enum RpcError {
|
2020-08-05 06:08:03 +02:00
|
|
|
WrongVersion(String),
|
|
|
|
ParseError(JsonError),
|
|
|
|
MalformedResponse(String),
|
|
|
|
JsonRpc(JsonRpcError),
|
|
|
|
WsError(WsError),
|
|
|
|
Canceled(Canceled),
|
|
|
|
UnexpectedId,
|
|
|
|
NoAuthCode,
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Debug for RpcError {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
|
|
|
|
match *self {
|
|
|
|
RpcError::WrongVersion(ref s) => write!(f, "Expected version 2.0, got {}", s),
|
|
|
|
RpcError::ParseError(ref err) => write!(f, "ParseError: {}", err),
|
|
|
|
RpcError::MalformedResponse(ref s) => write!(f, "Malformed response: {}", s),
|
|
|
|
RpcError::JsonRpc(ref json) => write!(f, "JsonRpc error: {:?}", json),
|
|
|
|
RpcError::WsError(ref s) => write!(f, "Websocket error: {}", s),
|
|
|
|
RpcError::Canceled(ref s) => write!(f, "Futures error: {:?}", s),
|
|
|
|
RpcError::UnexpectedId => write!(f, "Unexpected response id"),
|
|
|
|
RpcError::NoAuthCode => write!(f, "No authcodes available"),
|
|
|
|
}
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl From<JsonError> for RpcError {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn from(err: JsonError) -> RpcError {
|
|
|
|
RpcError::ParseError(err)
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl From<WsError> for RpcError {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn from(err: WsError) -> RpcError {
|
|
|
|
RpcError::WsError(err)
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl From<Canceled> for RpcError {
|
2020-08-05 06:08:03 +02:00
|
|
|
fn from(err: Canceled) -> RpcError {
|
|
|
|
RpcError::Canceled(err)
|
|
|
|
}
|
2016-09-20 12:19:07 +02:00
|
|
|
}
|