Proper light client informant and more verification of imported headers (#5897)
* do more validation of imported headers in light client * generalize informant with traits * informant implementation for light client * make comment into TODO * fix broken test * disable full checking of headers in light client in sync tests
This commit is contained in:
parent
f0a6b5d401
commit
67c1f71b6e
@ -26,7 +26,7 @@ use ethcore::receipt::Receipt;
|
|||||||
|
|
||||||
use stats::Corpus;
|
use stats::Corpus;
|
||||||
use time::{SteadyTime, Duration};
|
use time::{SteadyTime, Duration};
|
||||||
use util::{U256, H256};
|
use util::{U256, H256, HeapSizeOf};
|
||||||
use util::cache::MemoryLruCache;
|
use util::cache::MemoryLruCache;
|
||||||
|
|
||||||
/// Configuration for how much data to cache.
|
/// Configuration for how much data to cache.
|
||||||
@ -153,6 +153,22 @@ impl Cache {
|
|||||||
pub fn set_gas_price_corpus(&mut self, corpus: Corpus<U256>) {
|
pub fn set_gas_price_corpus(&mut self, corpus: Corpus<U256>) {
|
||||||
self.corpus = Some((corpus, SteadyTime::now()))
|
self.corpus = Some((corpus, SteadyTime::now()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the memory used.
|
||||||
|
pub fn mem_used(&self) -> usize {
|
||||||
|
self.heap_size_of_children()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HeapSizeOf for Cache {
|
||||||
|
fn heap_size_of_children(&self) -> usize {
|
||||||
|
self.headers.current_size()
|
||||||
|
+ self.canon_hashes.current_size()
|
||||||
|
+ self.bodies.current_size()
|
||||||
|
+ self.receipts.current_size()
|
||||||
|
+ self.chain_score.current_size()
|
||||||
|
// TODO: + corpus
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -44,7 +44,7 @@ mod header_chain;
|
|||||||
mod service;
|
mod service;
|
||||||
|
|
||||||
/// Configuration for the light client.
|
/// Configuration for the light client.
|
||||||
#[derive(Debug, Default, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
/// Verification queue config.
|
/// Verification queue config.
|
||||||
pub queue: queue::Config,
|
pub queue: queue::Config,
|
||||||
@ -56,6 +56,21 @@ pub struct Config {
|
|||||||
pub db_compaction: CompactionProfile,
|
pub db_compaction: CompactionProfile,
|
||||||
/// Should db have WAL enabled?
|
/// Should db have WAL enabled?
|
||||||
pub db_wal: bool,
|
pub db_wal: bool,
|
||||||
|
/// Should it do full verification of blocks?
|
||||||
|
pub verify_full: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Config {
|
||||||
|
fn default() -> Config {
|
||||||
|
Config {
|
||||||
|
queue: Default::default(),
|
||||||
|
chain_column: None,
|
||||||
|
db_cache_size: None,
|
||||||
|
db_compaction: CompactionProfile::default(),
|
||||||
|
db_wal: true,
|
||||||
|
verify_full: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait for interacting with the header chain abstractly.
|
/// Trait for interacting with the header chain abstractly.
|
||||||
@ -109,6 +124,9 @@ pub trait LightChainClient: Send + Sync {
|
|||||||
|
|
||||||
/// Get the EIP-86 transition block number.
|
/// Get the EIP-86 transition block number.
|
||||||
fn eip86_transition(&self) -> u64;
|
fn eip86_transition(&self) -> u64;
|
||||||
|
|
||||||
|
/// Get a report of import activity since the last call.
|
||||||
|
fn report(&self) -> ClientReport;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An actor listening to light chain events.
|
/// An actor listening to light chain events.
|
||||||
@ -141,6 +159,7 @@ pub struct Client {
|
|||||||
import_lock: Mutex<()>,
|
import_lock: Mutex<()>,
|
||||||
db: Arc<KeyValueDB>,
|
db: Arc<KeyValueDB>,
|
||||||
listeners: RwLock<Vec<Weak<LightChainNotify>>>,
|
listeners: RwLock<Vec<Weak<LightChainNotify>>>,
|
||||||
|
verify_full: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
@ -156,6 +175,7 @@ impl Client {
|
|||||||
import_lock: Mutex::new(()),
|
import_lock: Mutex::new(()),
|
||||||
db: db,
|
db: db,
|
||||||
listeners: RwLock::new(vec![]),
|
listeners: RwLock::new(vec![]),
|
||||||
|
verify_full: config.verify_full,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,6 +283,14 @@ impl Client {
|
|||||||
for verified_header in self.queue.drain(MAX) {
|
for verified_header in self.queue.drain(MAX) {
|
||||||
let (num, hash) = (verified_header.number(), verified_header.hash());
|
let (num, hash) = (verified_header.number(), verified_header.hash());
|
||||||
|
|
||||||
|
if self.verify_full && !self.check_header(&mut bad, &verified_header) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: `epoch_end_signal`, `is_epoch_end`.
|
||||||
|
// proofs we get from the network would be _complete_, whereas we need
|
||||||
|
// _incomplete_ signals
|
||||||
|
|
||||||
let mut tx = self.db.transaction();
|
let mut tx = self.db.transaction();
|
||||||
let pending = match self.chain.insert(&mut tx, verified_header) {
|
let pending = match self.chain.insert(&mut tx, verified_header) {
|
||||||
Ok(pending) => {
|
Ok(pending) => {
|
||||||
@ -273,14 +301,16 @@ impl Client {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
|
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
|
||||||
bad.push(hash);
|
bad.push(hash);
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.db.write_buffered(tx);
|
self.db.write_buffered(tx);
|
||||||
self.chain.apply_pending(pending);
|
self.chain.apply_pending(pending);
|
||||||
if let Err(e) = self.db.flush() {
|
}
|
||||||
panic!("Database flush failed: {}. Check disk health and space.", e);
|
|
||||||
}
|
if let Err(e) = self.db.flush() {
|
||||||
|
panic!("Database flush failed: {}. Check disk health and space.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.queue.mark_as_bad(&bad);
|
self.queue.mark_as_bad(&bad);
|
||||||
@ -291,7 +321,7 @@ impl Client {
|
|||||||
|
|
||||||
/// Get a report about blocks imported.
|
/// Get a report about blocks imported.
|
||||||
pub fn report(&self) -> ClientReport {
|
pub fn report(&self) -> ClientReport {
|
||||||
::std::mem::replace(&mut *self.report.write(), ClientReport::default())
|
self.report.read().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get blockchain mem usage in bytes.
|
/// Get blockchain mem usage in bytes.
|
||||||
@ -350,6 +380,37 @@ impl Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// return true if should skip, false otherwise. may push onto bad if
|
||||||
|
// should skip.
|
||||||
|
fn check_header(&self, bad: &mut Vec<H256>, verified_header: &Header) -> bool {
|
||||||
|
let hash = verified_header.hash();
|
||||||
|
let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) {
|
||||||
|
Some(header) => header,
|
||||||
|
None => return false, // skip import of block with missing parent.
|
||||||
|
};
|
||||||
|
|
||||||
|
// Verify Block Family
|
||||||
|
let verify_family_result = self.engine.verify_block_family(&verified_header, &parent_header.decode(), None);
|
||||||
|
if let Err(e) = verify_family_result {
|
||||||
|
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}",
|
||||||
|
verified_header.number(), verified_header.hash(), e);
|
||||||
|
bad.push(hash);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
// "external" verification.
|
||||||
|
let verify_external_result = self.engine.verify_block_external(&verified_header, None);
|
||||||
|
if let Err(e) = verify_external_result {
|
||||||
|
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}",
|
||||||
|
verified_header.number(), verified_header.hash(), e);
|
||||||
|
|
||||||
|
bad.push(hash);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LightChainClient for Client {
|
impl LightChainClient for Client {
|
||||||
@ -414,4 +475,8 @@ impl LightChainClient for Client {
|
|||||||
fn eip86_transition(&self) -> u64 {
|
fn eip86_transition(&self) -> u64 {
|
||||||
self.engine().params().eip86_transition
|
self.engine().params().eip86_transition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn report(&self) -> ClientReport {
|
||||||
|
Client::report(self)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -112,6 +112,22 @@ impl ClientReport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport {
|
||||||
|
type Output = Self;
|
||||||
|
|
||||||
|
fn sub(mut self, other: &'a ClientReport) -> Self {
|
||||||
|
let higher_mem = ::std::cmp::max(self.state_db_mem, other.state_db_mem);
|
||||||
|
let lower_mem = ::std::cmp::min(self.state_db_mem, other.state_db_mem);
|
||||||
|
|
||||||
|
self.blocks_imported -= other.blocks_imported;
|
||||||
|
self.transactions_applied -= other.transactions_applied;
|
||||||
|
self.gas_processed = self.gas_processed - other.gas_processed;
|
||||||
|
self.state_db_mem = higher_mem - lower_mem;
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct SleepState {
|
struct SleepState {
|
||||||
last_activity: Option<Instant>,
|
last_activity: Option<Instant>,
|
||||||
last_autosleep: Option<Instant>,
|
last_autosleep: Option<Instant>,
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
//! Verification queue info types
|
//! Verification queue info types
|
||||||
|
|
||||||
/// Verification queue status
|
/// Verification queue status
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
#[cfg_attr(feature = "ipc", binary)]
|
#[cfg_attr(feature = "ipc", binary)]
|
||||||
pub struct VerificationQueueInfo {
|
pub struct VerificationQueueInfo {
|
||||||
/// Number of queued items pending verification
|
/// Number of queued items pending verification
|
||||||
|
@ -29,7 +29,7 @@ use ethcore::error::ImportError;
|
|||||||
use ethcore::miner::Miner;
|
use ethcore::miner::Miner;
|
||||||
use ethcore::verification::queue::VerifierSettings;
|
use ethcore::verification::queue::VerifierSettings;
|
||||||
use cache::CacheConfig;
|
use cache::CacheConfig;
|
||||||
use informant::{Informant, MillisecondDuration};
|
use informant::{Informant, FullNodeInformantData, MillisecondDuration};
|
||||||
use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool};
|
use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool};
|
||||||
use helpers::{to_client_config, execute_upgrades};
|
use helpers::{to_client_config, execute_upgrades};
|
||||||
use dir::Directories;
|
use dir::Directories;
|
||||||
@ -238,7 +238,17 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let informant = Arc::new(Informant::new(client.clone(), None, None, None, None, cmd.with_color));
|
let informant = Arc::new(Informant::new(
|
||||||
|
FullNodeInformantData {
|
||||||
|
client: client.clone(),
|
||||||
|
sync: None,
|
||||||
|
net: None,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
cmd.with_color,
|
||||||
|
));
|
||||||
|
|
||||||
service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?;
|
service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?;
|
||||||
|
|
||||||
let do_import = |bytes| {
|
let do_import = |bytes| {
|
||||||
|
@ -21,32 +21,21 @@ use self::ansi_term::Style;
|
|||||||
use std::sync::{Arc};
|
use std::sync::{Arc};
|
||||||
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
|
|
||||||
|
use ethcore::client::*;
|
||||||
|
use ethcore::header::BlockNumber;
|
||||||
|
use ethcore::service::ClientIoMessage;
|
||||||
|
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
|
||||||
|
use ethcore::snapshot::service::Service as SnapshotService;
|
||||||
|
use ethsync::{LightSyncProvider, LightSync, SyncProvider, ManageNetwork};
|
||||||
use io::{TimerToken, IoContext, IoHandler};
|
use io::{TimerToken, IoContext, IoHandler};
|
||||||
use isatty::{stdout_isatty};
|
use isatty::{stdout_isatty};
|
||||||
use ethsync::{SyncProvider, ManageNetwork};
|
use light::Cache as LightDataCache;
|
||||||
use util::{RwLock, Mutex, H256, Colour, Bytes};
|
use light::client::LightChainClient;
|
||||||
use ethcore::client::*;
|
|
||||||
use ethcore::service::ClientIoMessage;
|
|
||||||
use ethcore::snapshot::service::Service as SnapshotService;
|
|
||||||
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
|
|
||||||
use number_prefix::{binary_prefix, Standalone, Prefixed};
|
use number_prefix::{binary_prefix, Standalone, Prefixed};
|
||||||
use parity_rpc::{is_major_importing};
|
use parity_rpc::{is_major_importing};
|
||||||
use parity_rpc::informant::RpcStats;
|
use parity_rpc::informant::RpcStats;
|
||||||
|
use util::{RwLock, Mutex, H256, Colour, Bytes};
|
||||||
pub struct Informant {
|
|
||||||
report: RwLock<Option<ClientReport>>,
|
|
||||||
last_tick: RwLock<Instant>,
|
|
||||||
with_color: bool,
|
|
||||||
client: Arc<Client>,
|
|
||||||
snapshot: Option<Arc<SnapshotService>>,
|
|
||||||
sync: Option<Arc<SyncProvider>>,
|
|
||||||
net: Option<Arc<ManageNetwork>>,
|
|
||||||
rpc_stats: Option<Arc<RpcStats>>,
|
|
||||||
last_import: Mutex<Instant>,
|
|
||||||
skipped: AtomicUsize,
|
|
||||||
skipped_txs: AtomicUsize,
|
|
||||||
in_shutdown: AtomicBool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Format byte counts to standard denominations.
|
/// Format byte counts to standard denominations.
|
||||||
pub fn format_bytes(b: usize) -> String {
|
pub fn format_bytes(b: usize) -> String {
|
||||||
@ -68,29 +57,188 @@ impl MillisecondDuration for Duration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Informant {
|
#[derive(Default)]
|
||||||
|
struct CacheSizes {
|
||||||
|
sizes: ::std::collections::BTreeMap<&'static str, usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CacheSizes {
|
||||||
|
fn insert(&mut self, key: &'static str, bytes: usize) {
|
||||||
|
self.sizes.insert(key, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn display<F>(&self, style: Style, paint: F) -> String
|
||||||
|
where F: Fn(Style, String) -> String
|
||||||
|
{
|
||||||
|
use std::fmt::Write;
|
||||||
|
|
||||||
|
let mut buf = String::new();
|
||||||
|
for (name, &size) in &self.sizes {
|
||||||
|
|
||||||
|
write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name)
|
||||||
|
.expect("writing to string won't fail unless OOM; qed")
|
||||||
|
}
|
||||||
|
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SyncInfo {
|
||||||
|
last_imported_block_number: BlockNumber,
|
||||||
|
last_imported_old_block_number: Option<BlockNumber>,
|
||||||
|
num_peers: usize,
|
||||||
|
max_peers: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Report {
|
||||||
|
importing: bool,
|
||||||
|
chain_info: BlockChainInfo,
|
||||||
|
client_report: ClientReport,
|
||||||
|
queue_info: BlockQueueInfo,
|
||||||
|
cache_sizes: CacheSizes,
|
||||||
|
sync_info: Option<SyncInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Something which can provide data to the informant.
|
||||||
|
pub trait InformantData: Send + Sync {
|
||||||
|
/// Whether it executes transactions
|
||||||
|
fn executes_transactions(&self) -> bool;
|
||||||
|
|
||||||
|
/// Whether it is currently importing (also included in `Report`)
|
||||||
|
fn is_major_importing(&self) -> bool;
|
||||||
|
|
||||||
|
/// Generate a report of blockchain status, memory usage, and sync info.
|
||||||
|
fn report(&self) -> Report;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Informant data for a full node.
|
||||||
|
pub struct FullNodeInformantData {
|
||||||
|
pub client: Arc<Client>,
|
||||||
|
pub sync: Option<Arc<SyncProvider>>,
|
||||||
|
pub net: Option<Arc<ManageNetwork>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InformantData for FullNodeInformantData {
|
||||||
|
fn executes_transactions(&self) -> bool { true }
|
||||||
|
|
||||||
|
fn is_major_importing(&self) -> bool {
|
||||||
|
let state = self.sync.as_ref().map(|sync| sync.status().state);
|
||||||
|
is_major_importing(state, self.client.queue_info())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self) -> Report {
|
||||||
|
let (client_report, queue_info, blockchain_cache_info) =
|
||||||
|
(self.client.report(), self.client.queue_info(), self.client.blockchain_cache_info());
|
||||||
|
|
||||||
|
let chain_info = self.client.chain_info();
|
||||||
|
|
||||||
|
let mut cache_sizes = CacheSizes::default();
|
||||||
|
cache_sizes.insert("db", client_report.state_db_mem);
|
||||||
|
cache_sizes.insert("queue", queue_info.mem_used);
|
||||||
|
cache_sizes.insert("chain", blockchain_cache_info.total());
|
||||||
|
|
||||||
|
let (importing, sync_info) = match (self.sync.as_ref(), self.net.as_ref()) {
|
||||||
|
(Some(sync), Some(net)) => {
|
||||||
|
let status = sync.status();
|
||||||
|
let net_config = net.network_config();
|
||||||
|
|
||||||
|
cache_sizes.insert("sync", status.mem_used);
|
||||||
|
|
||||||
|
let importing = is_major_importing(Some(status.state), queue_info.clone());
|
||||||
|
(importing, Some(SyncInfo {
|
||||||
|
last_imported_block_number: status.last_imported_block_number.unwrap_or(chain_info.best_block_number),
|
||||||
|
last_imported_old_block_number: status.last_imported_old_block_number,
|
||||||
|
num_peers: status.num_peers,
|
||||||
|
max_peers: status.current_max_peers(net_config.min_peers, net_config.max_peers),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
_ => (is_major_importing(None, queue_info.clone()), None),
|
||||||
|
};
|
||||||
|
|
||||||
|
Report {
|
||||||
|
importing,
|
||||||
|
chain_info,
|
||||||
|
client_report,
|
||||||
|
queue_info,
|
||||||
|
cache_sizes,
|
||||||
|
sync_info,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Informant data for a light node -- note that the network is required.
|
||||||
|
pub struct LightNodeInformantData {
|
||||||
|
pub client: Arc<LightChainClient>,
|
||||||
|
pub sync: Arc<LightSync>,
|
||||||
|
pub cache: Arc<Mutex<LightDataCache>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InformantData for LightNodeInformantData {
|
||||||
|
fn executes_transactions(&self) -> bool { false }
|
||||||
|
|
||||||
|
fn is_major_importing(&self) -> bool {
|
||||||
|
self.sync.is_major_importing()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self) -> Report {
|
||||||
|
let (client_report, queue_info, chain_info) =
|
||||||
|
(self.client.report(), self.client.queue_info(), self.client.chain_info());
|
||||||
|
|
||||||
|
let mut cache_sizes = CacheSizes::default();
|
||||||
|
cache_sizes.insert("queue", queue_info.mem_used);
|
||||||
|
cache_sizes.insert("cache", self.cache.lock().mem_used());
|
||||||
|
|
||||||
|
let peer_numbers = self.sync.peer_numbers();
|
||||||
|
let sync_info = Some(SyncInfo {
|
||||||
|
last_imported_block_number: chain_info.best_block_number,
|
||||||
|
last_imported_old_block_number: None,
|
||||||
|
num_peers: peer_numbers.connected,
|
||||||
|
max_peers: peer_numbers.max as u32,
|
||||||
|
});
|
||||||
|
|
||||||
|
Report {
|
||||||
|
importing: self.sync.is_major_importing(),
|
||||||
|
chain_info,
|
||||||
|
client_report,
|
||||||
|
queue_info,
|
||||||
|
cache_sizes,
|
||||||
|
sync_info,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Informant<T> {
|
||||||
|
last_tick: RwLock<Instant>,
|
||||||
|
with_color: bool,
|
||||||
|
target: T,
|
||||||
|
snapshot: Option<Arc<SnapshotService>>,
|
||||||
|
rpc_stats: Option<Arc<RpcStats>>,
|
||||||
|
last_import: Mutex<Instant>,
|
||||||
|
skipped: AtomicUsize,
|
||||||
|
skipped_txs: AtomicUsize,
|
||||||
|
in_shutdown: AtomicBool,
|
||||||
|
last_report: Mutex<ClientReport>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: InformantData> Informant<T> {
|
||||||
/// Make a new instance potentially `with_color` output.
|
/// Make a new instance potentially `with_color` output.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
client: Arc<Client>,
|
target: T,
|
||||||
sync: Option<Arc<SyncProvider>>,
|
|
||||||
net: Option<Arc<ManageNetwork>>,
|
|
||||||
snapshot: Option<Arc<SnapshotService>>,
|
snapshot: Option<Arc<SnapshotService>>,
|
||||||
rpc_stats: Option<Arc<RpcStats>>,
|
rpc_stats: Option<Arc<RpcStats>>,
|
||||||
with_color: bool,
|
with_color: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Informant {
|
Informant {
|
||||||
report: RwLock::new(None),
|
|
||||||
last_tick: RwLock::new(Instant::now()),
|
last_tick: RwLock::new(Instant::now()),
|
||||||
with_color: with_color,
|
with_color: with_color,
|
||||||
client: client,
|
target: target,
|
||||||
snapshot: snapshot,
|
snapshot: snapshot,
|
||||||
sync: sync,
|
|
||||||
net: net,
|
|
||||||
rpc_stats: rpc_stats,
|
rpc_stats: rpc_stats,
|
||||||
last_import: Mutex::new(Instant::now()),
|
last_import: Mutex::new(Instant::now()),
|
||||||
skipped: AtomicUsize::new(0),
|
skipped: AtomicUsize::new(0),
|
||||||
skipped_txs: AtomicUsize::new(0),
|
skipped_txs: AtomicUsize::new(0),
|
||||||
in_shutdown: AtomicBool::new(false),
|
in_shutdown: AtomicBool::new(false),
|
||||||
|
last_report: Mutex::new(Default::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,14 +254,24 @@ impl Informant {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let chain_info = self.client.chain_info();
|
let Report {
|
||||||
let queue_info = self.client.queue_info();
|
importing,
|
||||||
let cache_info = self.client.blockchain_cache_info();
|
chain_info,
|
||||||
let network_config = self.net.as_ref().map(|n| n.network_config());
|
client_report,
|
||||||
let sync_status = self.sync.as_ref().map(|s| s.status());
|
queue_info,
|
||||||
|
cache_sizes,
|
||||||
|
sync_info,
|
||||||
|
} = self.target.report();
|
||||||
|
|
||||||
|
let client_report = {
|
||||||
|
let mut last_report = self.last_report.lock();
|
||||||
|
let diffed = client_report.clone() - &*last_report;
|
||||||
|
*last_report = client_report.clone();
|
||||||
|
diffed
|
||||||
|
};
|
||||||
|
|
||||||
let rpc_stats = self.rpc_stats.as_ref();
|
let rpc_stats = self.rpc_stats.as_ref();
|
||||||
|
|
||||||
let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info());
|
|
||||||
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
|
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
|
||||||
match s.status() {
|
match s.status() {
|
||||||
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
|
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
|
||||||
@ -128,9 +286,6 @@ impl Informant {
|
|||||||
|
|
||||||
*self.last_tick.write() = Instant::now();
|
*self.last_tick.write() = Instant::now();
|
||||||
|
|
||||||
let mut write_report = self.report.write();
|
|
||||||
let report = self.client.report();
|
|
||||||
|
|
||||||
let paint = |c: Style, t: String| match self.with_color && stdout_isatty() {
|
let paint = |c: Style, t: String| match self.with_color && stdout_isatty() {
|
||||||
true => format!("{}", c.paint(t)),
|
true => format!("{}", c.paint(t)),
|
||||||
false => t,
|
false => t,
|
||||||
@ -142,13 +297,16 @@ impl Informant {
|
|||||||
false => format!("Syncing {} {} {} {}+{} Qed",
|
false => format!("Syncing {} {} {} {}+{} Qed",
|
||||||
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
|
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
|
||||||
paint(White.bold(), format!("{}", chain_info.best_block_hash)),
|
paint(White.bold(), format!("{}", chain_info.best_block_hash)),
|
||||||
{
|
if self.target.executes_transactions() {
|
||||||
let last_report = match *write_report { Some(ref last_report) => last_report.clone(), _ => ClientReport::default() };
|
|
||||||
format!("{} blk/s {} tx/s {} Mgas/s",
|
format!("{} blk/s {} tx/s {} Mgas/s",
|
||||||
paint(Yellow.bold(), format!("{:4}", ((report.blocks_imported - last_report.blocks_imported) * 1000) as u64 / elapsed.as_milliseconds())),
|
paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds())),
|
||||||
paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())),
|
paint(Yellow.bold(), format!("{:4}", (client_report.transactions_applied * 1000) as u64 / elapsed.as_milliseconds())),
|
||||||
paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
|
paint(Yellow.bold(), format!("{:3}", (client_report.gas_processed / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
|
||||||
)
|
)
|
||||||
|
} else {
|
||||||
|
format!("{} hdr/s",
|
||||||
|
paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds()))
|
||||||
|
)
|
||||||
},
|
},
|
||||||
paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
|
paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
|
||||||
paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
|
paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
|
||||||
@ -157,29 +315,21 @@ impl Informant {
|
|||||||
},
|
},
|
||||||
false => String::new(),
|
false => String::new(),
|
||||||
},
|
},
|
||||||
match (&sync_status, &network_config) {
|
match sync_info.as_ref() {
|
||||||
(&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{} peers",
|
Some(ref sync_info) => format!("{}{}/{} peers",
|
||||||
match importing {
|
match importing {
|
||||||
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))),
|
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number)))),
|
||||||
false => match sync_info.last_imported_old_block_number {
|
false => match sync_info.last_imported_old_block_number {
|
||||||
Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))),
|
Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))),
|
||||||
None => String::new(),
|
None => String::new(),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
|
paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
|
||||||
paint(Cyan.bold(), format!("{:2}", sync_info.current_max_peers(net_config.min_peers, net_config.max_peers))),
|
paint(Cyan.bold(), format!("{:2}", sync_info.max_peers)),
|
||||||
),
|
),
|
||||||
_ => String::new(),
|
_ => String::new(),
|
||||||
},
|
},
|
||||||
format!("{} db {} chain {} queue{}",
|
cache_sizes.display(Blue.bold(), &paint),
|
||||||
paint(Blue.bold(), format!("{:>8}", format_bytes(report.state_db_mem))),
|
|
||||||
paint(Blue.bold(), format!("{:>8}", format_bytes(cache_info.total()))),
|
|
||||||
paint(Blue.bold(), format!("{:>8}", format_bytes(queue_info.mem_used))),
|
|
||||||
match sync_status {
|
|
||||||
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))),
|
|
||||||
_ => String::new(),
|
|
||||||
}
|
|
||||||
),
|
|
||||||
match rpc_stats {
|
match rpc_stats {
|
||||||
Some(ref rpc_stats) => format!(
|
Some(ref rpc_stats) => format!(
|
||||||
"RPC: {} conn, {} req/s, {} µs",
|
"RPC: {} conn, {} req/s, {} µs",
|
||||||
@ -190,25 +340,24 @@ impl Informant {
|
|||||||
_ => String::new(),
|
_ => String::new(),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
*write_report = Some(report);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChainNotify for Informant {
|
impl ChainNotify for Informant<FullNodeInformantData> {
|
||||||
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: u64) {
|
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: u64) {
|
||||||
let mut last_import = self.last_import.lock();
|
let mut last_import = self.last_import.lock();
|
||||||
let sync_state = self.sync.as_ref().map(|s| s.status().state);
|
let client = &self.target.client;
|
||||||
let importing = is_major_importing(sync_state, self.client.queue_info());
|
|
||||||
|
let importing = self.target.is_major_importing();
|
||||||
let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing;
|
let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing;
|
||||||
let txs_imported = imported.iter()
|
let txs_imported = imported.iter()
|
||||||
.take(imported.len().saturating_sub(if ripe { 1 } else { 0 }))
|
.take(imported.len().saturating_sub(if ripe { 1 } else { 0 }))
|
||||||
.filter_map(|h| self.client.block(BlockId::Hash(*h)))
|
.filter_map(|h| client.block(BlockId::Hash(*h)))
|
||||||
.map(|b| b.transactions_count())
|
.map(|b| b.transactions_count())
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
if ripe {
|
if ripe {
|
||||||
if let Some(block) = imported.last().and_then(|h| self.client.block(BlockId::Hash(*h))) {
|
if let Some(block) = imported.last().and_then(|h| client.block(BlockId::Hash(*h))) {
|
||||||
let header_view = block.header_view();
|
let header_view = block.header_view();
|
||||||
let size = block.rlp().as_raw().len();
|
let size = block.rlp().as_raw().len();
|
||||||
let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported);
|
let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported);
|
||||||
@ -241,7 +390,7 @@ impl ChainNotify for Informant {
|
|||||||
|
|
||||||
const INFO_TIMER: TimerToken = 0;
|
const INFO_TIMER: TimerToken = 0;
|
||||||
|
|
||||||
impl IoHandler<ClientIoMessage> for Informant {
|
impl<T: InformantData> IoHandler<ClientIoMessage> for Informant<T> {
|
||||||
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
|
||||||
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
|
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
|
||||||
}
|
}
|
||||||
|
@ -16,26 +16,27 @@
|
|||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::net::{TcpListener};
|
use std::net::{TcpListener};
|
||||||
|
|
||||||
use ctrlc::CtrlC;
|
use ctrlc::CtrlC;
|
||||||
use fdlimit::raise_fd_limit;
|
|
||||||
use parity_rpc::{NetworkSettings, informant, is_major_importing};
|
|
||||||
use ethsync::NetworkConfiguration;
|
|
||||||
use util::{Colour, version, Mutex, Condvar};
|
|
||||||
use ethcore_logger::{Config as LogConfig, RotatingLogger};
|
use ethcore_logger::{Config as LogConfig, RotatingLogger};
|
||||||
use ethcore::miner::{StratumOptions, Stratum};
|
|
||||||
use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient};
|
|
||||||
use ethcore::service::ClientService;
|
|
||||||
use ethcore::account_provider::{AccountProvider, AccountProviderSettings};
|
use ethcore::account_provider::{AccountProvider, AccountProviderSettings};
|
||||||
|
use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient};
|
||||||
|
use ethcore::ethstore::ethkey;
|
||||||
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
|
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
|
||||||
|
use ethcore::miner::{StratumOptions, Stratum};
|
||||||
|
use ethcore::service::ClientService;
|
||||||
use ethcore::snapshot;
|
use ethcore::snapshot;
|
||||||
use ethcore::verification::queue::VerifierSettings;
|
use ethcore::verification::queue::VerifierSettings;
|
||||||
use ethcore::ethstore::ethkey;
|
use ethsync::NetworkConfiguration;
|
||||||
use light::Cache as LightDataCache;
|
|
||||||
use ethsync::SyncConfig;
|
use ethsync::SyncConfig;
|
||||||
use informant::Informant;
|
use fdlimit::raise_fd_limit;
|
||||||
use updater::{UpdatePolicy, Updater};
|
|
||||||
use parity_reactor::EventLoop;
|
|
||||||
use hash_fetch::fetch::{Fetch, Client as FetchClient};
|
use hash_fetch::fetch::{Fetch, Client as FetchClient};
|
||||||
|
use informant::{Informant, LightNodeInformantData, FullNodeInformantData};
|
||||||
|
use light::Cache as LightDataCache;
|
||||||
|
use parity_reactor::EventLoop;
|
||||||
|
use parity_rpc::{NetworkSettings, informant, is_major_importing};
|
||||||
|
use updater::{UpdatePolicy, Updater};
|
||||||
|
use util::{Colour, version, Mutex, Condvar};
|
||||||
|
|
||||||
use params::{
|
use params::{
|
||||||
SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch,
|
SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch,
|
||||||
@ -209,6 +210,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
|
|||||||
db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024),
|
db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024),
|
||||||
db_compaction: compaction,
|
db_compaction: compaction,
|
||||||
db_wal: cmd.wal,
|
db_wal: cmd.wal,
|
||||||
|
verify_full: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024;
|
config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024;
|
||||||
@ -300,7 +302,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
|
|||||||
logger: logger,
|
logger: logger,
|
||||||
settings: Arc::new(cmd.net_settings),
|
settings: Arc::new(cmd.net_settings),
|
||||||
on_demand: on_demand,
|
on_demand: on_demand,
|
||||||
cache: cache,
|
cache: cache.clone(),
|
||||||
transaction_queue: txq,
|
transaction_queue: txq,
|
||||||
dapps_service: dapps_service,
|
dapps_service: dapps_service,
|
||||||
dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()),
|
dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()),
|
||||||
@ -322,16 +324,25 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
|
|||||||
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
|
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
|
||||||
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
|
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
|
||||||
|
|
||||||
// minimal informant thread. Just prints block number every 5 seconds.
|
// the informant
|
||||||
// TODO: integrate with informant.rs
|
let informant = Arc::new(Informant::new(
|
||||||
let informant_client = service.client().clone();
|
LightNodeInformantData {
|
||||||
::std::thread::spawn(move || loop {
|
client: service.client().clone(),
|
||||||
info!("#{}", informant_client.best_block_header().number());
|
sync: light_sync.clone(),
|
||||||
::std::thread::sleep(::std::time::Duration::from_secs(5));
|
cache: cache,
|
||||||
});
|
},
|
||||||
|
None,
|
||||||
|
Some(rpc_stats),
|
||||||
|
cmd.logger_config.color,
|
||||||
|
));
|
||||||
|
|
||||||
// wait for ctrl-c.
|
service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;
|
||||||
Ok(wait_for_exit(None, None, can_restart))
|
|
||||||
|
// wait for ctrl-c and then shut down the informant.
|
||||||
|
let res = wait_for_exit(None, None, can_restart);
|
||||||
|
informant.shutdown();
|
||||||
|
|
||||||
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
|
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
|
||||||
@ -672,9 +683,11 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
|
|||||||
|
|
||||||
// the informant
|
// the informant
|
||||||
let informant = Arc::new(Informant::new(
|
let informant = Arc::new(Informant::new(
|
||||||
service.client(),
|
FullNodeInformantData {
|
||||||
Some(sync_provider.clone()),
|
client: service.client(),
|
||||||
Some(manage_network.clone()),
|
sync: Some(sync_provider.clone()),
|
||||||
|
net: Some(manage_network.clone()),
|
||||||
|
},
|
||||||
Some(snapshot_service.clone()),
|
Some(snapshot_service.clone()),
|
||||||
Some(rpc_stats.clone()),
|
Some(rpc_stats.clone()),
|
||||||
cmd.logger_config.color,
|
cmd.logger_config.color,
|
||||||
|
@ -211,8 +211,12 @@ impl TestNet<Peer> {
|
|||||||
pub fn light(n_light: usize, n_full: usize) -> Self {
|
pub fn light(n_light: usize, n_full: usize) -> Self {
|
||||||
let mut peers = Vec::with_capacity(n_light + n_full);
|
let mut peers = Vec::with_capacity(n_light + n_full);
|
||||||
for _ in 0..n_light {
|
for _ in 0..n_light {
|
||||||
|
let mut config = ::light::client::Config::default();
|
||||||
|
|
||||||
|
// skip full verification because the blocks are bad.
|
||||||
|
config.verify_full = false;
|
||||||
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
|
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
|
||||||
let client = LightClient::in_memory(Default::default(), &Spec::new_test(), IoChannel::disconnected(), cache);
|
let client = LightClient::in_memory(config, &Spec::new_test(), IoChannel::disconnected(), cache);
|
||||||
peers.push(Arc::new(Peer::new_light(Arc::new(client))))
|
peers.push(Arc::new(Peer::new_light(Arc::new(client))))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user