Prometheus, heavy memory calls removed (#27)

This commit is contained in:
rakita
2020-09-14 16:08:57 +02:00
committed by GitHub
parent dd38573a28
commit aecc6fc862
55 changed files with 3953 additions and 179 deletions

View File

@@ -466,6 +466,19 @@ usage! {
"--ws-max-connections=[CONN]",
"Maximum number of allowed concurrent WebSockets JSON-RPC connections.",
["Metrics"]
FLAG flag_metrics: (bool) = false, or |c: &Config| c.metrics.as_ref()?.enable.clone(),
"--metrics",
"Enable prometheus metrics (only full client).",
ARG arg_metrics_port: (u16) = 3000u16, or |c: &Config| c.metrics.as_ref()?.port.clone(),
"--metrics-port=[PORT]",
"Specify the port portion of the metrics server.",
ARG arg_metrics_interface: (String) = "local", or |c: &Config| c.metrics.as_ref()?.interface.clone(),
"--metrics-interface=[IP]",
"Specify the hostname portion of the metrics server, IP should be an interface's IP address, or all (all interfaces) or local.",
["API and Console Options IPC"]
FLAG flag_no_ipc: (bool) = false, or |c: &Config| c.ipc.as_ref()?.disable.clone(),
"--no-ipc",
@@ -808,6 +821,7 @@ struct Config {
snapshots: Option<Snapshots>,
misc: Option<Misc>,
stratum: Option<Stratum>,
metrics: Option<Metrics>,
}
#[derive(Default, Debug, PartialEq, Deserialize)]
@@ -899,6 +913,14 @@ struct Ipc {
apis: Option<Vec<String>>,
}
/// `[metrics]` section of the TOML config file. Every field is optional;
/// when present it provides the default for the matching CLI option
/// (`--metrics`, `--metrics-port`, `--metrics-interface`).
#[derive(Default, Debug, PartialEq, Deserialize)]
#[serde(deny_unknown_fields)]
struct Metrics {
    // Enables the Prometheus metrics server (`--metrics`).
    enable: Option<bool>,
    // Port of the metrics server (`--metrics-port`).
    port: Option<u16>,
    // Interface/hostname the metrics server binds to (`--metrics-interface`).
    interface: Option<String>,
}
#[derive(Default, Debug, PartialEq, Deserialize)]
#[serde(deny_unknown_fields)]
struct SecretStore {
@@ -1007,8 +1029,8 @@ struct Misc {
#[cfg(test)]
mod tests {
use super::{
Account, Args, ArgsError, Config, Footprint, Ipc, Mining, Misc, Network, Operating, Rpc,
SecretStore, Snapshots, Ws,
Account, Args, ArgsError, Config, Footprint, Ipc, Metrics, Mining, Misc, Network,
Operating, Rpc, SecretStore, Snapshots, Ws,
};
use clap::ErrorKind as ClapErrorKind;
use toml;
@@ -1307,6 +1329,11 @@ mod tests {
arg_ipc_apis: "web3,eth,net,parity,parity_accounts,personal,traces,secretstore"
.into(),
// METRICS
flag_metrics: false,
arg_metrics_port: 3000u16,
arg_metrics_interface: "local".into(),
// SECRETSTORE
flag_no_secretstore: false,
flag_no_secretstore_http: false,
@@ -1505,6 +1532,11 @@ mod tests {
path: None,
apis: Some(vec!["rpc".into(), "eth".into()]),
}),
metrics: Some(Metrics {
enable: Some(true),
interface: Some("local".to_string()),
port: Some(4000),
}),
secretstore: Some(SecretStore {
disable: None,
disable_http: None,

View File

@@ -32,6 +32,12 @@ port = 8180
[ipc]
apis = ["rpc", "eth"]
[metrics]
enable = true
interface = "local"
port = 4000
[secretstore]
http_port = 8082
port = 8083

View File

@@ -26,6 +26,7 @@ use ethcore::{
use ethereum_types::{Address, H256, U256};
use ethkey::{Public, Secret};
use hash::keccak;
use metrics::MetricsConfiguration;
use miner::pool;
use num_cpus;
use parity_version::{version, version_data};
@@ -158,6 +159,7 @@ impl Configuration {
let experimental_rpcs = self.args.flag_jsonrpc_experimental;
let secretstore_conf = self.secretstore_config()?;
let format = self.format()?;
let metrics_conf = self.metrics_config()?;
let keys_iterations = NonZeroU32::new(self.args.arg_keys_iterations)
.ok_or_else(|| "--keys-iterations must be non-zero")?;
@@ -422,6 +424,7 @@ impl Configuration {
verifier_settings: verifier_settings,
no_persistent_txqueue: self.args.flag_no_persistent_txqueue,
max_round_blocks_to_import: self.args.arg_max_round_blocks_to_import,
metrics_conf,
};
Cmd::Run(run_cmd)
};
@@ -953,6 +956,15 @@ impl Configuration {
Ok(conf)
}
/// Builds the Prometheus metrics server configuration from CLI arguments.
///
/// The effective port is `--metrics-port` offset by `--ports-shift`.
/// Returns an `Err` if that sum does not fit in a `u16`, instead of
/// wrapping (release builds) or panicking (debug builds) as the plain
/// `+` would.
fn metrics_config(&self) -> Result<MetricsConfiguration, String> {
    let port = self
        .args
        .arg_ports_shift
        .checked_add(self.args.arg_metrics_port)
        .ok_or_else(|| "Metrics port shifted by --ports-shift exceeds 65535".to_string())?;
    let conf = MetricsConfiguration {
        enabled: self.metrics_enabled(),
        interface: self.metrics_interface(),
        port,
    };
    Ok(conf)
}
fn snapshot_config(&self) -> Result<SnapshotConfiguration, String> {
let conf = SnapshotConfiguration {
no_periodic: self.args.flag_no_periodic_snapshot,
@@ -1048,6 +1060,10 @@ impl Configuration {
self.interface(&self.args.arg_ws_interface)
}
/// Resolves `--metrics-interface` (an IP, or the keywords `all` / `local`
/// per the CLI help text) to a concrete bind address string via the
/// shared `interface` helper used by the other servers (WS, secretstore).
fn metrics_interface(&self) -> String {
    self.interface(&self.args.arg_metrics_interface)
}
fn secretstore_interface(&self) -> String {
self.interface(&self.args.arg_secretstore_interface)
}
@@ -1128,6 +1144,10 @@ impl Configuration {
!self.args.flag_no_ws
}
/// Whether the Prometheus metrics server was requested via `--metrics`
/// (defaults to `false`).
fn metrics_enabled(&self) -> bool {
    self.args.flag_metrics
}
fn secretstore_enabled(&self) -> bool {
!self.args.flag_no_secretstore && cfg!(feature = "secretstore")
}
@@ -1531,6 +1551,7 @@ mod tests {
verifier_settings: Default::default(),
no_persistent_txqueue: false,
max_round_blocks_to_import: 12,
metrics_conf: MetricsConfiguration::default(),
};
expected.secretstore_conf.enabled = cfg!(feature = "secretstore");
expected.secretstore_conf.http_enabled = cfg!(feature = "secretstore");

View File

@@ -146,7 +146,6 @@ impl InformantData for FullNodeInformantData {
let chain_info = self.client.chain_info();
let mut cache_sizes = CacheSizes::default();
cache_sizes.insert("db", client_report.state_db_mem);
cache_sizes.insert("queue", queue_info.mem_used);
cache_sizes.insert("chain", blockchain_cache_info.total());
@@ -157,8 +156,6 @@ impl InformantData for FullNodeInformantData {
let num_peers_range = net.num_peers_range();
debug_assert!(num_peers_range.end() >= num_peers_range.start());
cache_sizes.insert("sync", status.mem_used);
Some(SyncInfo {
last_imported_block_number: status
.last_imported_block_number
@@ -247,10 +244,15 @@ impl<T: InformantData> Informant<T> {
let rpc_stats = self.rpc_stats.as_ref();
let snapshot_sync = sync_info.as_ref().map_or(false, |s| s.snapshot_sync)
&& self.snapshot.as_ref().map_or(false, |s| match s.status() {
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => true,
_ => false,
});
&& self
.snapshot
.as_ref()
.map_or(false, |s| match s.restoration_status() {
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
true
}
_ => false,
});
if !importing && !snapshot_sync && elapsed < Duration::from_secs(30) {
return;
}
@@ -285,8 +287,8 @@ impl<T: InformantData> Informant<T> {
),
true => {
self.snapshot.as_ref().map_or(String::new(), |s|
match s.status() {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => {
match s.restoration_status() {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done, .. } => {
format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks)
},
RestorationStatus::Initializing { chunks_done } => {

View File

@@ -55,6 +55,7 @@ extern crate ethereum_types;
extern crate ethkey;
extern crate ethstore;
extern crate fetch;
extern crate hyper;
extern crate journaldb;
extern crate keccak_hash as hash;
extern crate kvdb;
@@ -65,7 +66,9 @@ extern crate parity_path as path;
extern crate parity_rpc;
extern crate parity_runtime;
extern crate parity_version;
extern crate prometheus;
extern crate registrar;
extern crate stats;
#[macro_use]
extern crate log as rlog;
@@ -96,6 +99,7 @@ mod configuration;
mod db;
mod helpers;
mod informant;
mod metrics;
mod modules;
mod params;
mod presale;

108
parity/metrics.rs Normal file
View File

@@ -0,0 +1,108 @@
use std::{sync::Arc, time::Instant};
use crate::{futures::Future, rpc, rpc_apis};
use parking_lot::Mutex;
use hyper::{service::service_fn_ok, Body, Method, Request, Response, Server, StatusCode};
use stats::{
prometheus::{self, Encoder},
prometheus_gauge, PrometheusMetrics,
};
/// Settings for the Prometheus metrics HTTP endpoint.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsConfiguration {
    /// Whether the metrics server should be started (defaults to `false`).
    pub enabled: bool,
    /// IP of the network interface to bind to (defaults to `127.0.0.1`).
    pub interface: String,
    /// TCP port the server listens on (defaults to `3000`).
    pub port: u16,
}

impl Default for MetricsConfiguration {
    fn default() -> Self {
        Self {
            enabled: false,
            interface: String::from("127.0.0.1"),
            port: 3000,
        }
    }
}
/// Shared state handed to every HTTP request handler: the RPC dependency
/// bundle whose `client` and `sync` handles are the sources of the
/// exported Prometheus metrics.
struct State {
    rpc_apis: Arc<rpc_apis::FullDependencies>,
}
/// Serves `GET /metrics` with the Prometheus text exposition of the
/// client's and sync module's metrics; any other method or path gets a
/// plain-text 404.
fn handle_request(req: Request<Body>, state: Arc<Mutex<State>>) -> Response<Body> {
    // The body is ignored; routing only needs method + path.
    let (parts, _body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::GET, "/metrics") => {
            // Time the gathering itself and export it as the
            // `metrics_time` gauge alongside the real metrics.
            let start = Instant::now();
            // A fresh registry per request: metrics are pulled from the
            // client/sync on demand rather than kept registered.
            let mut reg = prometheus::Registry::new();
            let state = state.lock();
            state.rpc_apis.client.prometheus_metrics(&mut reg);
            state.rpc_apis.sync.prometheus_metrics(&mut reg);
            let elapsed = start.elapsed();
            prometheus_gauge(
                &mut reg,
                "metrics_time",
                "Time to perform rpc metrics",
                elapsed.as_millis() as i64,
            );
            // Encode every gathered metric family in the Prometheus
            // plain-text exposition format.
            let mut buffer = vec![];
            let encoder = prometheus::TextEncoder::new();
            let metric_families = reg.gather();
            encoder
                .encode(&metric_families, &mut buffer)
                .expect("all source of metrics are static; qed");
            let text = String::from_utf8(buffer).expect("metrics encoding is ASCII; qed");
            Response::new(Body::from(text))
        }
        (_, _) => {
            let mut res = Response::new(Body::from("not found"));
            *res.status_mut() = StatusCode::NOT_FOUND;
            res
        }
    }
}
/// Start the prometheus metrics server accessible via GET <host>:<port>/metrics
pub fn start_prometheus_metrics(
conf: &MetricsConfiguration,
deps: &rpc::Dependencies<rpc_apis::FullDependencies>,
) -> Result<(), String> {
if !conf.enabled {
return Ok(());
}
let addr = format!("{}:{}", conf.interface, conf.port);
let addr = addr
.parse()
.map_err(|err| format!("Failed to parse address '{}': {}", addr, err))?;
let state = State {
rpc_apis: deps.apis.clone(),
};
let state = Arc::new(Mutex::new(state));
let server = Server::bind(&addr)
.serve(move || {
// This is the `Service` that will handle the connection.
// `service_fn_ok` is a helper to convert a function that
// returns a Response into a `Service`.
let state = state.clone();
service_fn_ok(move |req: Request<Body>| handle_request(req, state.clone()))
})
.map_err(|e| eprintln!("server error: {}", e));
println!("Listening on http://{}", addr);
deps.executor.spawn(server);
Ok(())
}

View File

@@ -38,6 +38,7 @@ use helpers::{execute_upgrades, passwords_from_files, to_client_config};
use informant::{FullNodeInformantData, Informant};
use journaldb::Algorithm;
use jsonrpc_core;
use metrics::{start_prometheus_metrics, MetricsConfiguration};
use miner::{external::ExternalMiner, work_notify::WorkPoster};
use modules;
use node_filter::NodeFilter;
@@ -109,6 +110,7 @@ pub struct RunCmd {
pub verifier_settings: VerifierSettings,
pub no_persistent_txqueue: bool,
pub max_round_blocks_to_import: usize,
pub metrics_conf: MetricsConfiguration,
}
// node info fetcher for the local store.
@@ -496,6 +498,10 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient
let rpc_direct = rpc::setup_apis(rpc_apis::ApiSet::All, &dependencies);
let ws_server = rpc::new_ws(cmd.ws_conf.clone(), &dependencies)?;
let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
// start the prometheus metrics server
start_prometheus_metrics(&cmd.metrics_conf, &dependencies)?;
let http_server = rpc::new_http(
"HTTP JSON-RPC",
"jsonrpc",

View File

@@ -96,7 +96,7 @@ fn restore_using<R: SnapshotReader>(
state_chunks_done,
block_chunks_done,
..
} = informant_handle.status()
} = informant_handle.restoration_status()
{
info!(
"Processed {}/{} state chunks and {}/{} block chunks.",
@@ -108,7 +108,7 @@ fn restore_using<R: SnapshotReader>(
info!("Restoring state");
for &state_hash in &manifest.state_hashes {
if snapshot.status() == RestorationStatus::Failed {
if snapshot.restoration_status() == RestorationStatus::Failed {
return Err("Restoration failed".into());
}
@@ -132,7 +132,7 @@ fn restore_using<R: SnapshotReader>(
info!("Restoring blocks");
for &block_hash in &manifest.block_hashes {
if snapshot.status() == RestorationStatus::Failed {
if snapshot.restoration_status() == RestorationStatus::Failed {
return Err("Restoration failed".into());
}
@@ -153,7 +153,7 @@ fn restore_using<R: SnapshotReader>(
snapshot.feed_block_chunk(block_hash, &chunk);
}
match snapshot.status() {
match snapshot.restoration_status() {
RestorationStatus::Ongoing { .. } => {
Err("Snapshot file is incomplete and missing chunks.".into())
}