RPC middleware: Informant & Client.keep_alive (#4384)

* Adding RPC informant structs

* RPC informant

* Middleware counting RPC requests

* Moving client keep_alive to middleware
This commit is contained in:
Tomasz Drwięga
2017-02-04 22:18:19 +01:00
committed by Gav Wood
parent d7b937fe88
commit 248cd5e036
31 changed files with 520 additions and 407 deletions

View File

@@ -242,7 +242,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
}
};
let informant = Arc::new(Informant::new(client.clone(), None, None, None, cmd.with_color));
let informant = Arc::new(Informant::new(client.clone(), None, None, None, None, cmd.with_color));
service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?;
let do_import = |bytes| {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use dir::default_data_path;
use ethcore::client::Client;
use ethcore_rpc::informant::RpcStats;
use ethsync::SyncProvider;
use hash_fetch::fetch::Client as FetchClient;
use helpers::replace_home;
@@ -64,6 +65,7 @@ pub struct Dependencies {
pub remote: Remote,
pub fetch: FetchClient,
pub signer: Arc<SignerService>,
pub stats: Arc<RpcStats>,
}
pub fn new(configuration: Configuration, deps: Dependencies) -> Result<Option<WebappServer>, String> {
@@ -174,7 +176,7 @@ mod server {
} else {
rpc_apis::ApiSet::UnsafeContext
};
let apis = rpc_apis::setup_rpc(Default::default(), deps.apis.clone(), api_set);
let apis = rpc_apis::setup_rpc(deps.stats, deps.apis.clone(), api_set);
let handler = RpcHandler::new(Arc::new(apis), deps.remote);
let start_result = match auth {
None => {

View File

@@ -30,7 +30,8 @@ use ethcore::service::ClientIoMessage;
use ethcore::snapshot::service::Service as SnapshotService;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
use number_prefix::{binary_prefix, Standalone, Prefixed};
use ethcore_rpc::is_major_importing;
use ethcore_rpc::{is_major_importing};
use ethcore_rpc::informant::RpcStats;
use rlp::View;
pub struct Informant {
@@ -41,6 +42,7 @@ pub struct Informant {
snapshot: Option<Arc<SnapshotService>>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
rpc_stats: Option<Arc<RpcStats>>,
last_import: Mutex<Instant>,
skipped: AtomicUsize,
skipped_txs: AtomicUsize,
@@ -63,13 +65,20 @@ pub trait MillisecondDuration {
impl MillisecondDuration for Duration {
fn as_milliseconds(&self) -> u64 {
self.as_secs() * 1000 + self.subsec_nanos() as u64 / 1000000
self.as_secs() * 1000 + self.subsec_nanos() as u64 / 1_000_000
}
}
impl Informant {
/// Make a new instance potentially `with_color` output.
pub fn new(client: Arc<Client>, sync: Option<Arc<SyncProvider>>, net: Option<Arc<ManageNetwork>>, snapshot: Option<Arc<SnapshotService>>, with_color: bool) -> Self {
pub fn new(
client: Arc<Client>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
snapshot: Option<Arc<SnapshotService>>,
rpc_stats: Option<Arc<RpcStats>>,
with_color: bool,
) -> Self {
Informant {
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
@@ -78,6 +87,7 @@ impl Informant {
snapshot: snapshot,
sync: sync,
net: net,
rpc_stats: rpc_stats,
last_import: Mutex::new(Instant::now()),
skipped: AtomicUsize::new(0),
skipped_txs: AtomicUsize::new(0),
@@ -102,6 +112,7 @@ impl Informant {
let cache_info = self.client.blockchain_cache_info();
let network_config = self.net.as_ref().map(|n| n.network_config());
let sync_status = self.sync.as_ref().map(|s| s.status());
let rpc_stats = self.rpc_stats.as_ref();
let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info());
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
@@ -126,10 +137,10 @@ impl Informant {
false => t,
};
info!(target: "import", "{} {} {}",
info!(target: "import", "{} {} {} {}",
match importing {
true => match snapshot_sync {
false => format!("Syncing {} {} {} {}+{} Qed",
false => format!("Syncing {} {} {} {}+{} Qed",
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
paint(White.bold(), format!("{}", chain_info.best_block_hash)),
{
@@ -170,7 +181,16 @@ impl Informant {
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))),
_ => String::new(),
}
)
),
match rpc_stats {
Some(ref rpc_stats) => format!(
"RPC: {} conn, {} req/s, {} µs",
paint(Blue.bold(), format!("{:2}", rpc_stats.sessions())),
paint(Blue.bold(), format!("{:2}", rpc_stats.requests_rate())),
paint(Blue.bold(), format!("{:3}", rpc_stats.approximated_roundtrip())),
),
_ => String::new(),
},
);
*write_report = Some(report);

View File

@@ -22,6 +22,7 @@ use io::PanicHandler;
use dir::default_data_path;
use ethcore_rpc::{self as rpc, RpcServerError, IpcServerError, Metadata};
use ethcore_rpc::informant::{RpcStats, Middleware};
use helpers::parity_ipc_path;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_core::reactor::{RpcHandler, Remote};
@@ -85,6 +86,7 @@ pub struct Dependencies {
pub panic_handler: Arc<PanicHandler>,
pub apis: Arc<rpc_apis::Dependencies>,
pub remote: Remote,
pub stats: Arc<RpcStats>,
}
pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result<Option<HttpServer>, String> {
@@ -97,8 +99,8 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result<Option<H
Ok(Some(setup_http_rpc_server(deps, &addr, conf.cors, conf.hosts, conf.apis)?))
}
fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler<Metadata> {
rpc_apis::setup_rpc(MetaIoHandler::default(), deps.apis.clone(), apis)
fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler<Metadata, Middleware> {
rpc_apis::setup_rpc(deps.stats.clone(), deps.apis.clone(), apis)
}
pub fn setup_http_rpc_server(
@@ -122,12 +124,12 @@ pub fn setup_http_rpc_server(
}
}
pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Result<Option<IpcServer<Metadata>>, String> {
pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Result<Option<IpcServer<Metadata, Middleware>>, String> {
if !conf.enabled { return Ok(None); }
Ok(Some(setup_ipc_rpc_server(deps, &conf.socket_addr, conf.apis)?))
}
pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: ApiSet) -> Result<IpcServer<Metadata>, String> {
pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: ApiSet) -> Result<IpcServer<Metadata, Middleware>, String> {
let apis = setup_apis(apis, dependencies);
let handler = RpcHandler::new(Arc::new(apis), dependencies.remote.clone());
match rpc::start_ipc(addr, handler) {

View File

@@ -14,22 +14,25 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp::PartialEq;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::cmp::PartialEq;
use std::str::FromStr;
use std::sync::Arc;
use util::RotatingLogger;
use jsonrpc_core::{MetaIoHandler};
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::client::Client;
use ethcore::account_provider::AccountProvider;
use ethcore::snapshot::SnapshotService;
use ethsync::{ManageNetwork, SyncProvider};
use ethcore_rpc::{Metadata, NetworkSettings};
pub use ethcore_rpc::SignerService;
use updater::Updater;
use ethcore::account_provider::AccountProvider;
use ethcore::client::Client;
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::snapshot::SnapshotService;
use ethcore_rpc::{Metadata, NetworkSettings};
use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier};
use ethsync::{ManageNetwork, SyncProvider};
use hash_fetch::fetch::Client as FetchClient;
use jsonrpc_core::{MetaIoHandler};
use updater::Updater;
use util::RotatingLogger;
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum Api {
@@ -182,9 +185,13 @@ macro_rules! add_signing_methods {
}
}
pub fn setup_rpc(mut handler: MetaIoHandler<Metadata>, deps: Arc<Dependencies>, apis: ApiSet) -> MetaIoHandler<Metadata> {
pub fn setup_rpc(stats: Arc<RpcStats>, deps: Arc<Dependencies>, apis: ApiSet) -> MetaIoHandler<Metadata, Middleware> {
use ethcore_rpc::v1::*;
let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, ClientNotifier {
client: deps.client.clone(),
}));
// it's turned into vector, cause ont of the cases requires &[]
let apis = apis.list_apis().into_iter().collect::<Vec<_>>();
for api in &apis {
@@ -244,7 +251,7 @@ pub fn setup_rpc(mut handler: MetaIoHandler<Metadata>, deps: Arc<Dependencies>,
add_signing_methods!(ParitySigning, handler, deps);
},
Api::ParityAccounts => {
handler.extend_with(ParityAccountsClient::new(&deps.secret_store, &deps.client).to_delegate());
handler.extend_with(ParityAccountsClient::new(&deps.secret_store).to_delegate());
},
Api::ParitySet => {
handler.extend_with(ParitySetClient::new(

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::net::{TcpListener};
use ctrlc::CtrlC;
use fdlimit::raise_fd_limit;
use ethcore_rpc::{NetworkSettings, is_major_importing};
use ethcore_rpc::{NetworkSettings, informant, is_major_importing};
use ethsync::NetworkConfiguration;
use util::{Colour, version, RotatingLogger, Mutex, Condvar};
use io::{MayPanic, ForwardPanic, PanicHandler};
@@ -358,6 +358,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
service.add_notify(updater.clone());
// set up dependencies for rpc servers
let rpc_stats = Arc::new(informant::RpcStats::default());
let signer_path = cmd.signer_conf.signer_path.clone();
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_service: Arc::new(rpc_apis::SignerService::new(move || {
@@ -390,6 +391,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
panic_handler: panic_handler.clone(),
apis: deps_for_rpc_apis.clone(),
remote: event_loop.raw_remote(),
stats: rpc_stats.clone(),
};
// start rpc servers
@@ -405,6 +407,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
remote: event_loop.raw_remote(),
fetch: fetch.clone(),
signer: deps_for_rpc_apis.signer_service.clone(),
stats: rpc_stats.clone(),
};
let dapps_server = dapps::new(cmd.dapps_conf.clone(), dapps_deps)?;
@@ -413,6 +416,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
panic_handler: panic_handler.clone(),
apis: deps_for_rpc_apis.clone(),
remote: event_loop.raw_remote(),
rpc_stats: rpc_stats.clone(),
};
let signer_server = signer::start(cmd.signer_conf.clone(), signer_deps)?;
@@ -422,7 +426,8 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
Some(sync_provider.clone()),
Some(manage_network.clone()),
Some(snapshot_service.clone()),
cmd.logger_config.color
Some(rpc_stats.clone()),
cmd.logger_config.color,
));
service.add_notify(informant.clone());
service.register_io_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;

View File

@@ -15,18 +15,21 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::io;
use std::sync::Arc;
use std::path::PathBuf;
use ansi_term::Colour;
use io::{ForwardPanic, PanicHandler};
use util::path::restrict_permissions_owner;
use rpc_apis;
use ethcore_signer as signer;
use dir::default_data_path;
use helpers::replace_home;
use jsonrpc_core::reactor::{RpcHandler, Remote};
use std::sync::Arc;
pub use ethcore_signer::Server as SignerServer;
use ansi_term::Colour;
use dir::default_data_path;
use ethcore_rpc::informant::RpcStats;
use ethcore_signer as signer;
use helpers::replace_home;
use io::{ForwardPanic, PanicHandler};
use jsonrpc_core::reactor::{RpcHandler, Remote};
use rpc_apis;
use util::path::restrict_permissions_owner;
const CODES_FILENAME: &'static str = "authcodes";
#[derive(Debug, PartialEq, Clone)]
@@ -55,6 +58,7 @@ pub struct Dependencies {
pub panic_handler: Arc<PanicHandler>,
pub apis: Arc<rpc_apis::Dependencies>,
pub remote: Remote,
pub rpc_stats: Arc<RpcStats>,
}
pub struct NewToken {
@@ -126,7 +130,8 @@ fn do_start(conf: Configuration, deps: Dependencies) -> Result<SignerServer, Str
info!("If you do not intend this, exit now.");
}
let server = server.skip_origin_validation(conf.skip_origin_validation);
let apis = rpc_apis::setup_rpc(Default::default(), deps.apis, rpc_apis::ApiSet::SafeContext);
let server = server.stats(deps.rpc_stats.clone());
let apis = rpc_apis::setup_rpc(deps.rpc_stats, deps.apis, rpc_apis::ApiSet::SafeContext);
let handler = RpcHandler::new(Arc::new(apis), deps.remote);
server.start(addr, handler)
};