RPC cpu pool (#6023)

* RPC cpu pool.

* introduce optional thread pool when processing RPC requests.

* Bump jsonrpc.

* Removing boxes.

* Fix CLI tests.
This commit is contained in:
Tomasz Drwięga
2017-07-11 12:22:19 +02:00
committed by Gav Wood
parent dc51dde112
commit 7fb46bff06
10 changed files with 103 additions and 43 deletions

View File

@@ -10,6 +10,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
cid = "0.2"
futures = "0.1"
futures-cpupool = "0.1"
log = "0.3"
multihash ="0.6"
order-stat = "0.1"

View File

@@ -23,6 +23,7 @@
extern crate cid;
extern crate crypto as rust_crypto;
extern crate futures;
extern crate futures_cpupool;
extern crate multihash;
extern crate order_stat;
extern crate rand;

View File

@@ -217,18 +217,26 @@ impl<M: core::Middleware<Metadata>> WsDispatcher<M> {
}
impl<M: core::Middleware<Metadata>> core::Middleware<Metadata> for WsDispatcher<M> {
fn on_request<F>(&self, request: core::Request, meta: Metadata, process: F) -> core::FutureResponse where
F: FnOnce(core::Request, Metadata) -> core::FutureResponse,
type Future = core::futures::future::Either<
M::Future,
core::FutureResponse,
>;
fn on_request<F, X>(&self, request: core::Request, meta: Metadata, process: F) -> Self::Future where
F: FnOnce(core::Request, Metadata) -> X,
X: core::futures::Future<Item=Option<core::Response>, Error=()> + Send + 'static,
{
use self::core::futures::future::Either::{A, B};
let use_full = match &meta.origin {
&Origin::Signer { .. } => true,
_ => false,
};
if use_full {
self.full_handler.handle_rpc_request(request, meta)
A(self.full_handler.handle_rpc_request(request, meta))
} else {
process(request, meta)
B(process(request, meta).boxed())
}
}
}

View File

@@ -21,10 +21,13 @@ use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::time;
use futures::Future;
use futures_cpupool as pool;
use jsonrpc_core as rpc;
use order_stat;
use util::RwLock;
pub use self::pool::CpuPool;
const RATE_SECONDS: usize = 10;
const STATS_SAMPLES: usize = 60;
@@ -184,14 +187,16 @@ pub trait ActivityNotifier: Send + Sync + 'static {
pub struct Middleware<T: ActivityNotifier = ClientNotifier> {
stats: Arc<RpcStats>,
notifier: T,
pool: Option<CpuPool>,
}
impl<T: ActivityNotifier> Middleware<T> {
/// Create new Middleware with stats counter and activity notifier.
pub fn new(stats: Arc<RpcStats>, notifier: T) -> Self {
pub fn new(stats: Arc<RpcStats>, notifier: T, pool: Option<CpuPool>) -> Self {
Middleware {
stats: stats,
notifier: notifier,
stats,
notifier,
pool,
}
}
@@ -201,19 +206,32 @@ impl<T: ActivityNotifier> Middleware<T> {
}
impl<M: rpc::Metadata, T: ActivityNotifier> rpc::Middleware<M> for Middleware<T> {
fn on_request<F>(&self, request: rpc::Request, meta: M, process: F) -> rpc::FutureResponse where
F: FnOnce(rpc::Request, M) -> rpc::FutureResponse,
type Future = rpc::futures::future::Either<
pool::CpuFuture<Option<rpc::Response>, ()>,
rpc::FutureResponse,
>;
fn on_request<F, X>(&self, request: rpc::Request, meta: M, process: F) -> Self::Future where
F: FnOnce(rpc::Request, M) -> X,
X: rpc::futures::Future<Item=Option<rpc::Response>, Error=()> + Send + 'static,
{
use self::rpc::futures::future::Either::{A, B};
let start = time::Instant::now();
let response = process(request, meta);
self.notifier.active();
self.stats.count_request();
let stats = self.stats.clone();
stats.count_request();
response.map(move |res| {
let future = process(request, meta).map(move |res| {
stats.add_roundtrip(Self::as_micro(start.elapsed()));
res
}).boxed()
});
match self.pool {
Some(ref pool) => A(pool.spawn(future)),
None => B(future.boxed()),
}
}
}