diff --git a/Cargo.lock b/Cargo.lock index 6cfc9fc2c..b4cdb7f72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1061,7 +1061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1073,7 +1073,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1086,7 +1086,7 @@ dependencies = [ [[package]] name = "jsonrpc-ipc-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1099,7 +1099,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1109,7 +1109,7 @@ dependencies = [ [[package]] name = "jsonrpc-minihttp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1124,7 +1124,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1134,8 +1134,9 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ + "bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1146,7 +1147,7 @@ dependencies = [ [[package]] name = "jsonrpc-tcp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1160,11 +1161,13 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#d12476f42ee672fa9d023f66fcfa5981d9aaba3a" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#5e79be8a098cdda221713992f4a46b41a1d4d8f0" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)", ] @@ -1775,6 +1778,7 @@ dependencies = [ "ethsync 1.7.0", "fetch 0.1.0", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-http-server 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-ipc-server 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 009a6dc38..c82d861c7 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -180,8 +180,10 @@ usage! { or |c: &Config| otry!(c.rpc).apis.as_ref().map(|vec| vec.join(",")), flag_jsonrpc_hosts: String = "none", or |c: &Config| otry!(c.rpc).hosts.as_ref().map(|vec| vec.join(",")), - flag_jsonrpc_threads: Option = None, - or |c: &Config| otry!(c.rpc).threads.map(Some), + flag_jsonrpc_server_threads: Option = None, + or |c: &Config| otry!(c.rpc).server_threads.map(Some), + flag_jsonrpc_threads: usize = 0usize, + or |c: &Config| otry!(c.rpc).processing_threads, // WS flag_no_ws: bool = false, @@ -468,7 +470,8 @@ struct Rpc { cors: Option, apis: Option>, hosts: Option>, - threads: Option, + server_threads: Option, + processing_threads: Option, } #[derive(Default, Debug, PartialEq, Deserialize)] @@ -749,7 +752,8 @@ mod tests { flag_jsonrpc_cors: Some("null".into()), flag_jsonrpc_apis: "web3,eth,net,parity,traces,rpc,secretstore".into(), flag_jsonrpc_hosts: "none".into(), - flag_jsonrpc_threads: None, + flag_jsonrpc_server_threads: None, + flag_jsonrpc_threads: 0, // WS flag_no_ws: false, @@ -977,7 +981,8 @@ mod tests { cors: None, apis: None, hosts: None, - threads: None, + server_threads: None, + processing_threads: None, }), ipc: Some(Ipc { disable: None, diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index 5020f00ba..ae184d446 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -176,9 +176,12 @@ API and Console Options: is additional security against some attack vectors. Special options: "all", "none", (default: {flag_jsonrpc_hosts}). - --jsonrpc-threads THREADS Enables experimental faster implementation of JSON-RPC server. + --jsonrpc-server-threads NUM Enables experimental faster implementation of JSON-RPC server. Requires Dapps server to be disabled - using --no-dapps. (default: {flag_jsonrpc_threads:?}) + using --no-dapps. (default: {flag_jsonrpc_server_threads:?}) + --jsonrpc-threads THREADS Turn on additional processing threads in all RPC servers. + Setting this to non-zero value allows parallel cpu-heavy queries + execution. (default: {flag_jsonrpc_threads}) --no-ws Disable the WebSockets server. (default: {flag_no_ws}) --ws-port PORT Specify the port portion of the WebSockets server diff --git a/parity/configuration.rs b/parity/configuration.rs index 6d1879779..e55ae7409 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -137,7 +137,7 @@ impl Configuration { let secretstore_conf = self.secretstore_config()?; let format = self.format()?; - if self.args.flag_jsonrpc_threads.is_some() && dapps_conf.enabled { + if self.args.flag_jsonrpc_server_threads.is_some() && dapps_conf.enabled { dapps_conf.enabled = false; writeln!(&mut stderr(), "Warning: Disabling Dapps server because fast RPC server was enabled.").expect("Error writing to stderr.") } @@ -825,11 +825,12 @@ impl Configuration { }, hosts: self.rpc_hosts(), cors: self.rpc_cors(), - threads: match self.args.flag_jsonrpc_threads { + server_threads: match self.args.flag_jsonrpc_server_threads { Some(threads) if threads > 0 => Some(threads), None => None, - _ => return Err("--jsonrpc-threads number needs to be positive.".into()), - } + _ => return Err("--jsonrpc-server-threads number needs to be positive.".into()), + }, + processing_threads: self.args.flag_jsonrpc_threads, }; Ok(conf) diff --git a/parity/rpc.rs b/parity/rpc.rs index 3d5717236..8b15db09e 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -30,6 +30,7 @@ use rpc_apis::{self, ApiSet}; pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware}; pub use parity_rpc::ws::Server as WsServer; +pub use parity_rpc::informant::CpuPool; pub const DAPPS_DOMAIN: &'static str = "web3.site"; @@ -42,7 +43,8 @@ pub struct HttpConfiguration { pub apis: ApiSet, pub cors: Option>, pub hosts: Option>, - pub threads: Option, + pub server_threads: Option, + pub processing_threads: usize, } impl HttpConfiguration { @@ -63,7 +65,8 @@ impl Default for HttpConfiguration { apis: ApiSet::UnsafeContext, cors: None, hosts: Some(Vec::new()), - threads: None, + server_threads: None, + processing_threads: 0, } } } @@ -94,7 +97,8 @@ impl From for HttpConfiguration { apis: rpc_apis::ApiSet::SafeContext, cors: None, hosts: conf.hosts, - threads: None, + server_threads: None, + processing_threads: 0, } } } @@ -176,6 +180,7 @@ pub struct Dependencies { pub apis: Arc, pub remote: TokioRemote, pub stats: Arc, + pub pool: Option, } pub fn new_ws( @@ -192,11 +197,12 @@ pub fn new_ws( let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?; - let full_handler = setup_apis(rpc_apis::ApiSet::SafeContext, deps); + let pool = deps.pool.clone(); + let full_handler = setup_apis(rpc_apis::ApiSet::SafeContext, deps, pool.clone()); let handler = { let mut handler = MetaIoHandler::with_middleware(( rpc::WsDispatcher::new(full_handler), - Middleware::new(deps.stats.clone(), deps.apis.activity_notifier()) + Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), pool) )); let apis = conf.apis.list_apis(); deps.apis.extend_with_set(&mut handler, &apis); @@ -252,7 +258,8 @@ pub fn new_http( let http_address = (conf.interface, conf.port); let url = format!("{}:{}", http_address.0, http_address.1); let addr = url.parse().map_err(|_| format!("Invalid {} listen host/port given: {}", id, url))?; - let handler = setup_apis(conf.apis, deps); + let pool = deps.pool.clone(); + let handler = setup_apis(conf.apis, deps, pool); let remote = deps.remote.clone(); let cors_domains = into_domains(conf.cors); @@ -265,7 +272,7 @@ pub fn new_http( handler, remote, rpc::RpcExtractor, - match (conf.threads, middleware) { + match (conf.server_threads, middleware) { (Some(threads), None) => rpc::HttpSettings::Threads(threads), (None, middleware) => rpc::HttpSettings::Dapps(middleware), (Some(_), Some(_)) => { @@ -291,7 +298,8 @@ pub fn new_ipc( return Ok(None); } - let handler = setup_apis(conf.apis, dependencies); + let pool = dependencies.pool.clone(); + let handler = setup_apis(conf.apis, dependencies, pool); let remote = dependencies.remote.clone(); match rpc::start_ipc(&conf.socket_addr, handler, remote, rpc::RpcExtractor) { Ok(server) => Ok(Some(server)), @@ -318,11 +326,11 @@ fn with_domain(items: Option>, domain: &str, addresses: &[Option<(St }) } -fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler> +fn setup_apis(apis: ApiSet, deps: &Dependencies, pool: Option) -> MetaIoHandler> where D: rpc_apis::Dependencies { let mut handler = MetaIoHandler::with_middleware( - Middleware::new(deps.stats.clone(), deps.apis.activity_notifier()) + Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), pool) ); let apis = apis.list_apis(); deps.apis.extend_with_set(&mut handler, &apis); diff --git a/parity/run.rs b/parity/run.rs index 3cdfd19f5..ff424e440 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -316,6 +316,11 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> apis: deps_for_rpc_apis.clone(), remote: event_loop.raw_remote(), stats: rpc_stats.clone(), + pool: if cmd.http_conf.processing_threads > 0 { + Some(rpc::CpuPool::new(cmd.http_conf.processing_threads)) + } else { + None + }, }; // start rpc servers @@ -663,6 +668,12 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R apis: deps_for_rpc_apis.clone(), remote: event_loop.raw_remote(), stats: rpc_stats.clone(), + pool: if cmd.http_conf.processing_threads > 0 { + Some(rpc::CpuPool::new(cmd.http_conf.processing_threads)) + } else { + None + }, + }; // start rpc servers diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 3d0b1e7d3..05cfe1057 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -10,6 +10,7 @@ authors = ["Parity Technologies "] [dependencies] cid = "0.2" futures = "0.1" +futures-cpupool = "0.1" log = "0.3" multihash ="0.6" order-stat = "0.1" diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 38dafc0a4..64e1fc463 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -23,6 +23,7 @@ extern crate cid; extern crate crypto as rust_crypto; extern crate futures; +extern crate futures_cpupool; extern crate multihash; extern crate order_stat; extern crate rand; diff --git a/rpc/src/v1/extractors.rs b/rpc/src/v1/extractors.rs index 67f121526..ab1fad4da 100644 --- a/rpc/src/v1/extractors.rs +++ b/rpc/src/v1/extractors.rs @@ -217,18 +217,26 @@ impl> WsDispatcher { } impl> core::Middleware for WsDispatcher { - fn on_request(&self, request: core::Request, meta: Metadata, process: F) -> core::FutureResponse where - F: FnOnce(core::Request, Metadata) -> core::FutureResponse, + type Future = core::futures::future::Either< + M::Future, + core::FutureResponse, + >; + + fn on_request(&self, request: core::Request, meta: Metadata, process: F) -> Self::Future where + F: FnOnce(core::Request, Metadata) -> X, + X: core::futures::Future, Error=()> + Send + 'static, { + use self::core::futures::future::Either::{A, B}; + let use_full = match &meta.origin { &Origin::Signer { .. } => true, _ => false, }; if use_full { - self.full_handler.handle_rpc_request(request, meta) + A(self.full_handler.handle_rpc_request(request, meta)) } else { - process(request, meta) + B(process(request, meta).boxed()) } } } diff --git a/rpc/src/v1/informant.rs b/rpc/src/v1/informant.rs index c1bbbe6c2..160f0ea9f 100644 --- a/rpc/src/v1/informant.rs +++ b/rpc/src/v1/informant.rs @@ -21,10 +21,13 @@ use std::sync::Arc; use std::sync::atomic::{self, AtomicUsize}; use std::time; use futures::Future; +use futures_cpupool as pool; use jsonrpc_core as rpc; use order_stat; use util::RwLock; +pub use self::pool::CpuPool; + const RATE_SECONDS: usize = 10; const STATS_SAMPLES: usize = 60; @@ -184,14 +187,16 @@ pub trait ActivityNotifier: Send + Sync + 'static { pub struct Middleware { stats: Arc, notifier: T, + pool: Option, } impl Middleware { /// Create new Middleware with stats counter and activity notifier. - pub fn new(stats: Arc, notifier: T) -> Self { + pub fn new(stats: Arc, notifier: T, pool: Option) -> Self { Middleware { - stats: stats, - notifier: notifier, + stats, + notifier, + pool, } } @@ -201,19 +206,32 @@ impl Middleware { } impl rpc::Middleware for Middleware { - fn on_request(&self, request: rpc::Request, meta: M, process: F) -> rpc::FutureResponse where - F: FnOnce(rpc::Request, M) -> rpc::FutureResponse, + type Future = rpc::futures::future::Either< + pool::CpuFuture, ()>, + rpc::FutureResponse, + >; + + fn on_request(&self, request: rpc::Request, meta: M, process: F) -> Self::Future where + F: FnOnce(rpc::Request, M) -> X, + X: rpc::futures::Future, Error=()> + Send + 'static, { + use self::rpc::futures::future::Either::{A, B}; + let start = time::Instant::now(); - let response = process(request, meta); self.notifier.active(); + self.stats.count_request(); + let stats = self.stats.clone(); - stats.count_request(); - response.map(move |res| { + let future = process(request, meta).map(move |res| { stats.add_roundtrip(Self::as_micro(start.elapsed())); res - }).boxed() + }); + + match self.pool { + Some(ref pool) => A(pool.spawn(future)), + None => B(future.boxed()), + } } }