// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! RPC Requests Statistics use std::fmt; use std::sync::Arc; use std::sync::atomic::{self, AtomicUsize}; use std::time; use futures::Future; use futures_cpupool as pool; use jsonrpc_core as rpc; use order_stat; use util::RwLock; pub use self::pool::CpuPool; const RATE_SECONDS: usize = 10; const STATS_SAMPLES: usize = 60; struct RateCalculator { era: time::Instant, samples: [u16; RATE_SECONDS], } impl fmt::Debug for RateCalculator { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "{} req/s", self.rate()) } } impl Default for RateCalculator { fn default() -> Self { RateCalculator { era: time::Instant::now(), samples: [0; RATE_SECONDS], } } } impl RateCalculator { fn elapsed(&self) -> u64 { self.era.elapsed().as_secs() } pub fn tick(&mut self) -> u16 { if self.elapsed() >= RATE_SECONDS as u64 { self.era = time::Instant::now(); self.samples[0] = 0; } let pos = self.elapsed() as usize % RATE_SECONDS; let next = (pos + 1) % RATE_SECONDS; self.samples[next] = 0; self.samples[pos] = self.samples[pos].saturating_add(1); self.samples[pos] } fn current_rate(&self) -> usize { let now = match self.elapsed() { i if i >= RATE_SECONDS as u64 => RATE_SECONDS, i => i as usize + 1, }; let sum: usize = self.samples[0..now].iter().map(|x| *x as usize).sum(); sum / now } pub fn rate(&self) -> usize { if self.elapsed() > RATE_SECONDS as u64 { 0 } else { self.current_rate() } } } struct StatsCalculator { filled: bool, idx: usize, samples: [T; STATS_SAMPLES], } impl Default for StatsCalculator { fn default() -> Self { StatsCalculator { filled: false, idx: 0, samples: [T::default(); STATS_SAMPLES], } } } impl fmt::Debug for StatsCalculator { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "median: {} ms", self.approximated_median()) } } impl StatsCalculator { pub fn add(&mut self, sample: T) { self.idx += 1; if self.idx >= STATS_SAMPLES { self.filled = true; self.idx = 0; } self.samples[self.idx] = sample; } /// Returns aproximate of media pub fn approximated_median(&self) -> T { let mut copy = [T::default(); STATS_SAMPLES]; copy.copy_from_slice(&self.samples); let bound = if self.filled { STATS_SAMPLES } else { self.idx + 1 }; let (_, &mut median) = order_stat::median_of_medians(&mut copy[0..bound]); median } } /// RPC Statistics #[derive(Default, Debug)] pub struct RpcStats { requests: RwLock, roundtrips: RwLock>, active_sessions: AtomicUsize, } impl RpcStats { /// Count session opened pub fn open_session(&self) { self.active_sessions.fetch_add(1, atomic::Ordering::SeqCst); } /// Count session closed. /// Silently overflows if closing unopened session. pub fn close_session(&self) { self.active_sessions.fetch_sub(1, atomic::Ordering::SeqCst); } /// Count request. Returns number of requests in current second. pub fn count_request(&self) -> u16 { self.requests.write().tick() } /// Add roundtrip time (microseconds) pub fn add_roundtrip(&self, microseconds: u32) { self.roundtrips.write().add(microseconds) } /// Returns number of open sessions pub fn sessions(&self) -> usize { self.active_sessions.load(atomic::Ordering::Relaxed) } /// Returns requests rate pub fn requests_rate(&self) -> usize { self.requests.read().rate() } /// Returns approximated roundtrip in microseconds pub fn approximated_roundtrip(&self) -> u32 { self.roundtrips.read().approximated_median() } } /// Notifies about RPC activity. pub trait ActivityNotifier: Send + Sync + 'static { /// Activity on RPC interface fn active(&self); } /// Stats-counting RPC middleware pub struct Middleware { stats: Arc, notifier: T, pool: Option, } impl Middleware { /// Create new Middleware with stats counter and activity notifier. pub fn new(stats: Arc, notifier: T, pool: Option) -> Self { Middleware { stats, notifier, pool, } } fn as_micro(dur: time::Duration) -> u32 { (dur.as_secs() * 1_000_000) as u32 + dur.subsec_nanos() / 1_000 } } impl rpc::Middleware for Middleware { type Future = rpc::futures::future::Either< pool::CpuFuture, ()>, rpc::FutureResponse, >; fn on_request(&self, request: rpc::Request, meta: M, process: F) -> Self::Future where F: FnOnce(rpc::Request, M) -> X, X: rpc::futures::Future, Error=()> + Send + 'static, { use self::rpc::futures::future::Either::{A, B}; let start = time::Instant::now(); self.notifier.active(); self.stats.count_request(); let stats = self.stats.clone(); let future = process(request, meta).map(move |res| { stats.add_roundtrip(Self::as_micro(start.elapsed())); res }); match self.pool { Some(ref pool) => A(pool.spawn(future)), None => B(future.boxed()), } } } /// Client Notifier pub struct ClientNotifier { /// Client pub client: Arc<::ethcore::client::Client>, } impl ActivityNotifier for ClientNotifier { fn active(&self) { self.client.keep_alive() } } #[cfg(test)] mod tests { use super::{RateCalculator, StatsCalculator, RpcStats}; #[test] fn should_calculate_rate() { // given let mut avg = RateCalculator::default(); // when avg.tick(); avg.tick(); avg.tick(); let rate = avg.rate(); // then assert_eq!(rate, 3usize); } #[test] fn should_approximate_median() { // given let mut stats = StatsCalculator::default(); stats.add(5); stats.add(100); stats.add(3); stats.add(15); stats.add(20); stats.add(6); // when let median = stats.approximated_median(); // then assert_eq!(median, 5); } #[test] fn should_count_rpc_stats() { // given let stats = RpcStats::default(); assert_eq!(stats.sessions(), 0); assert_eq!(stats.requests_rate(), 0); assert_eq!(stats.approximated_roundtrip(), 0); // when stats.open_session(); stats.close_session(); stats.open_session(); stats.count_request(); stats.count_request(); stats.add_roundtrip(125); // then assert_eq!(stats.sessions(), 1); assert_eq!(stats.requests_rate(), 2); assert_eq!(stats.approximated_roundtrip(), 125); } #[test] fn should_be_sync_and_send() { let stats = RpcStats::default(); is_sync(stats); } fn is_sync(x: F) { drop(x) } }