diff --git a/Cargo.lock b/Cargo.lock index 460fd08c9..0abc8c1c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1073,6 +1073,7 @@ dependencies = [ "rlp_compress", "rlp_derive", "rustc-hex 1.0.0", + "stats", "tempdir", "triehash-ethereum", ] @@ -1123,9 +1124,12 @@ dependencies = [ "ethereum-types 0.4.2", "heapsize", "kvdb", + "kvdb-memorydb", + "kvdb-rocksdb", "parking_lot 0.7.1", "rlp 0.3.0", "rlp_derive", + "stats", ] [[package]] @@ -2154,6 +2158,7 @@ name = "journaldb" version = "0.2.0" dependencies = [ "env_logger", + "ethcore-db", "ethereum-types 0.4.2", "fastmap", "hash-db", @@ -2986,6 +2991,7 @@ name = "parity-local-store" version = "0.1.0" dependencies = [ "common-types", + "ethcore-db", "ethcore-io", "ethkey", "kvdb", diff --git a/bin/oe/cli/mod.rs b/bin/oe/cli/mod.rs index f07422940..216b7177c 100644 --- a/bin/oe/cli/mod.rs +++ b/bin/oe/cli/mod.rs @@ -475,6 +475,10 @@ usage! { "--metrics", "Enable prometheus metrics (only full client).", + ARG arg_metrics_prefix: (String) = "", or |c: &Config| c.metrics.as_ref()?.prefix.clone(), + "--metrics-prefix=[prefix]", + "Prepend the specified prefix to the exported metrics names.", + ARG arg_metrics_port: (u16) = 3000u16, or |c: &Config| c.metrics.as_ref()?.port.clone(), "--metrics-port=[PORT]", "Specify the port portion of the metrics server.", @@ -922,6 +926,7 @@ struct Ipc { #[serde(deny_unknown_fields)] struct Metrics { enable: Option, + prefix: Option, port: Option, interface: Option, } @@ -1338,6 +1343,7 @@ mod tests { // METRICS flag_metrics: false, + arg_metrics_prefix: "".into(), arg_metrics_port: 3000u16, arg_metrics_interface: "local".into(), @@ -1542,6 +1548,7 @@ mod tests { }), metrics: Some(Metrics { enable: Some(true), + prefix: Some("oe".to_string()), interface: Some("local".to_string()), port: Some(4000), }), diff --git a/bin/oe/cli/tests/config.toml b/bin/oe/cli/tests/config.toml index 581a59af5..d9b01a05d 100644 --- a/bin/oe/cli/tests/config.toml +++ b/bin/oe/cli/tests/config.toml @@ -36,7 +36,7 @@ apis = ["rpc", "eth"] enable = true interface = "local" port = 4000 - +prefix = "oe" [secretstore] http_port = 8082 diff --git a/bin/oe/configuration.rs b/bin/oe/configuration.rs index b0b1b9215..79fc2f5ec 100644 --- a/bin/oe/configuration.rs +++ b/bin/oe/configuration.rs @@ -958,6 +958,7 @@ impl Configuration { fn metrics_config(&self) -> Result { let conf = MetricsConfiguration { enabled: self.metrics_enabled(), + prefix: self.metrics_prefix(), interface: self.metrics_interface(), port: self.args.arg_ports_shift + self.args.arg_metrics_port, }; @@ -1147,6 +1148,10 @@ impl Configuration { self.args.flag_metrics } + fn metrics_prefix(&self) -> String { + self.args.arg_metrics_prefix.clone() + } + fn secretstore_enabled(&self) -> bool { !self.args.flag_no_secretstore && cfg!(feature = "secretstore") } diff --git a/bin/oe/db/rocksdb/mod.rs b/bin/oe/db/rocksdb/mod.rs index c923cba2f..e5789f89a 100644 --- a/bin/oe/db/rocksdb/mod.rs +++ b/bin/oe/db/rocksdb/mod.rs @@ -24,7 +24,8 @@ use self::{ }; use blooms_db; use ethcore::client::ClientConfig; -use kvdb::KeyValueDB; +use ethcore_db::KeyValueDB; +use stats::PrometheusMetrics; use std::{fs, io, path::Path, sync::Arc}; mod blooms; @@ -53,6 +54,10 @@ impl BlockChainDB for AppDB { } } +impl PrometheusMetrics for AppDB { + fn prometheus_metrics(&self, _: &mut stats::PrometheusRegistry) {} +} + /// Open a secret store DB using the given secret store data path. The DB path is one level beneath the data path. #[cfg(feature = "secretstore")] pub fn open_secretstore_db(data_path: &str) -> Result, String> { @@ -101,8 +106,11 @@ pub fn open_database( fs::create_dir_all(&blooms_path)?; fs::create_dir_all(&trace_blooms_path)?; + let db = Database::open(&config, client_path)?; + let db_with_metrics = ethcore_db::DatabaseWithMetrics::new(db); + let db = AppDB { - key_value: Arc::new(Database::open(&config, client_path)?), + key_value: Arc::new(db_with_metrics), blooms: blooms_db::Database::open(blooms_path)?, trace_blooms: blooms_db::Database::open(trace_blooms_path)?, }; diff --git a/bin/oe/metrics.rs b/bin/oe/metrics.rs index da083bd8d..7c7119bcf 100644 --- a/bin/oe/metrics.rs +++ b/bin/oe/metrics.rs @@ -8,13 +8,15 @@ use hyper::{service::service_fn_ok, Body, Method, Request, Response, Server, Sta use stats::{ prometheus::{self, Encoder}, - prometheus_gauge, PrometheusMetrics, + PrometheusMetrics, PrometheusRegistry, }; #[derive(Debug, Clone, PartialEq)] pub struct MetricsConfiguration { /// Are metrics enabled (default is false)? pub enabled: bool, + /// Prefix + pub prefix: String, /// The IP of the network interface used (default is 127.0.0.1). pub interface: String, /// The network port (default is 3000). @@ -25,6 +27,7 @@ impl Default for MetricsConfiguration { fn default() -> Self { MetricsConfiguration { enabled: false, + prefix: "".into(), interface: "127.0.0.1".into(), port: 3000, } @@ -35,19 +38,22 @@ struct State { rpc_apis: Arc, } -fn handle_request(req: Request, state: Arc>) -> Response { +fn handle_request( + req: Request, + conf: Arc, + state: Arc>, +) -> Response { let (parts, _body) = req.into_parts(); match (parts.method, parts.uri.path()) { (Method::GET, "/metrics") => { let start = Instant::now(); - let mut reg = prometheus::Registry::new(); + let mut reg = PrometheusRegistry::new(conf.prefix.clone()); let state = state.lock(); state.rpc_apis.client.prometheus_metrics(&mut reg); state.rpc_apis.sync.prometheus_metrics(&mut reg); let elapsed = start.elapsed(); - prometheus_gauge( - &mut reg, + reg.register_gauge( "metrics_time", "Time to perform rpc metrics", elapsed.as_millis() as i64, @@ -55,7 +61,7 @@ fn handle_request(req: Request, state: Arc>) -> Response| handle_request(req, state.clone())) + let conf = conf.clone(); + service_fn_ok(move |req: Request| { + handle_request(req, conf.clone(), state.clone()) + }) }) .map_err(|e| eprintln!("server error: {}", e)); - println!("Listening on http://{}", addr); + info!("Started prometeus metrics at http://{}/metrics", addr); deps.executor.spawn(server); diff --git a/crates/concensus/miner/local-store/Cargo.toml b/crates/concensus/miner/local-store/Cargo.toml index db34cd7c7..463470c86 100644 --- a/crates/concensus/miner/local-store/Cargo.toml +++ b/crates/concensus/miner/local-store/Cargo.toml @@ -7,6 +7,7 @@ authors = ["Parity Technologies "] [dependencies] common-types = { path = "../../../ethcore/types" } ethcore-io = { path = "../../../runtime/io" } +ethcore-db = { path = "../../../db/db"} kvdb = "0.1" log = "0.4" rlp = { version = "0.3.0", features = ["ethereum"] } diff --git a/crates/concensus/miner/local-store/src/lib.rs b/crates/concensus/miner/local-store/src/lib.rs index 9dd80dc35..0d2a3657c 100644 --- a/crates/concensus/miner/local-store/src/lib.rs +++ b/crates/concensus/miner/local-store/src/lib.rs @@ -18,14 +18,15 @@ use std::{fmt, sync::Arc, time::Duration}; +use ethcore_db::KeyValueDB; use io::IoHandler; -use kvdb::KeyValueDB; use types::transaction::{ Condition as TransactionCondition, PendingTransaction, SignedTransaction, TypedTransaction, UnverifiedTransaction, }; extern crate common_types as types; +extern crate ethcore_db; extern crate ethcore_io as io; extern crate kvdb; extern crate rlp; @@ -253,7 +254,7 @@ mod tests { #[test] fn twice_empty() { - let db = Arc::new(::kvdb_memorydb::create(0)); + let db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); { let store = super::create(db.clone(), None, Dummy(vec![])); @@ -284,7 +285,7 @@ mod tests { }) .collect(); - let db = Arc::new(::kvdb_memorydb::create(0)); + let db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); { // nothing written yet, will write pending. @@ -325,7 +326,7 @@ mod tests { PendingTransaction::new(signed, None) }); - let db = Arc::new(::kvdb_memorydb::create(0)); + let db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); { // nothing written, will write bad. let store = super::create(db.clone(), None, Dummy(transactions.clone())); diff --git a/crates/db/db/Cargo.toml b/crates/db/db/Cargo.toml index 64f175c24..7b6ebbfde 100644 --- a/crates/db/db/Cargo.toml +++ b/crates/db/db/Cargo.toml @@ -12,6 +12,9 @@ common-types = { path = "../../ethcore/types" } ethereum-types = "0.4" heapsize = "0.4" kvdb = "0.1" +kvdb-rocksdb = "0.1.3" +kvdb-memorydb = "0.1" parking_lot = "0.7" rlp = { version = "0.3.0", features = ["ethereum"] } rlp_derive = { path = "../../util/rlp-derive" } +stats = { path = "../../util/stats" } \ No newline at end of file diff --git a/crates/db/db/src/db.rs b/crates/db/db/src/db.rs index 67cba2fbd..464300a25 100644 --- a/crates/db/db/src/db.rs +++ b/crates/db/db/src/db.rs @@ -16,9 +16,11 @@ //! Database utilities and definitions. -use kvdb::{DBTransaction, KeyValueDB}; +use kvdb::DBTransaction; +use kvdb_rocksdb::Database; use parking_lot::RwLock; -use std::{collections::HashMap, hash::Hash, ops::Deref}; +use stats::{PrometheusMetrics, PrometheusRegistry}; +use std::{collections::HashMap, hash::Hash, io::Read, ops::Deref}; use rlp; @@ -282,7 +284,7 @@ impl Writable for DBTransaction { } } -impl Readable for KVDB { +impl Readable for KVDB { fn read(&self, col: Option, key: &dyn Key) -> Option where T: rlp::Decodable, @@ -311,3 +313,197 @@ impl Readable for KVDB { } } } + +/// Database with enabled statistics +pub struct DatabaseWithMetrics { + db: Database, + reads: std::sync::atomic::AtomicI64, + writes: std::sync::atomic::AtomicI64, + bytes_read: std::sync::atomic::AtomicI64, + bytes_written: std::sync::atomic::AtomicI64, +} + +impl DatabaseWithMetrics { + /// Create a new instance + pub fn new(db: Database) -> Self { + Self { + db, + reads: std::sync::atomic::AtomicI64::new(0), + writes: std::sync::atomic::AtomicI64::new(0), + bytes_read: std::sync::atomic::AtomicI64::new(0), + bytes_written: std::sync::atomic::AtomicI64::new(0), + } + } +} + +/// Ethcore definition of a KeyValueDB with embeeded metrics +pub trait KeyValueDB: kvdb::KeyValueDB + PrometheusMetrics {} + +impl kvdb::KeyValueDB for DatabaseWithMetrics { + fn get(&self, col: Option, key: &[u8]) -> std::io::Result> { + let res = self.db.get(col, key); + let count = res + .as_ref() + .map_or(0, |y| y.as_ref().map_or(0, |x| x.bytes().count())); + + self.reads + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.bytes_read + .fetch_add(count as i64, std::sync::atomic::Ordering::Relaxed); + + res + } + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + let res = self.db.get_by_prefix(col, prefix); + let count = res.as_ref().map_or(0, |x| x.bytes().count()); + + self.reads + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.bytes_read + .fetch_add(count as i64, std::sync::atomic::Ordering::Relaxed); + + res + } + fn write_buffered(&self, transaction: DBTransaction) { + let mut count = 0; + for op in &transaction.ops { + count += match op { + kvdb::DBOp::Insert { value, .. } => value.bytes().count(), + _ => 0, + }; + } + + self.writes.fetch_add( + transaction.ops.len() as i64, + std::sync::atomic::Ordering::Relaxed, + ); + self.bytes_written + .fetch_add(count as i64, std::sync::atomic::Ordering::Relaxed); + + self.db.write_buffered(transaction) + } + fn write(&self, transaction: DBTransaction) -> std::io::Result<()> { + let mut count = 0; + for op in &transaction.ops { + count += match op { + kvdb::DBOp::Insert { value, .. } => value.bytes().count(), + _ => 0, + }; + } + + self.bytes_written + .fetch_add(count as i64, std::sync::atomic::Ordering::Relaxed); + self.writes.fetch_add( + transaction.ops.len() as i64, + std::sync::atomic::Ordering::Relaxed, + ); + self.db.write(transaction) + } + fn flush(&self) -> std::io::Result<()> { + self.db.flush() + } + + fn iter<'a>( + &'a self, + col: Option, + ) -> Box<(dyn Iterator, Box<[u8]>)> + 'a)> { + kvdb::KeyValueDB::iter(&self.db, col) + } + + fn iter_from_prefix<'a>( + &'a self, + col: Option, + prefix: &'a [u8], + ) -> Box, Box<[u8]>)> + 'a> { + self.db.iter_from_prefix(col, prefix) + } + + fn restore(&self, new_db: &str) -> std::io::Result<()> { + self.db.restore(new_db) + } +} + +impl KeyValueDB for DatabaseWithMetrics {} + +impl PrometheusMetrics for DatabaseWithMetrics { + fn prometheus_metrics(&self, p: &mut PrometheusRegistry) { + p.register_counter( + "kvdb_reads", + "db reads", + self.reads.load(std::sync::atomic::Ordering::Relaxed) as i64, + ); + p.register_counter( + "kvdb_writes", + "db writes", + self.writes.load(std::sync::atomic::Ordering::Relaxed) as i64, + ); + p.register_counter( + "kvdb_bytes_read", + "db bytes_reads", + self.bytes_read.load(std::sync::atomic::Ordering::Relaxed) as i64, + ); + p.register_counter( + "kvdb_bytes_written", + "db bytes_written", + self.bytes_written + .load(std::sync::atomic::Ordering::Relaxed) as i64, + ); + } +} + +/// InMemory with disabled statistics +pub struct InMemoryWithMetrics { + db: kvdb_memorydb::InMemory, +} + +impl kvdb::KeyValueDB for InMemoryWithMetrics { + fn get(&self, col: Option, key: &[u8]) -> std::io::Result> { + self.db.get(col, key) + } + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + self.db.get_by_prefix(col, prefix) + } + fn write_buffered(&self, transaction: DBTransaction) { + self.db.write_buffered(transaction) + } + fn write(&self, transaction: DBTransaction) -> std::io::Result<()> { + self.db.write(transaction) + } + fn flush(&self) -> std::io::Result<()> { + self.db.flush() + } + + fn iter<'a>( + &'a self, + col: Option, + ) -> Box<(dyn Iterator, Box<[u8]>)> + 'a)> { + kvdb::KeyValueDB::iter(&self.db, col) + } + + fn iter_from_prefix<'a>( + &'a self, + col: Option, + prefix: &'a [u8], + ) -> Box, Box<[u8]>)> + 'a> { + self.db.iter_from_prefix(col, prefix) + } + + fn restore(&self, new_db: &str) -> std::io::Result<()> { + self.db.restore(new_db) + } +} + +impl PrometheusMetrics for InMemoryWithMetrics { + fn prometheus_metrics(&self, _: &mut PrometheusRegistry) {} +} + +impl KeyValueDB for InMemoryWithMetrics {} + +impl InMemoryWithMetrics { + /// Create new instance + pub fn create(num_cols: u32) -> Self { + Self { + db: kvdb_memorydb::create(num_cols), + } + } +} diff --git a/crates/db/db/src/lib.rs b/crates/db/db/src/lib.rs index 9181b6402..29d33cc02 100644 --- a/crates/db/db/src/lib.rs +++ b/crates/db/db/src/lib.rs @@ -22,5 +22,6 @@ mod db; pub mod cache_manager; pub mod keys; +pub use kvdb::{DBTransaction, DBValue}; pub use self::db::*; diff --git a/crates/db/journaldb/Cargo.toml b/crates/db/journaldb/Cargo.toml index 5ca9853ab..9c331e14d 100644 --- a/crates/db/journaldb/Cargo.toml +++ b/crates/db/journaldb/Cargo.toml @@ -11,6 +11,7 @@ ethereum-types = "0.4" hash-db = "0.11.0" heapsize = "0.4" keccak-hasher = { path = "../../util/keccak-hasher" } +ethcore-db = { path = "../../db/db"} kvdb = "0.1" log = "0.4" memory-db = { path = "../memory-db" } diff --git a/crates/db/journaldb/src/archivedb.rs b/crates/db/journaldb/src/archivedb.rs index 53ad46140..79cafcd7b 100644 --- a/crates/db/journaldb/src/archivedb.rs +++ b/crates/db/journaldb/src/archivedb.rs @@ -25,10 +25,10 @@ use std::{ use super::{ error_key_already_exists, error_negatively_reference_hash, memory_db::*, LATEST_ERA_KEY, }; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; use keccak_hasher::KeccakHasher; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use rlp::{decode, encode}; use traits::JournalDB; @@ -222,13 +222,12 @@ mod tests { use super::*; use hash_db::HashDB; use keccak::keccak; - use kvdb_memorydb; use JournalDB; #[test] fn insert_same_in_fork() { // history is 1 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let x = jdb.insert(b"X"); jdb.commit_batch(1, &keccak(b"1"), None).unwrap(); @@ -256,7 +255,7 @@ mod tests { #[test] fn long_history() { // history is 3 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let h = jdb.insert(b"foo"); jdb.commit_batch(0, &keccak(b"0"), None).unwrap(); assert!(jdb.contains(&h)); @@ -276,7 +275,7 @@ mod tests { #[test] #[should_panic] fn multiple_owed_removal_not_allowed() { - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let h = jdb.insert(b"foo"); jdb.commit_batch(0, &keccak(b"0"), None).unwrap(); assert!(jdb.contains(&h)); @@ -290,7 +289,7 @@ mod tests { #[test] fn complex() { // history is 1 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let foo = jdb.insert(b"foo"); let bar = jdb.insert(b"bar"); @@ -326,7 +325,7 @@ mod tests { #[test] fn fork() { // history is 1 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let foo = jdb.insert(b"foo"); let bar = jdb.insert(b"bar"); @@ -355,7 +354,7 @@ mod tests { #[test] fn overwrite() { // history is 1 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let foo = jdb.insert(b"foo"); jdb.commit_batch(0, &keccak(b"0"), None).unwrap(); @@ -377,7 +376,7 @@ mod tests { #[test] fn fork_same_key() { // history is 1 - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); jdb.commit_batch(0, &keccak(b"0"), None).unwrap(); let foo = jdb.insert(b"foo"); @@ -396,7 +395,7 @@ mod tests { #[test] fn reopen() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let bar = H256::random(); let foo = { @@ -426,7 +425,7 @@ mod tests { #[test] fn reopen_remove() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let foo = { let mut jdb = ArchiveDB::new(shared_db.clone(), None); @@ -460,7 +459,7 @@ mod tests { #[test] fn reopen_fork() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let (foo, _, _) = { let mut jdb = ArchiveDB::new(shared_db.clone(), None); // history is 1 @@ -488,7 +487,7 @@ mod tests { #[test] fn inject() { - let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None); + let mut jdb = ArchiveDB::new(Arc::new(ethcore_db::InMemoryWithMetrics::create(0)), None); let key = jdb.insert(b"dog"); jdb.inject_batch().unwrap(); diff --git a/crates/db/journaldb/src/earlymergedb.rs b/crates/db/journaldb/src/earlymergedb.rs index cf5ad0a82..2d1c5be69 100644 --- a/crates/db/journaldb/src/earlymergedb.rs +++ b/crates/db/journaldb/src/earlymergedb.rs @@ -26,11 +26,11 @@ use super::{ error_key_already_exists, error_negatively_reference_hash, traits::JournalDB, LATEST_ERA_KEY, }; use bytes::Bytes; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; use heapsize::HeapSizeOf; use keccak_hasher::KeccakHasher; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use memory_db::*; use parking_lot::RwLock; use rlp::{decode, encode}; @@ -622,7 +622,6 @@ mod tests { use super::{super::traits::JournalDB, *}; use hash_db::HashDB; use keccak::keccak; - use kvdb_memorydb; #[test] fn insert_same_in_fork() { @@ -913,13 +912,13 @@ mod tests { } fn new_db() -> EarlyMergeDB { - let backing = Arc::new(kvdb_memorydb::create(0)); + let backing = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); EarlyMergeDB::new(backing, None) } #[test] fn reopen() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let bar = H256::random(); let foo = { @@ -1105,7 +1104,7 @@ mod tests { fn reopen_remove_three() { let _ = ::env_logger::try_init(); - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let foo = keccak(b"foo"); { @@ -1166,7 +1165,7 @@ mod tests { #[test] fn reopen_fork() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let (foo, bar, baz) = { let mut jdb = EarlyMergeDB::new(shared_db.clone(), None); diff --git a/crates/db/journaldb/src/lib.rs b/crates/db/journaldb/src/lib.rs index b41bb0a35..1365302f5 100644 --- a/crates/db/journaldb/src/lib.rs +++ b/crates/db/journaldb/src/lib.rs @@ -20,6 +20,7 @@ extern crate heapsize; #[macro_use] extern crate log; +extern crate ethcore_db; extern crate ethereum_types; extern crate fastmap; extern crate hash_db; @@ -147,7 +148,7 @@ impl fmt::Display for Algorithm { /// Create a new `JournalDB` trait object over a generic key-value database. pub fn new( - backing: Arc, + backing: Arc, algorithm: Algorithm, col: Option, ) -> Box { diff --git a/crates/db/journaldb/src/overlaydb.rs b/crates/db/journaldb/src/overlaydb.rs index 3828842cb..5df5d0bc4 100644 --- a/crates/db/journaldb/src/overlaydb.rs +++ b/crates/db/journaldb/src/overlaydb.rs @@ -23,10 +23,10 @@ use std::{ }; use super::error_negatively_reference_hash; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; use keccak_hasher::KeccakHasher; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use memory_db::*; use rlp::{decode, encode, Decodable, DecoderError, Encodable, Rlp, RlpStream}; @@ -88,7 +88,7 @@ impl OverlayDB { /// Create a new instance of OverlayDB with an anonymous temporary database. #[cfg(test)] pub fn new_temp() -> OverlayDB { - let backing = Arc::new(::kvdb_memorydb::create(0)); + let backing = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); Self::new(backing, None) } diff --git a/crates/db/journaldb/src/overlayrecentdb.rs b/crates/db/journaldb/src/overlayrecentdb.rs index 8a51161c7..84ae4b3ad 100644 --- a/crates/db/journaldb/src/overlayrecentdb.rs +++ b/crates/db/journaldb/src/overlayrecentdb.rs @@ -23,12 +23,12 @@ use std::{ }; use super::{error_negatively_reference_hash, JournalDB, DB_PREFIX_LEN, LATEST_ERA_KEY}; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use fastmap::H256FastMap; use hash_db::HashDB; use heapsize::HeapSizeOf; use keccak_hasher::KeccakHasher; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use memory_db::*; use parking_lot::RwLock; use rlp::{decode, encode, Decodable, DecoderError, Encodable, Rlp, RlpStream}; @@ -554,11 +554,10 @@ mod tests { use super::*; use hash_db::HashDB; use keccak::keccak; - use kvdb_memorydb; use JournalDB; fn new_db() -> OverlayRecentDB { - let backing = Arc::new(kvdb_memorydb::create(0)); + let backing = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); OverlayRecentDB::new(backing, None) } @@ -832,7 +831,7 @@ mod tests { #[test] fn reopen() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let bar = H256::random(); let foo = { @@ -1015,7 +1014,7 @@ mod tests { fn reopen_remove_three() { let _ = ::env_logger::try_init(); - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let foo = keccak(b"foo"); { @@ -1076,7 +1075,7 @@ mod tests { #[test] fn reopen_fork() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); let (foo, bar, baz) = { let mut jdb = OverlayRecentDB::new(shared_db.clone(), None); @@ -1146,7 +1145,7 @@ mod tests { #[test] fn earliest_era() { - let shared_db = Arc::new(kvdb_memorydb::create(0)); + let shared_db = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); // empty DB let mut jdb = OverlayRecentDB::new(shared_db.clone(), None); diff --git a/crates/db/journaldb/src/refcounteddb.rs b/crates/db/journaldb/src/refcounteddb.rs index ee4d1a0b2..73ac77945 100644 --- a/crates/db/journaldb/src/refcounteddb.rs +++ b/crates/db/journaldb/src/refcounteddb.rs @@ -23,11 +23,11 @@ use std::{ }; use super::{traits::JournalDB, LATEST_ERA_KEY}; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; use heapsize::HeapSizeOf; use keccak_hasher::KeccakHasher; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use memory_db::MemoryDB; use overlaydb::OverlayDB; use rlp::{decode, encode}; @@ -252,11 +252,10 @@ mod tests { use super::*; use hash_db::HashDB; use keccak::keccak; - use kvdb_memorydb; use JournalDB; fn new_db() -> RefCountedDB { - let backing = Arc::new(kvdb_memorydb::create(0)); + let backing = Arc::new(ethcore_db::InMemoryWithMetrics::create(0)); RefCountedDB::new(backing, None) } diff --git a/crates/db/journaldb/src/traits.rs b/crates/db/journaldb/src/traits.rs index 3cc2cdb32..ecd2d556e 100644 --- a/crates/db/journaldb/src/traits.rs +++ b/crates/db/journaldb/src/traits.rs @@ -18,10 +18,10 @@ use std::{io, sync::Arc}; +use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::{AsHashDB, HashDB}; use keccak_hasher::KeccakHasher; -use kvdb::{self, DBTransaction, DBValue}; use std::collections::{BTreeMap, HashMap}; /// expose keys of a hashDB for debugging or tests (slow). @@ -86,7 +86,7 @@ pub trait JournalDB: KeyedHashDB { } /// Get backing database. - fn backing(&self) -> &Arc; + fn backing(&self) -> &Arc; /// Clear internal strucutres. This should called after changes have been written /// to the backing strage diff --git a/crates/ethcore/blockchain/Cargo.toml b/crates/ethcore/blockchain/Cargo.toml index d20a23889..75f3135a5 100644 --- a/crates/ethcore/blockchain/Cargo.toml +++ b/crates/ethcore/blockchain/Cargo.toml @@ -26,6 +26,7 @@ rlp = { version = "0.3.0", features = ["ethereum"] } rlp_compress = { path = "../../util/rlp-compress" } rlp_derive = { path = "../../util/rlp-derive" } triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" } +stats = { path = "../../util/stats" } [dev-dependencies] env_logger = "0.5" diff --git a/crates/ethcore/blockchain/src/blockchain.rs b/crates/ethcore/blockchain/src/blockchain.rs index a03b25aae..1363304ff 100644 --- a/crates/ethcore/blockchain/src/blockchain.rs +++ b/crates/ethcore/blockchain/src/blockchain.rs @@ -41,6 +41,7 @@ use common_types::{ views::{BlockView, HeaderView}, BlockNumber, }; +use db::{DBTransaction, KeyValueDB}; use ethcore_db::{ self as db, cache_manager::CacheManager, @@ -50,13 +51,13 @@ use ethcore_db::{ use ethereum_types::{Bloom, BloomRef, H256, U256}; use heapsize::HeapSizeOf; use itertools::Itertools; -use kvdb::{DBTransaction, KeyValueDB}; use log::{info, trace, warn}; use parity_bytes::Bytes; use parking_lot::{Mutex, RwLock}; use rayon::prelude::*; use rlp::RlpStream; use rlp_compress::{blocks_swapper, compress, decompress}; +use stats::PrometheusMetrics; use crate::{ best_block::{BestAncientBlock, BestBlock}, @@ -66,7 +67,7 @@ use crate::{ }; /// Database backing `BlockChain`. -pub trait BlockChainDB: Send + Sync { +pub trait BlockChainDB: Send + Sync + PrometheusMetrics { /// Generic key value store. fn key_value(&self) -> &Arc; @@ -1950,6 +1951,9 @@ mod tests { &self.trace_blooms } } + impl PrometheusMetrics for TestBlockChainDB { + fn prometheus_metrics(&self, _: &mut stats::PrometheusRegistry) {} + } /// Creates new test instance of `BlockChainDB` pub fn new_db() -> Arc { @@ -1961,7 +1965,9 @@ mod tests { trace_blooms: blooms_db::Database::open(trace_blooms_dir.path()).unwrap(), _blooms_dir: blooms_dir, _trace_blooms_dir: trace_blooms_dir, - key_value: Arc::new(kvdb_memorydb::create(ethcore_db::NUM_COLUMNS.unwrap())), + key_value: Arc::new(ethcore_db::InMemoryWithMetrics::create( + ethcore_db::NUM_COLUMNS.unwrap(), + )), }; Arc::new(db) diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index 80531f721..a9cba475f 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -33,12 +33,12 @@ use blockchain::{ }; use bytes::{Bytes, ToPretty}; use call_contract::CallContract; +use db::{DBTransaction, DBValue, KeyValueDB}; use error::Error; use ethcore_miner::pool::VerifiedTransaction; use ethereum_types::{Address, H256, H264, U256}; use hash::keccak; use itertools::Itertools; -use kvdb::{DBTransaction, DBValue, KeyValueDB}; use parking_lot::{Mutex, RwLock}; use rand::OsRng; use rlp::{PayloadInfo, Rlp}; @@ -88,7 +88,7 @@ use snapshot::{self, io as snapshot_io, SnapshotClient}; use spec::Spec; use state::{self, State}; use state_db::StateDB; -use stats::{prometheus, prometheus_counter, prometheus_gauge, PrometheusMetrics}; +use stats::{PrometheusMetrics, PrometheusRegistry}; use trace::{ self, Database as TraceDatabase, ImportRequest as TraceImportRequest, LocalizedTrace, TraceDB, }; @@ -3236,41 +3236,36 @@ impl IoChannelQueue { } impl PrometheusMetrics for Client { - fn prometheus_metrics(&self, r: &mut prometheus::Registry) { + fn prometheus_metrics(&self, r: &mut PrometheusRegistry) { // gas, tx & blocks let report = self.report(); for (key, value) in report.item_sizes.iter() { - prometheus_gauge( - r, + r.register_gauge( &key, format!("Total item number of {}", key).as_str(), *value as i64, ); } - prometheus_counter( - r, + r.register_counter( "import_gas", "Gas processed", report.gas_processed.as_u64() as i64, ); - prometheus_counter( - r, + r.register_counter( "import_blocks", "Blocks imported", report.blocks_imported as i64, ); - prometheus_counter( - r, + r.register_counter( "import_txs", "Transactions applied", report.transactions_applied as i64, ); let state_db = self.state_db.read(); - prometheus_gauge( - r, + r.register_gauge( "statedb_cache_size", "State DB cache size", state_db.cache_size() as i64, @@ -3278,32 +3273,27 @@ impl PrometheusMetrics for Client { // blockchain cache let blockchain_cache_info = self.blockchain_cache_info(); - prometheus_gauge( - r, + r.register_gauge( "blockchaincache_block_details", "BlockDetails cache size", blockchain_cache_info.block_details as i64, ); - prometheus_gauge( - r, + r.register_gauge( "blockchaincache_block_recipts", "Block receipts size", blockchain_cache_info.block_receipts as i64, ); - prometheus_gauge( - r, + r.register_gauge( "blockchaincache_blocks", "Blocks cache size", blockchain_cache_info.blocks as i64, ); - prometheus_gauge( - r, + r.register_gauge( "blockchaincache_txaddrs", "Transaction addresses cache size", blockchain_cache_info.transaction_addresses as i64, ); - prometheus_gauge( - r, + r.register_gauge( "blockchaincache_size", "Total blockchain cache size", blockchain_cache_info.total() as i64, @@ -3321,22 +3311,19 @@ impl PrometheusMetrics for Client { .map(|last| (first, U256::from(last))) }); if let Some((first, last)) = gap { - prometheus_gauge( - r, + r.register_gauge( "chain_warpsync_gap_first", "Warp sync gap, first block", first.as_u64() as i64, ); - prometheus_gauge( - r, + r.register_gauge( "chain_warpsync_gap_last", "Warp sync gap, last block", last.as_u64() as i64, ); } - prometheus_gauge( - r, + r.register_gauge( "chain_block", "Best block number", chain.best_block_number as i64, @@ -3344,14 +3331,12 @@ impl PrometheusMetrics for Client { // prunning info let prunning = self.pruning_info(); - prometheus_gauge( - r, + r.register_gauge( "prunning_earliest_chain", "The first block which everything can be served after", prunning.earliest_chain as i64, ); - prometheus_gauge( - r, + r.register_gauge( "prunning_earliest_state", "The first block where state requests may be served", prunning.earliest_state as i64, @@ -3359,36 +3344,34 @@ impl PrometheusMetrics for Client { // queue info let queue = self.queue_info(); - prometheus_gauge( - r, + r.register_gauge( "queue_mem_used", "Queue heap memory used in bytes", queue.mem_used as i64, ); - prometheus_gauge( - r, + r.register_gauge( "queue_size_total", "The total size of the queues", queue.total_queue_size() as i64, ); - prometheus_gauge( - r, + r.register_gauge( "queue_size_unverified", "Number of queued items pending verification", queue.unverified_queue_size as i64, ); - prometheus_gauge( - r, + r.register_gauge( "queue_size_verified", "Number of verified queued items pending import", queue.verified_queue_size as i64, ); - prometheus_gauge( - r, + r.register_gauge( "queue_size_verifying", "Number of items being verified", queue.verifying_queue_size as i64, ); + + // database info + self.db.read().key_value().prometheus_metrics(r); } } diff --git a/crates/ethcore/src/client/evm_test_client.rs b/crates/ethcore/src/client/evm_test_client.rs index a80a7c04a..bc9eac40d 100644 --- a/crates/ethcore/src/client/evm_test_client.rs +++ b/crates/ethcore/src/client/evm_test_client.rs @@ -25,7 +25,6 @@ use executive; use factory::{self, Factories}; use journaldb; use kvdb::{self, KeyValueDB}; -use kvdb_memorydb; use pod_state; use spec; use state; @@ -181,7 +180,7 @@ impl<'a> EvmTestClient<'a> { spec: &'a spec::Spec, factories: &Factories, ) -> Result, EvmTestError> { - let db = Arc::new(kvdb_memorydb::create( + let db = Arc::new(ethcore_db::InMemoryWithMetrics::create( db::NUM_COLUMNS.expect("We use column-based DB; qed"), )); let journal_db = @@ -211,7 +210,7 @@ impl<'a> EvmTestClient<'a> { factories: &Factories, pod_state: pod_state::PodState, ) -> Result, EvmTestError> { - let db = Arc::new(kvdb_memorydb::create( + let db = Arc::new(ethcore_db::InMemoryWithMetrics::create( db::NUM_COLUMNS.expect("We use column-based DB; qed"), )); let journal_db = diff --git a/crates/ethcore/src/client/test_client.rs b/crates/ethcore/src/client/test_client.rs index 6e284e6c2..28195da23 100644 --- a/crates/ethcore/src/client/test_client.rs +++ b/crates/ethcore/src/client/test_client.rs @@ -34,7 +34,6 @@ use ethtrie; use hash::keccak; use itertools::Itertools; use kvdb::DBValue; -use kvdb_memorydb; use parking_lot::RwLock; use rlp::RlpStream; use rustc_hex::FromHex; @@ -75,7 +74,7 @@ use miner::{self, Miner, MinerService}; use spec::Spec; use state::StateInfo; use state_db::StateDB; -use stats::{prometheus, PrometheusMetrics}; +use stats::{PrometheusMetrics, PrometheusRegistry}; use trace::LocalizedTrace; use verification::queue::{kind::blocks::Unverified, QueueInfo}; @@ -409,7 +408,7 @@ impl TestBlockChainClient { /// Get temporary db state1 pub fn get_temp_state_db() -> StateDB { - let db = kvdb_memorydb::create(NUM_COLUMNS.unwrap_or(0)); + let db = ethcore_db::InMemoryWithMetrics::create(NUM_COLUMNS.unwrap_or(0)); let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, COL_STATE); StateDB::new(journal_db, 1024 * 1024) } @@ -1132,5 +1131,5 @@ impl super::traits::EngineClient for TestBlockChainClient { } impl PrometheusMetrics for TestBlockChainClient { - fn prometheus_metrics(&self, _r: &mut prometheus::Registry) {} + fn prometheus_metrics(&self, _r: &mut PrometheusRegistry) {} } diff --git a/crates/ethcore/src/lib.rs b/crates/ethcore/src/lib.rs index 8a95f2213..ba87d3ef4 100644 --- a/crates/ethcore/src/lib.rs +++ b/crates/ethcore/src/lib.rs @@ -73,8 +73,6 @@ extern crate ethcore_accounts as accounts; extern crate ethcore_stratum; #[cfg(feature = "json-tests")] extern crate globset; -#[cfg(any(test, feature = "test-helpers"))] -extern crate kvdb_memorydb; #[cfg(any(test, feature = "kvdb-rocksdb"))] extern crate kvdb_rocksdb; #[cfg(test)] diff --git a/crates/ethcore/src/snapshot/consensus/authority.rs b/crates/ethcore/src/snapshot/consensus/authority.rs index 9c74e9ec0..4b03d6588 100644 --- a/crates/ethcore/src/snapshot/consensus/authority.rs +++ b/crates/ethcore/src/snapshot/consensus/authority.rs @@ -32,9 +32,9 @@ use snapshot::{Error, ManifestData, Progress}; use blockchain::{BlockChain, BlockChainDB, BlockProvider}; use bytes::Bytes; +use db::KeyValueDB; use ethereum_types::{H256, U256}; use itertools::{Itertools, Position}; -use kvdb::KeyValueDB; use rlp::{Rlp, RlpStream}; use types::{ encoded, header::Header, ids::BlockId, receipt::TypedReceipt, transaction::TypedTransaction, diff --git a/crates/ethcore/src/snapshot/consensus/work.rs b/crates/ethcore/src/snapshot/consensus/work.rs index 46d4edd85..ae9dc364b 100644 --- a/crates/ethcore/src/snapshot/consensus/work.rs +++ b/crates/ethcore/src/snapshot/consensus/work.rs @@ -32,9 +32,9 @@ use std::{ use blockchain::{BlockChain, BlockChainDB, BlockProvider}; use bytes::Bytes; +use db::KeyValueDB; use engines::EthEngine; use ethereum_types::H256; -use kvdb::KeyValueDB; use rand::OsRng; use rlp::{Rlp, RlpStream}; use snapshot::{block::AbridgedBlock, Error, ManifestData, Progress}; diff --git a/crates/ethcore/src/snapshot/mod.rs b/crates/ethcore/src/snapshot/mod.rs index 5ffd3cff2..ca9b7f7b0 100644 --- a/crates/ethcore/src/snapshot/mod.rs +++ b/crates/ethcore/src/snapshot/mod.rs @@ -35,12 +35,12 @@ use engines::EthEngine; use types::{header::Header, ids::BlockId}; use bytes::Bytes; +use db::{DBValue, KeyValueDB}; use ethereum_types::H256; use ethtrie::{TrieDB, TrieDBMut}; use hash_db::HashDB; use journaldb::{self, Algorithm, JournalDB}; use keccak_hasher::KeccakHasher; -use kvdb::{DBValue, KeyValueDB}; use num_cpus; use parking_lot::Mutex; use rlp::{Rlp, RlpStream}; diff --git a/crates/ethcore/src/snapshot/tests/state.rs b/crates/ethcore/src/snapshot/tests/state.rs index 06a7e4afe..03653acf6 100644 --- a/crates/ethcore/src/snapshot/tests/state.rs +++ b/crates/ethcore/src/snapshot/tests/state.rs @@ -82,8 +82,11 @@ fn snap_and_restore() { let db_path = tempdir.path().join("db"); let db = { - let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap()); - let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent); + let new_db = Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap(); + let new_db_with_metrics: Arc = + Arc::new(ethcore_db::DatabaseWithMetrics::new(new_db)); + let mut rebuilder = + StateRebuilder::new(new_db_with_metrics.clone(), Algorithm::OverlayRecent); let reader = PackedReader::new(&snap_file).unwrap().unwrap(); let flag = AtomicBool::new(true); @@ -98,7 +101,7 @@ fn snap_and_restore() { assert_eq!(rebuilder.state_root(), state_root); rebuilder.finalize(1000, H256::default()).unwrap(); - new_db + new_db_with_metrics }; let new_db = journaldb::new(db, Algorithm::OverlayRecent, ::db::COL_STATE); @@ -163,10 +166,11 @@ fn get_code_from_prev_chunk() { let tempdir = TempDir::new("").unwrap(); let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); - let new_db = Arc::new(Database::open(&db_cfg, tempdir.path().to_str().unwrap()).unwrap()); - + let new_db = Database::open(&db_cfg, tempdir.path().to_str().unwrap()).unwrap(); + let new_db_with_metrics = Arc::new(db::DatabaseWithMetrics::new(new_db)); { - let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent); + let mut rebuilder = + StateRebuilder::new(new_db_with_metrics.clone(), Algorithm::OverlayRecent); let flag = AtomicBool::new(true); rebuilder.feed(&chunk1, &flag).unwrap(); @@ -175,7 +179,11 @@ fn get_code_from_prev_chunk() { rebuilder.finalize(1000, H256::random()).unwrap(); } - let state_db = journaldb::new(new_db, Algorithm::OverlayRecent, ::db::COL_STATE); + let state_db = journaldb::new( + new_db_with_metrics, + Algorithm::OverlayRecent, + ::db::COL_STATE, + ); assert_eq!(state_db.earliest_era(), Some(1000)); } @@ -214,8 +222,10 @@ fn checks_flag() { let tempdir = TempDir::new("").unwrap(); let db_path = tempdir.path().join("db"); { - let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap()); - let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::OverlayRecent); + let new_db = Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap(); + let new_db_with_metrics = Arc::new(db::DatabaseWithMetrics::new(new_db)); + let mut rebuilder = + StateRebuilder::new(new_db_with_metrics.clone(), Algorithm::OverlayRecent); let reader = PackedReader::new(&snap_file).unwrap().unwrap(); let flag = AtomicBool::new(false); diff --git a/crates/ethcore/src/spec/spec.rs b/crates/ethcore/src/spec/spec.rs index 0ae5835b9..24688c21d 100644 --- a/crates/ethcore/src/spec/spec.rs +++ b/crates/ethcore/src/spec/spec.rs @@ -970,7 +970,7 @@ impl Spec { let factories = Default::default(); let mut db = journaldb::new( - Arc::new(kvdb_memorydb::create(0)), + Arc::new(db::InMemoryWithMetrics::create(0)), journaldb::Algorithm::Archive, None, ); diff --git a/crates/ethcore/src/test_helpers.rs b/crates/ethcore/src/test_helpers.rs index a8b54e234..57652dd3d 100644 --- a/crates/ethcore/src/test_helpers.rs +++ b/crates/ethcore/src/test_helpers.rs @@ -23,12 +23,12 @@ use blockchain::{ }; use blooms_db; use bytes::Bytes; +use db::KeyValueDB; use ethereum_types::{Address, H256, U256}; use ethkey::KeyPair; use evm::Factory as EvmFactory; use hash::keccak; use io::IoChannel; -use kvdb::KeyValueDB; use kvdb_rocksdb::{self, Database, DatabaseConfig}; use parking_lot::RwLock; use rlp::{self, RlpStream}; @@ -350,6 +350,10 @@ impl BlockChainDB for TestBlockChainDB { } } +impl stats::PrometheusMetrics for TestBlockChainDB { + fn prometheus_metrics(&self, _: &mut stats::PrometheusRegistry) {} +} + /// Creates new test instance of `BlockChainDB` pub fn new_db() -> Arc { let blooms_dir = TempDir::new("").unwrap(); @@ -360,7 +364,9 @@ pub fn new_db() -> Arc { trace_blooms: blooms_db::Database::open(trace_blooms_dir.path()).unwrap(), _blooms_dir: blooms_dir, _trace_blooms_dir: trace_blooms_dir, - key_value: Arc::new(::kvdb_memorydb::create(::db::NUM_COLUMNS.unwrap())), + key_value: Arc::new(ethcore_db::InMemoryWithMetrics::create( + ::db::NUM_COLUMNS.unwrap(), + )), }; Arc::new(db) @@ -374,13 +380,13 @@ pub fn new_temp_db(tempdir: &Path) -> Arc { let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); let key_value_db = Database::open(&db_config, key_value_dir.to_str().unwrap()).unwrap(); - + let key_value_db_with_metrics = ethcore_db::DatabaseWithMetrics::new(key_value_db); let db = TestBlockChainDB { blooms: blooms_db::Database::open(blooms_dir.path()).unwrap(), trace_blooms: blooms_db::Database::open(trace_blooms_dir.path()).unwrap(), _blooms_dir: blooms_dir, _trace_blooms_dir: trace_blooms_dir, - key_value: Arc::new(key_value_db), + key_value: Arc::new(key_value_db_with_metrics), }; Arc::new(db) @@ -413,13 +419,14 @@ pub fn restoration_db_handler( &self.trace_blooms } } + impl stats::PrometheusMetrics for RestorationDB { + fn prometheus_metrics(&self, _: &mut stats::PrometheusRegistry) {} + } impl BlockChainDBHandler for RestorationDBHandler { fn open(&self, db_path: &Path) -> io::Result> { - let key_value = Arc::new(kvdb_rocksdb::Database::open( - &self.config, - &db_path.to_string_lossy(), - )?); + let key_value = kvdb_rocksdb::Database::open(&self.config, &db_path.to_string_lossy())?; + let key_value = Arc::new(db::DatabaseWithMetrics::new(key_value)); let blooms_path = db_path.join("blooms"); let trace_blooms_path = db_path.join("trace_blooms"); fs::create_dir_all(&blooms_path)?; diff --git a/crates/ethcore/sync/src/api.rs b/crates/ethcore/sync/src/api.rs index e5576aaeb..e45196986 100644 --- a/crates/ethcore/sync/src/api.rs +++ b/crates/ethcore/sync/src/api.rs @@ -42,7 +42,7 @@ use ethkey::Secret; use io::TimerToken; use network::IpFilter; use parking_lot::{Mutex, RwLock}; -use stats::{prometheus, prometheus_counter, prometheus_gauge, PrometheusMetrics}; +use stats::{PrometheusMetrics, PrometheusRegistry}; use std::{ net::{AddrParseError, SocketAddr}, @@ -323,11 +323,11 @@ impl SyncProvider for EthSync { } impl PrometheusMetrics for EthSync { - fn prometheus_metrics(&self, r: &mut prometheus::Registry) { + fn prometheus_metrics(&self, r: &mut PrometheusRegistry) { let scalar = |b| if b { 1i64 } else { 0i64 }; let sync_status = self.status(); - prometheus_gauge(r, + r.register_gauge( "sync_status", "WaitingPeers(0), SnapshotManifest(1), SnapshotData(2), SnapshotWaiting(3), Blocks(4), Idle(5), Waiting(6), NewBlocks(7)", match self.eth_handler.sync.status().state { @@ -342,59 +342,50 @@ impl PrometheusMetrics for EthSync { }); for (key, value) in sync_status.item_sizes.iter() { - prometheus_gauge( - r, + r.register_gauge( &key, format!("Total item number of {}", key).as_str(), *value as i64, ); } - prometheus_gauge( - r, + r.register_gauge( "net_peers", "Total number of connected peers", sync_status.num_peers as i64, ); - prometheus_gauge( - r, + r.register_gauge( "net_active_peers", "Total number of active peers", sync_status.num_active_peers as i64, ); - prometheus_counter( - r, + r.register_counter( "sync_blocks_recieved", "Number of blocks downloaded so far", sync_status.blocks_received as i64, ); - prometheus_counter( - r, + r.register_counter( "sync_blocks_total", "Total number of blocks for the sync process", sync_status.blocks_total as i64, ); - prometheus_gauge( - r, + r.register_gauge( "sync_blocks_highest", "Highest block number in the download queue", sync_status.highest_block_number.unwrap_or(0) as i64, ); - prometheus_gauge( - r, + r.register_gauge( "snapshot_download_active", "1 if downloading snapshots", scalar(sync_status.is_snapshot_syncing()), ); - prometheus_gauge( - r, + r.register_gauge( "snapshot_download_chunks", "Snapshot chunks", sync_status.num_snapshot_chunks as i64, ); - prometheus_gauge( - r, + r.register_gauge( "snapshot_download_chunks_done", "Snapshot chunks downloaded", sync_status.snapshot_chunks_done as i64, @@ -408,8 +399,7 @@ impl PrometheusMetrics for EthSync { .manifest_block() .unwrap_or((0, H256::zero())); - prometheus_gauge( - r, + r.register_gauge( "snapshot_create_block", "First block of the current snapshot creation", if let CreationStatus::Ongoing { block_number } = creation { @@ -418,8 +408,7 @@ impl PrometheusMetrics for EthSync { 0 }, ); - prometheus_gauge( - r, + r.register_gauge( "snapshot_restore_block", "First block of the current snapshot restoration", if let RestorationStatus::Ongoing { block_number, .. } = restoration { @@ -428,8 +417,7 @@ impl PrometheusMetrics for EthSync { 0 }, ); - prometheus_gauge( - r, + r.register_gauge( "snapshot_manifest_block", "First block number of the present snapshot", manifest_block_num as i64, diff --git a/crates/rpc/src/v1/tests/helpers/sync_provider.rs b/crates/rpc/src/v1/tests/helpers/sync_provider.rs index 6b58bd46f..c0e87b893 100644 --- a/crates/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/crates/rpc/src/v1/tests/helpers/sync_provider.rs @@ -19,7 +19,7 @@ use ethereum_types::H256; use network::client_version::ClientVersion; use parking_lot::RwLock; -use stats::{prometheus, PrometheusMetrics}; +use stats::{PrometheusMetrics, PrometheusRegistry}; use std::collections::BTreeMap; use sync::{EthProtocolInfo, PeerInfo, SyncProvider, SyncState, SyncStatus, TransactionStats}; @@ -69,7 +69,7 @@ impl TestSyncProvider { } impl PrometheusMetrics for TestSyncProvider { - fn prometheus_metrics(&self, _: &mut prometheus::Registry) {} + fn prometheus_metrics(&self, _: &mut PrometheusRegistry) {} } impl SyncProvider for TestSyncProvider { diff --git a/crates/util/stats/src/lib.rs b/crates/util/stats/src/lib.rs index b6b351bc1..633ef42e7 100644 --- a/crates/util/stats/src/lib.rs +++ b/crates/util/stats/src/lib.rs @@ -26,39 +26,64 @@ use std::{ extern crate log; pub extern crate prometheus; +pub struct PrometheusRegistry { + prefix: String, + registry: prometheus::Registry, +} + +impl PrometheusRegistry { + /// Create a new instance with the specified prefix + pub fn new(prefix: String) -> Self { + Self { + prefix, + registry: prometheus::Registry::new(), + } + } + + /// Get internal registry + pub fn registry(&self) -> &prometheus::Registry { + &self.registry + } + + /// Adds a new prometheus counter with the specified value + pub fn register_counter(&mut self, name: &str, help: &str, value: i64) { + let name = format!("{}{}", self.prefix, name); + let c = prometheus::IntCounter::new(name.as_str(), help) + .expect("name and help must be non-empty"); + c.inc_by(value); + self.registry + .register(Box::new(c)) + .expect("prometheus identifiers must be unique"); + } + + /// Adds a new prometheus gauge with the specified gauge + pub fn register_gauge(&mut self, name: &str, help: &str, value: i64) { + let name = format!("{}{}", self.prefix, name); + let g = prometheus::IntGauge::new(name.as_str(), help) + .expect("name and help must be non-empty"); + g.set(value); + self.registry + .register(Box::new(g)) + .expect("prometheus identifiers must be are unique"); + } + + /// Adds a new prometheus counter with the time spent in running the specified function + pub fn register_optime T, T>(&mut self, name: &str, f: &F) -> T { + let start = Instant::now(); + let t = f(); + let elapsed = start.elapsed(); + self.register_gauge( + &format!("optime_{}", name), + &format!("Time to perform {}", name), + elapsed.as_millis() as i64, + ); + t + } +} + /// Implements a prometheus metrics collector pub trait PrometheusMetrics { - fn prometheus_metrics(&self, registry: &mut prometheus::Registry); -} - -/// Adds a new prometheus counter with the specified value -pub fn prometheus_counter(reg: &mut prometheus::Registry, name: &str, help: &str, value: i64) { - let c = prometheus::IntCounter::new(name, help).expect("name and help must be non-empty"); - c.inc_by(value); - reg.register(Box::new(c)) - .expect("prometheus identifiers must be unique"); -} - -/// Adds a new prometheus gauge with the specified gauge -pub fn prometheus_gauge(reg: &mut prometheus::Registry, name: &str, help: &str, value: i64) { - let g = prometheus::IntGauge::new(name, help).expect("name and help must be non-empty"); - g.set(value); - reg.register(Box::new(g)) - .expect("prometheus identifiers must be are unique"); -} - -/// Adds a new prometheus counter with the time spent in running the specified function -pub fn prometheus_optime T, T>(r: &mut prometheus::Registry, name: &str, f: &F) -> T { - let start = Instant::now(); - let t = f(); - let elapsed = start.elapsed(); - prometheus_gauge( - r, - &format!("optime_{}", name), - &format!("Time to perform {}", name), - elapsed.as_millis() as i64, - ); - t + fn prometheus_metrics(&self, registry: &mut PrometheusRegistry); } /// Sorted corpus of data.