// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of OpenEthereum.
// OpenEthereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// OpenEthereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::BTreeMap,
fmt::{Debug, Error as FmtError, Formatter},
io::{BufRead, BufReader},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread, time,
};
use hash::keccak;
use parking_lot::Mutex;
use std::{fs::File, path::PathBuf};
use url::Url;
use ws::ws::{
self, Error as WsError, ErrorKind as WsErrorKind, Handler, Handshake, Message, Request,
Result as WsResult, Sender,
};
use serde::de::DeserializeOwned;
use serde_json::{self as json, Error as JsonError, Value as JsonValue};
use futures::{done, oneshot, Canceled, Complete, Future};
use jsonrpc_core::{
request::MethodCall,
response::{Failure, Output, Success},
Error as JsonRpcError, Id, Params, Version,
};
use BoxFuture;
/// The actual websocket connection handler, passed into the
/// event loop of ws-rs
struct RpcHandler {
pending: Pending,
// Option is used here as temporary storage until connection
// is setup and the values are moved into the new `Rpc`
complete: Option>>,
auth_code: String,
out: Option,
}
impl RpcHandler {
fn new(out: Sender, auth_code: String, complete: Complete>) -> Self {
RpcHandler {
out: Some(out),
auth_code: auth_code,
pending: Pending::new(),
complete: Some(complete),
}
}
}
impl Handler for RpcHandler {
fn build_request(&mut self, url: &Url) -> WsResult {
match Request::from_url(url) {
Ok(mut r) => {
let timestamp = time::UNIX_EPOCH
.elapsed()
.map_err(|err| WsError::new(WsErrorKind::Internal, format!("{}", err)))?;
let secs = timestamp.as_secs();
let hashed = keccak(format!("{}:{}", self.auth_code, secs));
let proto = format!("{:x}_{}", hashed, secs);
r.add_protocol(&proto);
Ok(r)
}
Err(e) => Err(WsError::new(WsErrorKind::Internal, format!("{}", e))),
}
}
fn on_error(&mut self, err: WsError) {
match self.complete.take() {
Some(c) => match c.send(Err(RpcError::WsError(err))) {
Ok(_) => {}
Err(_) => warn!(target: "rpc-client", "Unable to notify about error."),
},
None => warn!(target: "rpc-client", "unexpected error: {}", err),
}
}
/// Called when the websocket connection is opened.
///
/// Moves the stored sender into a fresh `Rpc` (request counter starting at
/// zero, sharing a clone of the handler's pending state) and sends it
/// through the completion handle. A failed send is only logged.
///
/// # Errors
///
/// Returns a `WsErrorKind::Internal` error when the completion handle or
/// the sender has already been taken.
fn on_open(&mut self, _: Handshake) -> WsResult<()> {
    // Both slots are emptied up front, regardless of whether the other one
    // is still populated.
    let notifier = self.complete.take();
    let sender = self.out.take();

    let (notifier, sender) = match (notifier, sender) {
        (Some(notifier), Some(sender)) => (notifier, sender),
        _ => {
            return Err(WsError::new(
                WsErrorKind::Internal,
                String::from("on_open called twice"),
            ));
        }
    };

    let rpc = Rpc {
        out: sender,
        counter: AtomicUsize::new(0),
        pending: self.pending.clone(),
    };
    if notifier.send(Ok(rpc)).is_err() {
        warn!(target: "rpc-client", "Unable to open a connection.");
    }
    Ok(())
}
fn on_message(&mut self, msg: Message) -> WsResult<()> {
let ret: Result;
let response_id;
let string = &msg.to_string();
match json::from_str::