From 248cd5e036a1c0b2cf9b20e880d9c3a7035bee8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 4 Feb 2017 22:18:19 +0100 Subject: [PATCH] RPC middleware: Informant & Client.keep_alive (#4384) * Adding RPC informant structs * RPC informant * Middleware counting RPC requests * Moving client keep_alive to middleware --- Cargo.lock | 44 +-- dapps/src/lib.rs | 11 +- dapps/src/rpc.rs | 12 +- dapps/src/tests/rpc.rs | 6 +- ethcore/src/client/client.rs | 23 +- ethcore/src/client/traits.rs | 4 - parity/blockchain.rs | 2 +- parity/dapps.rs | 4 +- parity/informant.rs | 32 ++- parity/rpc.rs | 10 +- parity/rpc_apis.rs | 31 ++- parity/run.rs | 9 +- parity/signer.rs | 25 +- rpc/Cargo.toml | 1 + rpc/src/lib.rs | 12 +- rpc/src/v1/helpers/informant.rs | 301 +++++++++++++++++++++ rpc/src/v1/helpers/mod.rs | 7 +- rpc/src/v1/impls/eth.rs | 75 ----- rpc/src/v1/impls/eth_filter.rs | 16 -- rpc/src/v1/impls/parity.rs | 70 ----- rpc/src/v1/impls/parity_accounts.rs | 28 +- rpc/src/v1/impls/parity_set.rs | 37 +-- rpc/src/v1/impls/personal.rs | 11 - rpc/src/v1/impls/signer.rs | 14 - rpc/src/v1/impls/signing.rs | 20 +- rpc/src/v1/impls/signing_unsafe.rs | 7 - rpc/src/v1/impls/traces.rs | 14 - rpc/src/v1/mod.rs | 2 +- rpc/src/v1/tests/mocked/parity_accounts.rs | 13 +- signer/src/ws_server/mod.rs | 46 +++- signer/src/ws_server/session.rs | 40 ++- 31 files changed, 520 insertions(+), 407 deletions(-) create mode 100644 rpc/src/v1/helpers/informant.rs diff --git a/Cargo.lock b/Cargo.lock index 5ca4ce1e3..48b95149b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.9.14 (registry+https://github.com/rust-lang/crates.io-index)", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -423,7 +423,7 @@ dependencies = [ "fetch 0.1.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -594,11 +594,12 @@ dependencies = [ "ethsync 1.6.0", "fetch 0.1.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "parity-reactor 0.1.0", "parity-updater 1.6.0", "rlp 0.1.0", @@ -621,7 +622,7 @@ dependencies = [ "ethcore-io 1.6.0", "ethcore-rpc 1.6.0", "ethcore-util 1.6.0", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-ui 1.6.0", @@ -641,7 +642,7 @@ dependencies = [ "ethcore-ipc-nano 1.6.0", "ethcore-util 1.6.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)", "jsonrpc-tcp-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1001,8 +1002,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" -version = "5.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" +version = "5.1.0" +source = "git+https://github.com/ethcore/jsonrpc.git#d179ce34d8da8ea1cd67e93a5b4cb1e30f48c156" dependencies = [ "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1015,11 +1016,10 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "7.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" +source = "git+https://github.com/ethcore/jsonrpc.git#d179ce34d8da8ea1cd67e93a5b4cb1e30f48c156" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1028,11 +1028,11 @@ dependencies = [ [[package]] name = "jsonrpc-ipc-server" version = "1.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" +source = "git+https://github.com/ethcore/jsonrpc.git#d179ce34d8da8ea1cd67e93a5b4cb1e30f48c156" dependencies = [ "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1044,21 +1044,19 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "0.2.0" -source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" +source = "git+https://github.com/ethcore/jsonrpc.git#d179ce34d8da8ea1cd67e93a5b4cb1e30f48c156" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "serde 0.8.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-tcp-server" version = "1.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" +source = "git+https://github.com/ethcore/jsonrpc.git#d179ce34d8da8ea1cd67e93a5b4cb1e30f48c156" dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1477,6 +1475,11 @@ dependencies = [ "user32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "order-stat" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "owning_ref" version = "0.2.2" @@ -1528,7 +1531,7 @@ dependencies = [ "ethcore-signer 1.6.0", "ethcore-util 1.6.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2521,7 +2524,7 @@ dependencies = [ "checksum itertools 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "086e1fa5fe48840b1cfdef3a20c7e3115599f8d5c4c87ef32a794a7cdd184d76" "checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1" "checksum itoa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "91fd9dc2c587067de817fec4ad355e3818c3d893a78cab32a0a474c7a15bb8d5" -"checksum jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" +"checksum jsonrpc-core 5.1.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" "checksum jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" "checksum jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" "checksum jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" @@ -2570,6 +2573,7 @@ dependencies = [ "checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c" "checksum openssl 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "12be61c7eaa23228316ff02c39807e4c1b1af84ba81420f19fd58dade304b25c" "checksum openssl-sys 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d2845e841700e7b04282ceaa115407ea84e0db918ae689ad9ceb6f06fa6046bd" +"checksum order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "efa535d5117d3661134dbf1719b6f0ffe06f2375843b13935db186cd094105eb" "checksum owning_ref 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8d91377085359426407a287ab16884a0111ba473aa6844ff01d4ec20ce3d75e7" "checksum parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "98378dec0a185da2b7180308752f0bad73aaa949c3e0a3b0528d0e067945f7ab" "checksum parity-ui-precompiled 1.4.0 (git+https://github.com/ethcore/js-precompiled.git)" = "" diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 9ae163f88..50dcb39b1 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -70,9 +70,10 @@ use std::sync::{Arc, Mutex}; use std::net::SocketAddr; use std::collections::HashMap; -use ethcore_rpc::Metadata; +use ethcore_rpc::{Metadata}; use fetch::{Fetch, Client as FetchClient}; use hash_fetch::urlhint::ContractClient; +use jsonrpc_core::Middleware; use jsonrpc_core::reactor::RpcHandler; use router::auth::{Authorization, NoAuth, HttpBasicAuth}; use parity_reactor::Remote; @@ -179,7 +180,7 @@ impl ServerBuilder { /// Asynchronously start server with no authentication, /// returns result with `Server` handle on success or an error. - pub fn start_unsecured_http(self, addr: &SocketAddr, handler: RpcHandler) -> Result { + pub fn start_unsecured_http>(self, addr: &SocketAddr, handler: RpcHandler) -> Result { let fetch = self.fetch_client()?; Server::start_http( addr, @@ -199,7 +200,7 @@ impl ServerBuilder { /// Asynchronously start server with `HTTP Basic Authentication`, /// return result with `Server` handle on success or an error. - pub fn start_basic_auth_http(self, addr: &SocketAddr, username: &str, password: &str, handler: RpcHandler) -> Result { + pub fn start_basic_auth_http>(self, addr: &SocketAddr, username: &str, password: &str, handler: RpcHandler) -> Result { let fetch = self.fetch_client()?; Server::start_http( addr, @@ -258,11 +259,11 @@ impl Server { } } - fn start_http( + fn start_http>( addr: &SocketAddr, hosts: Option>, authorization: A, - handler: RpcHandler, + handler: RpcHandler, dapps_path: PathBuf, extra_dapps: Vec, signer_address: Option<(String, u16)>, diff --git a/dapps/src/rpc.rs b/dapps/src/rpc.rs index 1fffec6ee..bf1b1dc93 100644 --- a/dapps/src/rpc.rs +++ b/dapps/src/rpc.rs @@ -18,11 +18,15 @@ use std::sync::{Arc, Mutex}; use hyper; use ethcore_rpc::{Metadata, Origin}; +use jsonrpc_core::Middleware; use jsonrpc_core::reactor::RpcHandler; use jsonrpc_http_server::{Rpc, ServerHandler, PanicHandler, AccessControlAllowOrigin, HttpMetaExtractor}; use endpoint::{Endpoint, EndpointPath, Handler}; -pub fn rpc(handler: RpcHandler, panic_handler: Arc () + Send>>>>) -> Box { +pub fn rpc>( + handler: RpcHandler, + panic_handler: Arc () + Send>>>>, +) -> Box { Box::new(RpcEndpoint { handler: handler, meta_extractor: Arc::new(MetadataExtractor), @@ -33,15 +37,15 @@ pub fn rpc(handler: RpcHandler, panic_handler: Arc, +struct RpcEndpoint> { + handler: RpcHandler, meta_extractor: Arc>, panic_handler: Arc () + Send>>>>, cors_domain: Option>, allowed_hosts: Option>, } -impl Endpoint for RpcEndpoint { +impl> Endpoint for RpcEndpoint { fn to_async_handler(&self, _path: EndpointPath, control: hyper::Control) -> Box { let panic_handler = PanicHandler { handler: self.panic_handler.clone() }; Box::new(ServerHandler::new( diff --git a/dapps/src/tests/rpc.rs b/dapps/src/tests/rpc.rs index 54834e264..7c2486099 100644 --- a/dapps/src/tests/rpc.rs +++ b/dapps/src/tests/rpc.rs @@ -23,7 +23,7 @@ use tests::helpers::{serve_with_rpc, request}; #[test] fn should_serve_rpc() { // given - let mut io = MetaIoHandler::new(); + let mut io = MetaIoHandler::default(); io.add_method("rpc_test", |_| { Ok(Value::String("Hello World!".into())) }); @@ -53,7 +53,7 @@ fn should_serve_rpc() { #[test] fn should_extract_metadata() { // given - let mut io = MetaIoHandler::new(); + let mut io = MetaIoHandler::default(); io.add_method_with_meta("rpc_test", |_params, meta: Metadata| { assert_eq!(meta.dapp_id, Some("https://parity.io/".to_owned())); assert_eq!(meta.origin, Origin::Dapps); @@ -87,7 +87,7 @@ fn should_extract_metadata() { #[test] fn should_extract_metadata_from_custom_header() { // given - let mut io = MetaIoHandler::new(); + let mut io = MetaIoHandler::default(); io.add_method_with_meta("rpc_test", |_params, meta: Metadata| { assert_eq!(meta.dapp_id, Some("https://parity.io/".to_owned())); assert_eq!(meta.origin, Origin::Dapps); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 77189c3fd..d85685f3f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -262,6 +262,18 @@ impl Client { Ok(client) } + /// Wakes up client if it's a sleep. + pub fn keep_alive(&self) { + let should_wake = match *self.mode.lock() { + Mode::Dark(..) | Mode::Passive(..) => true, + _ => false, + }; + if should_wake { + self.wake_up(); + (*self.sleep_state.lock()).last_activity = Some(Instant::now()); + } + } + /// Adds an actor to be notified on certain events pub fn add_notify(&self, target: Arc) { self.notify.write().push(Arc::downgrade(&target)); @@ -1011,17 +1023,6 @@ impl BlockChainClient for Client { Ok(ret) } - fn keep_alive(&self) { - let should_wake = match *self.mode.lock() { - Mode::Dark(..) | Mode::Passive(..) => true, - _ => false, - }; - if should_wake { - self.wake_up(); - (*self.sleep_state.lock()).last_activity = Some(Instant::now()); - } - } - fn mode(&self) -> IpcMode { let r = self.mode.lock().clone().into(); trace!(target: "mode", "Asked for mode = {:?}. returning {:?}", &*self.mode.lock(), r); diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 519c9185a..dfb251296 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -46,10 +46,6 @@ use encoded; /// Blockchain database client. Owns and manages a blockchain and a block queue. pub trait BlockChainClient : Sync + Send { - /// Should be called by any external-facing interface when actively using the client. - /// To minimise chatter, there's no need to call more than once every 30s. - fn keep_alive(&self) {} - /// Get raw block header data by block id. fn block_header(&self, id: BlockId) -> Option; diff --git a/parity/blockchain.rs b/parity/blockchain.rs index c7e9e780f..59cfd0a59 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -242,7 +242,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> { } }; - let informant = Arc::new(Informant::new(client.clone(), None, None, None, cmd.with_color)); + let informant = Arc::new(Informant::new(client.clone(), None, None, None, None, cmd.with_color)); service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?; let do_import = |bytes| { diff --git a/parity/dapps.rs b/parity/dapps.rs index 9f674eadb..572d32b48 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use dir::default_data_path; use ethcore::client::Client; +use ethcore_rpc::informant::RpcStats; use ethsync::SyncProvider; use hash_fetch::fetch::Client as FetchClient; use helpers::replace_home; @@ -64,6 +65,7 @@ pub struct Dependencies { pub remote: Remote, pub fetch: FetchClient, pub signer: Arc, + pub stats: Arc, } pub fn new(configuration: Configuration, deps: Dependencies) -> Result, String> { @@ -174,7 +176,7 @@ mod server { } else { rpc_apis::ApiSet::UnsafeContext }; - let apis = rpc_apis::setup_rpc(Default::default(), deps.apis.clone(), api_set); + let apis = rpc_apis::setup_rpc(deps.stats, deps.apis.clone(), api_set); let handler = RpcHandler::new(Arc::new(apis), deps.remote); let start_result = match auth { None => { diff --git a/parity/informant.rs b/parity/informant.rs index c82c9b2ae..2fa95db9a 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -30,7 +30,8 @@ use ethcore::service::ClientIoMessage; use ethcore::snapshot::service::Service as SnapshotService; use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; use number_prefix::{binary_prefix, Standalone, Prefixed}; -use ethcore_rpc::is_major_importing; +use ethcore_rpc::{is_major_importing}; +use ethcore_rpc::informant::RpcStats; use rlp::View; pub struct Informant { @@ -41,6 +42,7 @@ pub struct Informant { snapshot: Option>, sync: Option>, net: Option>, + rpc_stats: Option>, last_import: Mutex, skipped: AtomicUsize, skipped_txs: AtomicUsize, @@ -63,13 +65,20 @@ pub trait MillisecondDuration { impl MillisecondDuration for Duration { fn as_milliseconds(&self) -> u64 { - self.as_secs() * 1000 + self.subsec_nanos() as u64 / 1000000 + self.as_secs() * 1000 + self.subsec_nanos() as u64 / 1_000_000 } } impl Informant { /// Make a new instance potentially `with_color` output. - pub fn new(client: Arc, sync: Option>, net: Option>, snapshot: Option>, with_color: bool) -> Self { + pub fn new( + client: Arc, + sync: Option>, + net: Option>, + snapshot: Option>, + rpc_stats: Option>, + with_color: bool, + ) -> Self { Informant { report: RwLock::new(None), last_tick: RwLock::new(Instant::now()), @@ -78,6 +87,7 @@ impl Informant { snapshot: snapshot, sync: sync, net: net, + rpc_stats: rpc_stats, last_import: Mutex::new(Instant::now()), skipped: AtomicUsize::new(0), skipped_txs: AtomicUsize::new(0), @@ -102,6 +112,7 @@ impl Informant { let cache_info = self.client.blockchain_cache_info(); let network_config = self.net.as_ref().map(|n| n.network_config()); let sync_status = self.sync.as_ref().map(|s| s.status()); + let rpc_stats = self.rpc_stats.as_ref(); let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info()); let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s| @@ -126,10 +137,10 @@ impl Informant { false => t, }; - info!(target: "import", "{} {} {}", + info!(target: "import", "{} {} {} {}", match importing { true => match snapshot_sync { - false => format!("Syncing {} {} {} {}+{} Qed", + false => format!("Syncing {} {} {} {}+{} Qed", paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))), paint(White.bold(), format!("{}", chain_info.best_block_hash)), { @@ -170,7 +181,16 @@ impl Informant { Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))), _ => String::new(), } - ) + ), + match rpc_stats { + Some(ref rpc_stats) => format!( + "RPC: {} conn, {} req/s, {} µs", + paint(Blue.bold(), format!("{:2}", rpc_stats.sessions())), + paint(Blue.bold(), format!("{:2}", rpc_stats.requests_rate())), + paint(Blue.bold(), format!("{:3}", rpc_stats.approximated_roundtrip())), + ), + _ => String::new(), + }, ); *write_report = Some(report); diff --git a/parity/rpc.rs b/parity/rpc.rs index 53d4a4293..fd077f1ff 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -22,6 +22,7 @@ use io::PanicHandler; use dir::default_data_path; use ethcore_rpc::{self as rpc, RpcServerError, IpcServerError, Metadata}; +use ethcore_rpc::informant::{RpcStats, Middleware}; use helpers::parity_ipc_path; use jsonrpc_core::MetaIoHandler; use jsonrpc_core::reactor::{RpcHandler, Remote}; @@ -85,6 +86,7 @@ pub struct Dependencies { pub panic_handler: Arc, pub apis: Arc, pub remote: Remote, + pub stats: Arc, } pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result, String> { @@ -97,8 +99,8 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result MetaIoHandler { - rpc_apis::setup_rpc(MetaIoHandler::default(), deps.apis.clone(), apis) +fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler { + rpc_apis::setup_rpc(deps.stats.clone(), deps.apis.clone(), apis) } pub fn setup_http_rpc_server( @@ -122,12 +124,12 @@ pub fn setup_http_rpc_server( } } -pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Result>, String> { +pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Result>, String> { if !conf.enabled { return Ok(None); } Ok(Some(setup_ipc_rpc_server(deps, &conf.socket_addr, conf.apis)?)) } -pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: ApiSet) -> Result, String> { +pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: ApiSet) -> Result, String> { let apis = setup_apis(apis, dependencies); let handler = RpcHandler::new(Arc::new(apis), dependencies.remote.clone()); match rpc::start_ipc(addr, handler) { diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index b77bb3899..4c5e88406 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -14,22 +14,25 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::cmp::PartialEq; use std::collections::BTreeMap; use std::collections::HashSet; -use std::cmp::PartialEq; use std::str::FromStr; use std::sync::Arc; -use util::RotatingLogger; -use jsonrpc_core::{MetaIoHandler}; -use ethcore::miner::{Miner, ExternalMiner}; -use ethcore::client::Client; -use ethcore::account_provider::AccountProvider; -use ethcore::snapshot::SnapshotService; -use ethsync::{ManageNetwork, SyncProvider}; -use ethcore_rpc::{Metadata, NetworkSettings}; + pub use ethcore_rpc::SignerService; -use updater::Updater; + +use ethcore::account_provider::AccountProvider; +use ethcore::client::Client; +use ethcore::miner::{Miner, ExternalMiner}; +use ethcore::snapshot::SnapshotService; +use ethcore_rpc::{Metadata, NetworkSettings}; +use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier}; +use ethsync::{ManageNetwork, SyncProvider}; use hash_fetch::fetch::Client as FetchClient; +use jsonrpc_core::{MetaIoHandler}; +use updater::Updater; +use util::RotatingLogger; #[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum Api { @@ -182,9 +185,13 @@ macro_rules! add_signing_methods { } } -pub fn setup_rpc(mut handler: MetaIoHandler, deps: Arc, apis: ApiSet) -> MetaIoHandler { +pub fn setup_rpc(stats: Arc, deps: Arc, apis: ApiSet) -> MetaIoHandler { use ethcore_rpc::v1::*; + let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, ClientNotifier { + client: deps.client.clone(), + })); + // it's turned into vector, cause ont of the cases requires &[] let apis = apis.list_apis().into_iter().collect::>(); for api in &apis { @@ -244,7 +251,7 @@ pub fn setup_rpc(mut handler: MetaIoHandler, deps: Arc, add_signing_methods!(ParitySigning, handler, deps); }, Api::ParityAccounts => { - handler.extend_with(ParityAccountsClient::new(&deps.secret_store, &deps.client).to_delegate()); + handler.extend_with(ParityAccountsClient::new(&deps.secret_store).to_delegate()); }, Api::ParitySet => { handler.extend_with(ParitySetClient::new( diff --git a/parity/run.rs b/parity/run.rs index d16430a5e..9247b254a 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::net::{TcpListener}; use ctrlc::CtrlC; use fdlimit::raise_fd_limit; -use ethcore_rpc::{NetworkSettings, is_major_importing}; +use ethcore_rpc::{NetworkSettings, informant, is_major_importing}; use ethsync::NetworkConfiguration; use util::{Colour, version, RotatingLogger, Mutex, Condvar}; use io::{MayPanic, ForwardPanic, PanicHandler}; @@ -358,6 +358,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R service.add_notify(updater.clone()); // set up dependencies for rpc servers + let rpc_stats = Arc::new(informant::RpcStats::default()); let signer_path = cmd.signer_conf.signer_path.clone(); let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { signer_service: Arc::new(rpc_apis::SignerService::new(move || { @@ -390,6 +391,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R panic_handler: panic_handler.clone(), apis: deps_for_rpc_apis.clone(), remote: event_loop.raw_remote(), + stats: rpc_stats.clone(), }; // start rpc servers @@ -405,6 +407,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R remote: event_loop.raw_remote(), fetch: fetch.clone(), signer: deps_for_rpc_apis.signer_service.clone(), + stats: rpc_stats.clone(), }; let dapps_server = dapps::new(cmd.dapps_conf.clone(), dapps_deps)?; @@ -413,6 +416,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R panic_handler: panic_handler.clone(), apis: deps_for_rpc_apis.clone(), remote: event_loop.raw_remote(), + rpc_stats: rpc_stats.clone(), }; let signer_server = signer::start(cmd.signer_conf.clone(), signer_deps)?; @@ -422,7 +426,8 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R Some(sync_provider.clone()), Some(manage_network.clone()), Some(snapshot_service.clone()), - cmd.logger_config.color + Some(rpc_stats.clone()), + cmd.logger_config.color, )); service.add_notify(informant.clone()); service.register_io_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?; diff --git a/parity/signer.rs b/parity/signer.rs index ded4e68b6..9b22968dc 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -15,18 +15,21 @@ // along with Parity. If not, see . use std::io; -use std::sync::Arc; use std::path::PathBuf; -use ansi_term::Colour; -use io::{ForwardPanic, PanicHandler}; -use util::path::restrict_permissions_owner; -use rpc_apis; -use ethcore_signer as signer; -use dir::default_data_path; -use helpers::replace_home; -use jsonrpc_core::reactor::{RpcHandler, Remote}; +use std::sync::Arc; + pub use ethcore_signer::Server as SignerServer; +use ansi_term::Colour; +use dir::default_data_path; +use ethcore_rpc::informant::RpcStats; +use ethcore_signer as signer; +use helpers::replace_home; +use io::{ForwardPanic, PanicHandler}; +use jsonrpc_core::reactor::{RpcHandler, Remote}; +use rpc_apis; +use util::path::restrict_permissions_owner; + const CODES_FILENAME: &'static str = "authcodes"; #[derive(Debug, PartialEq, Clone)] @@ -55,6 +58,7 @@ pub struct Dependencies { pub panic_handler: Arc, pub apis: Arc, pub remote: Remote, + pub rpc_stats: Arc, } pub struct NewToken { @@ -126,7 +130,8 @@ fn do_start(conf: Configuration, deps: Dependencies) -> Result( +pub fn start_http>( addr: &SocketAddr, cors_domains: Option>, allowed_hosts: Option>, panic_handler: Arc, - handler: RpcHandler, + handler: RpcHandler, ) -> Result { let cors_domains = cors_domains.map(|domains| { @@ -95,7 +96,10 @@ pub fn start_http( } /// Start ipc server asynchronously and returns result with `Server` handle on success or an error. -pub fn start_ipc(addr: &str, handler: RpcHandler) -> Result, ipc::Error> { +pub fn start_ipc>( + addr: &str, + handler: RpcHandler, +) -> Result, ipc::Error> { let server = ipc::Server::with_rpc_handler(addr, handler)?; server.run_async()?; Ok(server) diff --git a/rpc/src/v1/helpers/informant.rs b/rpc/src/v1/helpers/informant.rs new file mode 100644 index 000000000..c1bbbe6c2 --- /dev/null +++ b/rpc/src/v1/helpers/informant.rs @@ -0,0 +1,301 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! RPC Requests Statistics + +use std::fmt; +use std::sync::Arc; +use std::sync::atomic::{self, AtomicUsize}; +use std::time; +use futures::Future; +use jsonrpc_core as rpc; +use order_stat; +use util::RwLock; + +const RATE_SECONDS: usize = 10; +const STATS_SAMPLES: usize = 60; + +struct RateCalculator { + era: time::Instant, + samples: [u16; RATE_SECONDS], +} + +impl fmt::Debug for RateCalculator { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{} req/s", self.rate()) + } +} + +impl Default for RateCalculator { + fn default() -> Self { + RateCalculator { + era: time::Instant::now(), + samples: [0; RATE_SECONDS], + } + } +} + +impl RateCalculator { + fn elapsed(&self) -> u64 { + self.era.elapsed().as_secs() + } + + pub fn tick(&mut self) -> u16 { + if self.elapsed() >= RATE_SECONDS as u64 { + self.era = time::Instant::now(); + self.samples[0] = 0; + } + + let pos = self.elapsed() as usize % RATE_SECONDS; + let next = (pos + 1) % RATE_SECONDS; + self.samples[next] = 0; + self.samples[pos] = self.samples[pos].saturating_add(1); + self.samples[pos] + } + + fn current_rate(&self) -> usize { + let now = match self.elapsed() { + i if i >= RATE_SECONDS as u64 => RATE_SECONDS, + i => i as usize + 1, + }; + let sum: usize = self.samples[0..now].iter().map(|x| *x as usize).sum(); + sum / now + } + + pub fn rate(&self) -> usize { + if self.elapsed() > RATE_SECONDS as u64 { + 0 + } else { + self.current_rate() + } + } +} + +struct StatsCalculator { + filled: bool, + idx: usize, + samples: [T; STATS_SAMPLES], +} + +impl Default for StatsCalculator { + fn default() -> Self { + StatsCalculator { + filled: false, + idx: 0, + samples: [T::default(); STATS_SAMPLES], + } + } +} + +impl fmt::Debug for StatsCalculator { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "median: {} ms", self.approximated_median()) + } +} + +impl StatsCalculator { + pub fn add(&mut self, sample: T) { + self.idx += 1; + if self.idx >= STATS_SAMPLES { + self.filled = true; + self.idx = 0; + } + + self.samples[self.idx] = sample; + } + + /// Returns aproximate of media + pub fn approximated_median(&self) -> T { + let mut copy = [T::default(); STATS_SAMPLES]; + copy.copy_from_slice(&self.samples); + let bound = if self.filled { STATS_SAMPLES } else { self.idx + 1 }; + + let (_, &mut median) = order_stat::median_of_medians(&mut copy[0..bound]); + median + } +} + +/// RPC Statistics +#[derive(Default, Debug)] +pub struct RpcStats { + requests: RwLock, + roundtrips: RwLock>, + active_sessions: AtomicUsize, +} + +impl RpcStats { + /// Count session opened + pub fn open_session(&self) { + self.active_sessions.fetch_add(1, atomic::Ordering::SeqCst); + } + + /// Count session closed. + /// Silently overflows if closing unopened session. + pub fn close_session(&self) { + self.active_sessions.fetch_sub(1, atomic::Ordering::SeqCst); + } + + /// Count request. Returns number of requests in current second. + pub fn count_request(&self) -> u16 { + self.requests.write().tick() + } + + /// Add roundtrip time (microseconds) + pub fn add_roundtrip(&self, microseconds: u32) { + self.roundtrips.write().add(microseconds) + } + + /// Returns number of open sessions + pub fn sessions(&self) -> usize { + self.active_sessions.load(atomic::Ordering::Relaxed) + } + + /// Returns requests rate + pub fn requests_rate(&self) -> usize { + self.requests.read().rate() + } + + /// Returns approximated roundtrip in microseconds + pub fn approximated_roundtrip(&self) -> u32 { + self.roundtrips.read().approximated_median() + } +} + +/// Notifies about RPC activity. +pub trait ActivityNotifier: Send + Sync + 'static { + /// Activity on RPC interface + fn active(&self); +} + +/// Stats-counting RPC middleware +pub struct Middleware { + stats: Arc, + notifier: T, +} + +impl Middleware { + /// Create new Middleware with stats counter and activity notifier. + pub fn new(stats: Arc, notifier: T) -> Self { + Middleware { + stats: stats, + notifier: notifier, + } + } + + fn as_micro(dur: time::Duration) -> u32 { + (dur.as_secs() * 1_000_000) as u32 + dur.subsec_nanos() / 1_000 + } +} + +impl rpc::Middleware for Middleware { + fn on_request(&self, request: rpc::Request, meta: M, process: F) -> rpc::FutureResponse where + F: FnOnce(rpc::Request, M) -> rpc::FutureResponse, + { + let start = time::Instant::now(); + let response = process(request, meta); + + self.notifier.active(); + let stats = self.stats.clone(); + stats.count_request(); + response.map(move |res| { + stats.add_roundtrip(Self::as_micro(start.elapsed())); + res + }).boxed() + } +} + +/// Client Notifier +pub struct ClientNotifier { + /// Client + pub client: Arc<::ethcore::client::Client>, +} + +impl ActivityNotifier for ClientNotifier { + fn active(&self) { + self.client.keep_alive() + } +} + +#[cfg(test)] +mod tests { + + use super::{RateCalculator, StatsCalculator, RpcStats}; + + #[test] + fn should_calculate_rate() { + // given + let mut avg = RateCalculator::default(); + + // when + avg.tick(); + avg.tick(); + avg.tick(); + let rate = avg.rate(); + + // then + assert_eq!(rate, 3usize); + } + + #[test] + fn should_approximate_median() { + // given + let mut stats = StatsCalculator::default(); + stats.add(5); + stats.add(100); + stats.add(3); + stats.add(15); + stats.add(20); + stats.add(6); + + // when + let median = stats.approximated_median(); + + // then + assert_eq!(median, 5); + } + + #[test] + fn should_count_rpc_stats() { + // given + let stats = RpcStats::default(); + assert_eq!(stats.sessions(), 0); + assert_eq!(stats.requests_rate(), 0); + assert_eq!(stats.approximated_roundtrip(), 0); + + // when + stats.open_session(); + stats.close_session(); + stats.open_session(); + stats.count_request(); + stats.count_request(); + stats.add_roundtrip(125); + + // then + assert_eq!(stats.sessions(), 1); + assert_eq!(stats.requests_rate(), 2); + assert_eq!(stats.approximated_roundtrip(), 125); + } + + #[test] + fn should_be_sync_and_send() { + let stats = RpcStats::default(); + is_sync(stats); + } + + fn is_sync(x: F) { + drop(x) + } +} diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index da24f0a5b..04953ae31 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -17,16 +17,18 @@ #[macro_use] pub mod errors; -pub mod dispatch; pub mod block_import; +pub mod dispatch; +pub mod informant; +mod network_settings; mod poll_manager; mod poll_filter; mod requests; mod signer; mod signing_queue; -mod network_settings; +pub use self::network_settings::NetworkSettings; pub use self::poll_manager::PollManager; pub use self::poll_filter::{PollFilter, limit_logs}; pub use self::requests::{ @@ -36,4 +38,3 @@ pub use self::signing_queue::{ ConfirmationsQueue, ConfirmationPromise, ConfirmationResult, SigningQueue, QueueEvent, DefaultAccount, }; pub use self::signer::SignerService; -pub use self::network_settings::NetworkSettings; diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 5270e417d..1784cda18 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -258,20 +258,6 @@ fn check_known(client: &C, number: BlockNumber) -> Result<(), Error> where C: const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6. -impl EthClient where - C: MiningBlockChainClient + 'static, - SN: SnapshotService + 'static, - S: SyncProvider + 'static, - M: MinerService + 'static, - EM: ExternalMinerService + 'static { - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } -} - #[cfg(windows)] static SOLC: &'static str = "solc.exe"; @@ -288,8 +274,6 @@ impl Eth for EthClient where type Metadata = Metadata; fn protocol_version(&self) -> Result { - self.active()?; - let version = take_weak!(self.sync).status().protocol_version.to_owned(); Ok(format!("{}", version)) } @@ -297,7 +281,6 @@ impl Eth for EthClient where fn syncing(&self) -> Result { use ethcore::snapshot::RestorationStatus; - self.active()?; let status = take_weak!(self.sync).status(); let client = take_weak!(self.client); let snapshot_status = take_weak!(self.snapshot).status(); @@ -331,8 +314,6 @@ impl Eth for EthClient where let dapp = meta.dapp_id.unwrap_or_default(); let author = move || { - self.active()?; - let mut miner = take_weak!(self.miner).author(); if miner == 0.into() { let accounts = self.dapp_accounts(dapp.into())?; @@ -348,20 +329,14 @@ impl Eth for EthClient where } fn is_mining(&self) -> Result { - self.active()?; - Ok(take_weak!(self.miner).is_sealing()) } fn hashrate(&self) -> Result { - self.active()?; - Ok(RpcU256::from(self.external_miner.hashrate())) } fn gas_price(&self) -> Result { - self.active()?; - let (client, miner) = (take_weak!(self.client), take_weak!(self.miner)); Ok(RpcU256::from(default_gas_price(&*client, &*miner))) } @@ -370,8 +345,6 @@ impl Eth for EthClient where let dapp = meta.dapp_id.unwrap_or_default(); let accounts = move || { - self.active()?; - let accounts = self.dapp_accounts(dapp.into())?; Ok(accounts.into_iter().map(Into::into).collect()) }; @@ -380,14 +353,10 @@ impl Eth for EthClient where } fn block_number(&self) -> Result { - self.active()?; - Ok(RpcU256::from(take_weak!(self.client).chain_info().best_block_number)) } fn balance(&self, address: RpcH160, num: Trailing) -> Result { - self.active()?; - let address = address.into(); match num.0 { BlockNumber::Pending => Ok(take_weak!(self.miner).balance(&*take_weak!(self.client), &address).into()), @@ -404,7 +373,6 @@ impl Eth for EthClient where } fn storage_at(&self, address: RpcH160, pos: RpcU256, num: Trailing) -> Result { - self.active()?; let address: Address = RpcH160::into(address); let position: U256 = RpcU256::into(pos); match num.0 { @@ -422,8 +390,6 @@ impl Eth for EthClient where } fn transaction_count(&self, address: RpcH160, num: Trailing) -> Result { - self.active()?; - let address: Address = RpcH160::into(address); match num.0 { BlockNumber::Pending => Ok(take_weak!(self.miner).nonce(&*take_weak!(self.client), &address).into()), @@ -440,7 +406,6 @@ impl Eth for EthClient where } fn block_transaction_count_by_hash(&self, hash: RpcH256) -> Result, Error> { - self.active()?; Ok( take_weak!(self.client).block(BlockId::Hash(hash.into())) .map(|block| block.transactions_count().into()) @@ -448,8 +413,6 @@ impl Eth for EthClient where } fn block_transaction_count_by_number(&self, num: BlockNumber) -> Result, Error> { - self.active()?; - match num { BlockNumber::Pending => Ok(Some( take_weak!(self.miner).status().transactions_in_pending_block.into() @@ -462,8 +425,6 @@ impl Eth for EthClient where } fn block_uncles_count_by_hash(&self, hash: RpcH256) -> Result, Error> { - self.active()?; - Ok( take_weak!(self.client).block(BlockId::Hash(hash.into())) .map(|block| block.uncles_count().into()) @@ -471,8 +432,6 @@ impl Eth for EthClient where } fn block_uncles_count_by_number(&self, num: BlockNumber) -> Result, Error> { - self.active()?; - match num { BlockNumber::Pending => Ok(Some(0.into())), _ => Ok( @@ -483,8 +442,6 @@ impl Eth for EthClient where } fn code_at(&self, address: RpcH160, num: Trailing) -> Result { - self.active()?; - let address: Address = RpcH160::into(address); match num.0 { BlockNumber::Pending => Ok(take_weak!(self.miner).code(&*take_weak!(self.client), &address).map_or_else(Bytes::default, Bytes::new)), @@ -501,19 +458,14 @@ impl Eth for EthClient where } fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> Result, Error> { - self.active()?; - self.block(BlockId::Hash(hash.into()), include_txs) } fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> Result, Error> { - self.active()?; - self.block(num.into(), include_txs) } fn transaction_by_hash(&self, hash: RpcH256) -> Result, Error> { - self.active()?; let hash: H256 = hash.into(); let miner = take_weak!(self.miner); let client = take_weak!(self.client); @@ -521,20 +473,14 @@ impl Eth for EthClient where } fn transaction_by_block_hash_and_index(&self, hash: RpcH256, index: Index) -> Result, Error> { - self.active()?; - self.transaction(TransactionId::Location(BlockId::Hash(hash.into()), index.value())) } fn transaction_by_block_number_and_index(&self, num: BlockNumber, index: Index) -> Result, Error> { - self.active()?; - self.transaction(TransactionId::Location(num.into(), index.value())) } fn transaction_receipt(&self, hash: RpcH256) -> Result, Error> { - self.active()?; - let miner = take_weak!(self.miner); let best_block = take_weak!(self.client).chain_info().best_block_number; let hash: H256 = hash.into(); @@ -549,20 +495,14 @@ impl Eth for EthClient where } fn uncle_by_block_hash_and_index(&self, hash: RpcH256, index: Index) -> Result, Error> { - self.active()?; - self.uncle(UncleId { block: BlockId::Hash(hash.into()), position: index.value() }) } fn uncle_by_block_number_and_index(&self, num: BlockNumber, index: Index) -> Result, Error> { - self.active()?; - self.uncle(UncleId { block: num.into(), position: index.value() }) } fn compilers(&self) -> Result, Error> { - self.active()?; - let mut compilers = vec![]; if Command::new(SOLC).output().is_ok() { compilers.push("solidity".to_owned()) @@ -591,7 +531,6 @@ impl Eth for EthClient where } fn work(&self, no_new_work_timeout: Trailing) -> Result { - self.active()?; let no_new_work_timeout = no_new_work_timeout.0; let client = take_weak!(self.client); @@ -643,8 +582,6 @@ impl Eth for EthClient where } fn submit_work(&self, nonce: RpcH64, pow_hash: RpcH256, mix_hash: RpcH256) -> Result { - self.active()?; - let nonce: H64 = nonce.into(); let pow_hash: H256 = pow_hash.into(); let mix_hash: H256 = mix_hash.into(); @@ -657,14 +594,11 @@ impl Eth for EthClient where } fn submit_hashrate(&self, rate: RpcU256, id: RpcH256) -> Result { - self.active()?; self.external_miner.submit_hashrate(rate.into(), id.into()); Ok(true) } fn send_raw_transaction(&self, raw: Bytes) -> Result { - self.active()?; - UntrustedRlp::new(&raw.into_vec()).as_val() .map_err(errors::from_rlp_error) .and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error)) @@ -679,8 +613,6 @@ impl Eth for EthClient where } fn call(&self, request: CallRequest, num: Trailing) -> Result { - self.active()?; - let request = CallRequest::into(request); let signed = self.sign_call(request)?; @@ -695,8 +627,6 @@ impl Eth for EthClient where } fn estimate_gas(&self, request: CallRequest, num: Trailing) -> Result { - self.active()?; - let request = CallRequest::into(request); let signed = self.sign_call(request)?; take_weak!(self.client).estimate_gas(&signed, num.0.into()) @@ -705,19 +635,14 @@ impl Eth for EthClient where } fn compile_lll(&self, _: String) -> Result { - self.active()?; - rpc_unimplemented!() } fn compile_serpent(&self, _: String) -> Result { - self.active()?; - rpc_unimplemented!() } fn compile_solidity(&self, code: String) -> Result { - self.active()?; let maybe_child = Command::new(SOLC) .arg("--bin") .arg("--optimize") diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 9ca9692b1..cf3398498 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -50,19 +50,12 @@ impl EthFilterClient where polls: Mutex::new(PollManager::new()), } } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } impl EthFilter for EthFilterClient where C: BlockChainClient + 'static, M: MinerService + 'static { fn new_filter(&self, filter: Filter) -> Result { - self.active()?; let mut polls = self.polls.lock(); let block_number = take_weak!(self.client).chain_info().best_block_number; let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); @@ -70,16 +63,12 @@ impl EthFilter for EthFilterClient } fn new_block_filter(&self) -> Result { - self.active()?; - let mut polls = self.polls.lock(); let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); Ok(id.into()) } fn new_pending_transaction_filter(&self) -> Result { - self.active()?; - let mut polls = self.polls.lock(); let best_block = take_weak!(self.client).chain_info().best_block_number; let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(best_block); @@ -88,7 +77,6 @@ impl EthFilter for EthFilterClient } fn filter_changes(&self, index: Index) -> Result { - self.active()?; let client = take_weak!(self.client); let mut polls = self.polls.lock(); match polls.poll_mut(&index.value()) { @@ -180,8 +168,6 @@ impl EthFilter for EthFilterClient } fn filter_logs(&self, index: Index) -> Result, Error> { - self.active()?; - let mut polls = self.polls.lock(); match polls.poll(&index.value()) { Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { @@ -207,8 +193,6 @@ impl EthFilter for EthFilterClient } fn uninstall_filter(&self, index: Index) -> Result { - self.active()?; - self.polls.lock().remove_poll(&index.value()); Ok(true) } diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 055ccebeb..e872b8dc3 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -101,12 +101,6 @@ impl ParityClient where dapps_port: dapps_port, } } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } impl Parity for ParityClient where @@ -118,8 +112,6 @@ impl Parity for ParityClient where type Metadata = Metadata; fn accounts_info(&self, dapp: Trailing) -> Result>, Error> { - self.active()?; - let dapp = dapp.0; let store = take_weak!(self.accounts); @@ -149,8 +141,6 @@ impl Parity for ParityClient where fn default_account(&self, meta: Self::Metadata) -> BoxFuture { let dapp_id = meta.dapp_id.unwrap_or_default(); let default_account = move || { - self.active()?; - Ok(take_weak!(self.accounts) .dapps_addresses(dapp_id.into()) .ok() @@ -163,57 +153,39 @@ impl Parity for ParityClient where } fn transactions_limit(&self) -> Result { - self.active()?; - Ok(take_weak!(self.miner).transactions_limit()) } fn min_gas_price(&self) -> Result { - self.active()?; - Ok(U256::from(take_weak!(self.miner).minimal_gas_price())) } fn extra_data(&self) -> Result { - self.active()?; - Ok(Bytes::new(take_weak!(self.miner).extra_data())) } fn gas_floor_target(&self) -> Result { - self.active()?; - Ok(U256::from(take_weak!(self.miner).gas_floor_target())) } fn gas_ceil_target(&self) -> Result { - self.active()?; - Ok(U256::from(take_weak!(self.miner).gas_ceil_target())) } fn dev_logs(&self) -> Result, Error> { - self.active()?; - let logs = self.logger.logs(); Ok(logs.as_slice().to_owned()) } fn dev_logs_levels(&self) -> Result { - self.active()?; - Ok(self.logger.levels().to_owned()) } fn net_chain(&self) -> Result { - self.active()?; - Ok(self.settings.chain.clone()) } fn net_peers(&self) -> Result { - self.active()?; - let sync = take_weak!(self.sync); let sync_status = sync.status(); let net_config = take_weak!(self.net).network_config(); @@ -228,20 +200,14 @@ impl Parity for ParityClient where } fn net_port(&self) -> Result { - self.active()?; - Ok(self.settings.network_port) } fn node_name(&self) -> Result { - self.active()?; - Ok(self.settings.name.clone()) } fn registry_address(&self) -> Result, Error> { - self.active()?; - Ok( take_weak!(self.client) .additional_params() @@ -252,7 +218,6 @@ impl Parity for ParityClient where } fn rpc_settings(&self) -> Result { - self.active()?; Ok(RpcSettings { enabled: self.settings.rpc_enabled, interface: self.settings.rpc_interface.clone(), @@ -261,19 +226,14 @@ impl Parity for ParityClient where } fn default_extra_data(&self) -> Result { - self.active()?; - Ok(Bytes::new(version_data())) } fn gas_price_histogram(&self) -> Result { - self.active()?; take_weak!(self.client).gas_price_histogram(100, 10).ok_or_else(errors::not_enough_data).map(Into::into) } fn unsigned_transactions_count(&self) -> Result { - self.active()?; - match self.signer { None => Err(errors::signer_disabled()), Some(ref signer) => Ok(signer.len()), @@ -281,56 +241,40 @@ impl Parity for ParityClient where } fn generate_secret_phrase(&self) -> Result { - self.active()?; - Ok(random_phrase(12)) } fn phrase_to_address(&self, phrase: String) -> Result { - self.active()?; - Ok(Brain::new(phrase).generate().unwrap().address().into()) } fn list_accounts(&self, count: u64, after: Option, block_number: Trailing) -> Result>, Error> { - self.active()?; - Ok(take_weak!(self.client) .list_accounts(block_number.0.into(), after.map(Into::into).as_ref(), count) .map(|a| a.into_iter().map(Into::into).collect())) } fn list_storage_keys(&self, address: H160, count: u64, after: Option, block_number: Trailing) -> Result>, Error> { - self.active()?; - Ok(take_weak!(self.client) .list_storage(block_number.0.into(), &address.into(), after.map(Into::into).as_ref(), count) .map(|a| a.into_iter().map(Into::into).collect())) } fn encrypt_message(&self, key: H512, phrase: Bytes) -> Result { - self.active()?; - ecies::encrypt(&key.into(), &DEFAULT_MAC, &phrase.0) .map_err(errors::encryption_error) .map(Into::into) } fn pending_transactions(&self) -> Result, Error> { - self.active()?; - Ok(take_weak!(self.miner).pending_transactions().into_iter().map(Into::into).collect::>()) } fn future_transactions(&self) -> Result, Error> { - self.active()?; - Ok(take_weak!(self.miner).future_transactions().into_iter().map(Into::into).collect::>()) } fn pending_transactions_stats(&self) -> Result, Error> { - self.active()?; - let stats = take_weak!(self.sync).transactions_stats(); Ok(stats.into_iter() .map(|(hash, stats)| (hash.into(), stats.into())) @@ -339,8 +283,6 @@ impl Parity for ParityClient where } fn local_transactions(&self) -> Result, Error> { - self.active()?; - let transactions = take_weak!(self.miner).local_transactions(); Ok(transactions .into_iter() @@ -350,8 +292,6 @@ impl Parity for ParityClient where } fn signer_port(&self) -> Result { - self.active()?; - self.signer .clone() .and_then(|signer| signer.address()) @@ -360,21 +300,16 @@ impl Parity for ParityClient where } fn dapps_port(&self) -> Result { - self.active()?; - self.dapps_port .ok_or_else(|| errors::dapps_disabled()) } fn dapps_interface(&self) -> Result { - self.active()?; - self.dapps_interface.clone() .ok_or_else(|| errors::dapps_disabled()) } fn next_nonce(&self, address: H160) -> Result { - self.active()?; let address: Address = address.into(); let miner = take_weak!(self.miner); let client = take_weak!(self.client); @@ -400,26 +335,21 @@ impl Parity for ParityClient where } fn consensus_capability(&self) -> Result { - self.active()?; let updater = take_weak!(self.updater); Ok(updater.capability().into()) } fn version_info(&self) -> Result { - self.active()?; let updater = take_weak!(self.updater); Ok(updater.version_info().into()) } fn releases_info(&self) -> Result, Error> { - self.active()?; let updater = take_weak!(self.updater); Ok(updater.info().map(Into::into)) } fn chain_status(&self) -> Result { - self.active()?; - let chain_info = take_weak!(self.client).chain_info(); let gap = chain_info.ancient_block_number.map(|x| U256::from(x + 1)) diff --git a/rpc/src/v1/impls/parity_accounts.rs b/rpc/src/v1/impls/parity_accounts.rs index 05ac7e5e2..1034b1df5 100644 --- a/rpc/src/v1/impls/parity_accounts.rs +++ b/rpc/src/v1/impls/parity_accounts.rs @@ -21,7 +21,6 @@ use util::{Address}; use ethkey::{Brain, Generator, Secret}; use ethcore::account_provider::AccountProvider; -use ethcore::client::MiningBlockChainClient; use jsonrpc_core::Error; use v1::helpers::errors; @@ -29,30 +28,21 @@ use v1::traits::ParityAccounts; use v1::types::{H160 as RpcH160, H256 as RpcH256, DappId}; /// Account management (personal) rpc implementation. -pub struct ParityAccountsClient where C: MiningBlockChainClient { +pub struct ParityAccountsClient { accounts: Weak, - client: Weak, } -impl ParityAccountsClient where C: MiningBlockChainClient { +impl ParityAccountsClient { /// Creates new PersonalClient - pub fn new(store: &Arc, client: &Arc) -> Self { + pub fn new(store: &Arc) -> Self { ParityAccountsClient { accounts: Arc::downgrade(store), - client: Arc::downgrade(client), } } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } -impl ParityAccounts for ParityAccountsClient where C: MiningBlockChainClient { +impl ParityAccounts for ParityAccountsClient { fn all_accounts_info(&self) -> Result>, Error> { - self.active()?; let store = take_weak!(self.accounts); let info = store.accounts_info().map_err(|e| errors::account("Could not fetch account info.", e))?; let other = store.addresses_info(); @@ -75,7 +65,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn new_account_from_phrase(&self, phrase: String, pass: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); let brain = Brain::new(phrase).generate().unwrap(); @@ -85,7 +74,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn new_account_from_wallet(&self, json: String, pass: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); store.import_presale(json.as_bytes(), &pass) @@ -95,7 +83,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn new_account_from_secret(&self, secret: RpcH256, pass: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); let secret = Secret::from_slice(&secret.0) @@ -106,7 +93,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn test_password(&self, account: RpcH160, password: String) -> Result { - self.active()?; let account: Address = account.into(); take_weak!(self.accounts) @@ -115,7 +101,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn change_password(&self, account: RpcH160, password: String, new_password: String) -> Result { - self.active()?; let account: Address = account.into(); take_weak!(self.accounts) .change_password(&account, password, new_password) @@ -124,7 +109,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn kill_account(&self, account: RpcH160, password: String) -> Result { - self.active()?; let account: Address = account.into(); take_weak!(self.accounts) .kill_account(&account, &password) @@ -133,7 +117,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn remove_address(&self, addr: RpcH160) -> Result { - self.active()?; let store = take_weak!(self.accounts); let addr: Address = addr.into(); @@ -142,7 +125,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn set_account_name(&self, addr: RpcH160, name: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); let addr: Address = addr.into(); @@ -152,7 +134,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn set_account_meta(&self, addr: RpcH160, meta: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); let addr: Address = addr.into(); @@ -216,7 +197,6 @@ impl ParityAccounts for ParityAccountsClient where C: MiningBlock } fn geth_accounts(&self) -> Result, Error> { - self.active()?; let store = take_weak!(self.accounts); Ok(into_vec(store.list_geth_accounts(false))) diff --git a/rpc/src/v1/impls/parity_set.rs b/rpc/src/v1/impls/parity_set.rs index b5d864a72..a6f1129ba 100644 --- a/rpc/src/v1/impls/parity_set.rs +++ b/rpc/src/v1/impls/parity_set.rs @@ -23,7 +23,7 @@ use ethcore::client::MiningBlockChainClient; use ethcore::mode::Mode; use ethsync::ManageNetwork; use fetch::{self, Fetch}; -use futures::{self, BoxFuture, Future}; +use futures::{BoxFuture, Future}; use util::sha3; use updater::{Service as UpdateService}; @@ -62,12 +62,6 @@ impl ParitySetClient where fetch: fetch, } } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } impl ParitySet for ParitySetClient where @@ -78,63 +72,46 @@ impl ParitySet for ParitySetClient where { fn set_min_gas_price(&self, gas_price: U256) -> Result { - self.active()?; - take_weak!(self.miner).set_minimal_gas_price(gas_price.into()); Ok(true) } fn set_gas_floor_target(&self, target: U256) -> Result { - self.active()?; - take_weak!(self.miner).set_gas_floor_target(target.into()); Ok(true) } fn set_gas_ceil_target(&self, target: U256) -> Result { - self.active()?; - take_weak!(self.miner).set_gas_ceil_target(target.into()); Ok(true) } fn set_extra_data(&self, extra_data: Bytes) -> Result { - self.active()?; - take_weak!(self.miner).set_extra_data(extra_data.into_vec()); Ok(true) } fn set_author(&self, author: H160) -> Result { - self.active()?; - take_weak!(self.miner).set_author(author.into()); Ok(true) } fn set_engine_signer(&self, address: H160, password: String) -> Result { - self.active()?; take_weak!(self.miner).set_engine_signer(address.into(), password).map_err(Into::into).map_err(errors::from_password_error)?; Ok(true) } fn set_transactions_limit(&self, limit: usize) -> Result { - self.active()?; - take_weak!(self.miner).set_transactions_limit(limit); Ok(true) } fn set_tx_gas_limit(&self, limit: U256) -> Result { - self.active()?; - take_weak!(self.miner).set_tx_gas_limit(limit.into()); Ok(true) } fn add_reserved_peer(&self, peer: String) -> Result { - self.active()?; - match take_weak!(self.net).add_reserved_peer(peer) { Ok(()) => Ok(true), Err(e) => Err(errors::invalid_params("Peer address", e)), @@ -142,8 +119,6 @@ impl ParitySet for ParitySetClient where } fn remove_reserved_peer(&self, peer: String) -> Result { - self.active()?; - match take_weak!(self.net).remove_reserved_peer(peer) { Ok(()) => Ok(true), Err(e) => Err(errors::invalid_params("Peer address", e)), @@ -151,15 +126,11 @@ impl ParitySet for ParitySetClient where } fn drop_non_reserved_peers(&self) -> Result { - self.active()?; - take_weak!(self.net).deny_unreserved_peers(); Ok(true) } fn accept_non_reserved_peers(&self) -> Result { - self.active()?; - take_weak!(self.net).accept_unreserved_peers(); Ok(true) } @@ -186,10 +157,6 @@ impl ParitySet for ParitySetClient where } fn hash_content(&self, url: String) -> BoxFuture { - if let Err(e) = self.active() { - return futures::failed(e).boxed(); - } - self.fetch.process(self.fetch.fetch(&url).then(move |result| { result .map_err(errors::from_fetch_error) @@ -201,13 +168,11 @@ impl ParitySet for ParitySetClient where } fn upgrade_ready(&self) -> Result, Error> { - self.active()?; let updater = take_weak!(self.updater); Ok(updater.upgrade_ready().map(Into::into)) } fn execute_upgrade(&self) -> Result { - self.active()?; let updater = take_weak!(self.updater); Ok(updater.execute_upgrade()) } diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index b29cc41ee..44e854412 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -54,12 +54,6 @@ impl PersonalClient where allow_perm_unlock: allow_perm_unlock, } } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } impl Personal for PersonalClient where @@ -69,15 +63,12 @@ impl Personal for PersonalClient where type Metadata = Metadata; fn accounts(&self) -> Result, Error> { - self.active()?; - let store = take_weak!(self.accounts); let accounts = store.accounts().map_err(|e| errors::account("Could not fetch accounts.", e))?; Ok(accounts.into_iter().map(Into::into).collect::>()) } fn new_account(&self, pass: String) -> Result { - self.active()?; let store = take_weak!(self.accounts); store.new_account(&pass) @@ -86,7 +77,6 @@ impl Personal for PersonalClient where } fn unlock_account(&self, account: RpcH160, account_pass: String, duration: Option) -> Result { - self.active()?; let account: Address = account.into(); let store = take_weak!(self.accounts); let duration = match duration { @@ -117,7 +107,6 @@ impl Personal for PersonalClient where fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture { let sign_and_send = move || { - self.active()?; let client = take_weak!(self.client); let miner = take_weak!(self.miner); let accounts = take_weak!(self.accounts); diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index a25a5bbc1..90a811e37 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -55,17 +55,9 @@ impl SignerClient where C: MiningBlockChainClient, } } - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } - fn confirm_internal(&self, id: U256, modification: TransactionModification, f: F) -> Result, Error> where F: FnOnce(&C, &M, &AccountProvider, ConfirmationPayload) -> Result, Error>, { - self.active()?; - let id = id.into(); let accounts = take_weak!(self.accounts); let signer = take_weak!(self.signer); @@ -104,7 +96,6 @@ impl SignerClient where C: MiningBlockChainClient, impl Signer for SignerClient where C: MiningBlockChainClient, M: MinerService { fn requests_to_confirm(&self) -> Result, Error> { - self.active()?; let signer = take_weak!(self.signer); Ok(signer.requests() @@ -135,8 +126,6 @@ impl Signer for SignerClient where C: MiningBlockC } fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result { - self.active()?; - let id = id.into(); let signer = take_weak!(self.signer); let client = take_weak!(self.client); @@ -187,7 +176,6 @@ impl Signer for SignerClient where C: MiningBlockC } fn reject_request(&self, id: U256) -> Result { - self.active()?; let signer = take_weak!(self.signer); let res = signer.request_rejected(id.into()); @@ -195,7 +183,6 @@ impl Signer for SignerClient where C: MiningBlockC } fn generate_token(&self) -> Result { - self.active()?; let signer = take_weak!(self.signer); signer.generate_token() @@ -203,7 +190,6 @@ impl Signer for SignerClient where C: MiningBlockC } fn generate_web_proxy_token(&self) -> Result { - try!(self.active()); let signer = take_weak!(self.signer); Ok(signer.generate_web_proxy_access_token()) diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index 2d5fdbba7..0db90436c 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -74,12 +74,6 @@ impl SigningQueueClient where } } - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } - fn handle_dispatch(&self, res: Result, on_response: OnResponse) where OnResponse: FnOnce(Result) + Send + 'static { @@ -131,7 +125,6 @@ impl ParitySigning for SigningQueueClient where type Metadata = Metadata; fn post_sign(&self, address: RpcH160, data: RpcBytes) -> Result, Error> { - self.active()?; self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into())) .map(|result| match result { DispatchResult::Value(v) => RpcEither::Or(v), @@ -145,7 +138,6 @@ impl ParitySigning for SigningQueueClient where fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture, Error> { let post_transaction = move || { - self.active()?; self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()) .map(|result| match result { DispatchResult::Value(v) => RpcEither::Or(v), @@ -160,7 +152,6 @@ impl ParitySigning for SigningQueueClient where } fn check_request(&self, id: RpcU256) -> Result, Error> { - self.active()?; let mut pending = self.pending.lock(); let id: U256 = id.into(); let res = match pending.get(&id) { @@ -176,8 +167,7 @@ impl ParitySigning for SigningQueueClient where } fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture { - let res = self.active() - .and_then(|_| self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into())); + let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()); let (ready, p) = futures::oneshot(); // TODO [todr] typed handle_dispatch @@ -200,8 +190,7 @@ impl EthSigning for SigningQueueClient where type Metadata = Metadata; fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture { - let res = self.active() - .and_then(|_| self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into())); + let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()); let (ready, p) = futures::oneshot(); self.handle_dispatch(res, |response| { @@ -216,8 +205,7 @@ impl EthSigning for SigningQueueClient where } fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { - let res = self.active() - .and_then(|_| self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into())); + let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()); let (ready, p) = futures::oneshot(); self.handle_dispatch(res, |response| { @@ -232,7 +220,7 @@ impl EthSigning for SigningQueueClient where } fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { - let res = self.active().and_then(|_| self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into())); + let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into()); let (ready, p) = futures::oneshot(); self.handle_dispatch(res, |response| { diff --git a/rpc/src/v1/impls/signing_unsafe.rs b/rpc/src/v1/impls/signing_unsafe.rs index a76ea00a5..b43362c76 100644 --- a/rpc/src/v1/impls/signing_unsafe.rs +++ b/rpc/src/v1/impls/signing_unsafe.rs @@ -62,14 +62,7 @@ impl SigningUnsafeClient where } } - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } - fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> Result { - self.active()?; let client = take_weak!(self.client); let miner = take_weak!(self.miner); let accounts = take_weak!(self.accounts); diff --git a/rpc/src/v1/impls/traces.rs b/rpc/src/v1/impls/traces.rs index fe3d5358e..9fe92518b 100644 --- a/rpc/src/v1/impls/traces.rs +++ b/rpc/src/v1/impls/traces.rs @@ -66,17 +66,10 @@ impl TracesClient where C: BlockChainClient, M: MinerService { data: request.data.map_or_else(Vec::new, |d| d.to_vec()) }.fake_sign(from)) } - - fn active(&self) -> Result<(), Error> { - // TODO: only call every 30s at most. - take_weak!(self.client).keep_alive(); - Ok(()) - } } impl Traces for TracesClient where C: BlockChainClient + 'static, M: MinerService + 'static { fn filter(&self, filter: TraceFilter) -> Result, Error> { - self.active()?; let client = take_weak!(self.client); let traces = client.filter_traces(filter.into()); let traces = traces.map_or_else(Vec::new, |traces| traces.into_iter().map(LocalizedTrace::from).collect()); @@ -84,7 +77,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn block_traces(&self, block_number: BlockNumber) -> Result, Error> { - self.active()?; let client = take_weak!(self.client); let traces = client.block_traces(block_number.into()); let traces = traces.map_or_else(Vec::new, |traces| traces.into_iter().map(LocalizedTrace::from).collect()); @@ -92,7 +84,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn transaction_traces(&self, transaction_hash: H256) -> Result, Error> { - self.active()?; let client = take_weak!(self.client); let traces = client.transaction_traces(TransactionId::Hash(transaction_hash.into())); let traces = traces.map_or_else(Vec::new, |traces| traces.into_iter().map(LocalizedTrace::from).collect()); @@ -100,7 +91,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn trace(&self, transaction_hash: H256, address: Vec) -> Result, Error> { - self.active()?; let client = take_weak!(self.client); let id = TraceId { transaction: TransactionId::Hash(transaction_hash.into()), @@ -113,7 +103,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn call(&self, request: CallRequest, flags: Vec, block: Trailing) -> Result, Error> { - self.active()?; let block = block.0; let request = CallRequest::into(request); @@ -125,7 +114,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn raw_transaction(&self, raw_transaction: Bytes, flags: Vec, block: Trailing) -> Result, Error> { - self.active()?; let block = block.0; UntrustedRlp::new(&raw_transaction.into_vec()).as_val() @@ -140,8 +128,6 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn replay_transaction(&self, transaction_hash: H256, flags: Vec) -> Result, Error> { - self.active()?; - Ok(match take_weak!(self.client).replay(TransactionId::Hash(transaction_hash.into()), to_call_analytics(flags)) { Ok(e) => Some(TraceResults::from(e)), _ => None, diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index f38e13538..0b0787717 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -29,5 +29,5 @@ pub mod types; pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc}; pub use self::impls::*; -pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import}; +pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant}; pub use self::metadata::{Metadata, Origin}; diff --git a/rpc/src/v1/tests/mocked/parity_accounts.rs b/rpc/src/v1/tests/mocked/parity_accounts.rs index be8cb3f91..e245cb92f 100644 --- a/rpc/src/v1/tests/mocked/parity_accounts.rs +++ b/rpc/src/v1/tests/mocked/parity_accounts.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use ethcore::account_provider::AccountProvider; -use ethcore::client::TestBlockChainClient; use jsonrpc_core::IoHandler; use v1::{ParityAccounts, ParityAccountsClient}; @@ -25,14 +24,6 @@ use v1::{ParityAccounts, ParityAccountsClient}; struct ParityAccountsTester { accounts: Arc, io: IoHandler, - // these unused fields are necessary to keep the data alive - // as the handler has only weak pointers. - _client: Arc, -} - -fn blockchain_client() -> Arc { - let client = TestBlockChainClient::new(); - Arc::new(client) } fn accounts_provider() -> Arc { @@ -41,8 +32,7 @@ fn accounts_provider() -> Arc { fn setup() -> ParityAccountsTester { let accounts = accounts_provider(); - let client = blockchain_client(); - let parity_accounts = ParityAccountsClient::new(&accounts, &client); + let parity_accounts = ParityAccountsClient::new(&accounts); let mut io = IoHandler::default(); io.extend_with(parity_accounts.to_delegate()); @@ -50,7 +40,6 @@ fn setup() -> ParityAccountsTester { let tester = ParityAccountsTester { accounts: accounts, io: io, - _client: client, }; tester diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index 6983ef2fe..7fc046f49 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -17,17 +17,19 @@ //! `WebSockets` server. use ws; -use std; -use std::thread; -use std::path::PathBuf; use std::default::Default; -use std::ops::Drop; -use std::sync::Arc; use std::net::SocketAddr; +use std::ops::Drop; +use std::path::PathBuf; +use std::sync::Arc; +use std::thread; +use std; + use io::{PanicHandler, OnPanicListener, MayPanic}; -use jsonrpc_core::Metadata; +use jsonrpc_core::{Metadata, Middleware}; use jsonrpc_core::reactor::RpcHandler; -use rpc::ConfirmationsQueue; +use rpc::{ConfirmationsQueue}; +use rpc::informant::RpcStats; mod session; @@ -54,6 +56,7 @@ pub struct ServerBuilder { queue: Arc, authcodes_path: PathBuf, skip_origin_validation: bool, + stats: Option>, } impl ServerBuilder { @@ -63,6 +66,7 @@ impl ServerBuilder { queue: queue, authcodes_path: authcodes_path, skip_origin_validation: false, + stats: None, } } @@ -73,10 +77,23 @@ impl ServerBuilder { self } + /// Configure statistic collection + pub fn stats(mut self, stats: Arc) -> Self { + self.stats = Some(stats); + self + } + /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - pub fn start(self, addr: SocketAddr, handler: RpcHandler) -> Result { - Server::start(addr, handler, self.queue, self.authcodes_path, self.skip_origin_validation) + pub fn start>(self, addr: SocketAddr, handler: RpcHandler) -> Result { + Server::start( + addr, + handler, + self.queue, + self.authcodes_path, + self.skip_origin_validation, + self.stats, + ) } } @@ -97,7 +114,14 @@ impl Server { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - fn start(addr: SocketAddr, handler: RpcHandler, queue: Arc, authcodes_path: PathBuf, skip_origin_validation: bool) -> Result { + fn start>( + addr: SocketAddr, + handler: RpcHandler, + queue: Arc, + authcodes_path: PathBuf, + skip_origin_validation: bool, + stats: Option>, + ) -> Result { let config = { let mut config = ws::Settings::default(); // accept only handshakes beginning with GET @@ -111,7 +135,7 @@ impl Server { let origin = format!("{}", addr); let port = addr.port(); let ws = ws::Builder::new().with_settings(config).build( - session::Factory::new(handler, origin, port, authcodes_path, skip_origin_validation) + session::Factory::new(handler, origin, port, authcodes_path, skip_origin_validation, stats) )?; let panic_handler = PanicHandler::new_in_arc(); diff --git a/signer/src/ws_server/session.rs b/signer/src/ws_server/session.rs index 9fa80de93..f19e86215 100644 --- a/signer/src/ws_server/session.rs +++ b/signer/src/ws_server/session.rs @@ -21,8 +21,9 @@ use authcode_store::AuthCodes; use std::path::{PathBuf, Path}; use std::sync::Arc; use std::str::FromStr; -use jsonrpc_core::{Metadata}; +use jsonrpc_core::{Metadata, Middleware}; use jsonrpc_core::reactor::RpcHandler; +use rpc::informant::RpcStats; use util::{H256, version}; #[cfg(feature = "parity-ui")] @@ -124,17 +125,24 @@ fn add_headers(mut response: ws::Response, mime: &str) -> ws::Response { response } -pub struct Session { +pub struct Session> { out: ws::Sender, skip_origin_validation: bool, self_origin: String, self_port: u16, authcodes_path: PathBuf, - handler: RpcHandler, + handler: RpcHandler, file_handler: Arc, + stats: Option>, } -impl ws::Handler for Session { +impl> Drop for Session { + fn drop(&mut self) { + self.stats.as_ref().map(|stats| stats.close_session()); + } +} + +impl> ws::Handler for Session { #[cfg_attr(feature="dev", allow(collapsible_if))] fn on_request(&mut self, req: &ws::Request) -> ws::Result<(ws::Response)> { trace!(target: "signer", "Handling request: {:?}", req); @@ -221,17 +229,25 @@ impl ws::Handler for Session { } } -pub struct Factory { - handler: RpcHandler, +pub struct Factory> { + handler: RpcHandler, skip_origin_validation: bool, self_origin: String, self_port: u16, authcodes_path: PathBuf, file_handler: Arc, + stats: Option>, } -impl Factory { - pub fn new(handler: RpcHandler, self_origin: String, self_port: u16, authcodes_path: PathBuf, skip_origin_validation: bool) -> Self { +impl> Factory { + pub fn new( + handler: RpcHandler, + self_origin: String, + self_port: u16, + authcodes_path: PathBuf, + skip_origin_validation: bool, + stats: Option>, + ) -> Self { Factory { handler: handler, skip_origin_validation: skip_origin_validation, @@ -239,14 +255,17 @@ impl Factory { self_port: self_port, authcodes_path: authcodes_path, file_handler: Arc::new(ui::Handler::default()), + stats: stats, } } } -impl ws::Factory for Factory { - type Handler = Session; +impl> ws::Factory for Factory { + type Handler = Session; fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { + self.stats.as_ref().map(|stats| stats.open_session()); + Session { out: sender, handler: self.handler.clone(), @@ -255,6 +274,7 @@ impl ws::Factory for Factory { self_port: self.self_port, authcodes_path: self.authcodes_path.clone(), file_handler: self.file_handler.clone(), + stats: self.stats.clone(), } } }