// Copyright 2015-2019 Parity Technologies (UK) Ltd. // This file is part of Parity Ethereum. // Parity Ethereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Ethereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . use std::sync::Arc; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::time::{Instant, Duration}; use ansi_term::Colour::{White, Yellow, Green, Cyan, Blue}; use ansi_term::{Colour, Style}; use atty; use ethcore::client::Client; use client_traits::{BlockInfo, ChainInfo, BlockChainClient, ChainNotify}; use types::{ BlockNumber, chain_notify::NewBlocks, client_types::ClientReport, ids::BlockId, io_message::ClientIoMessage, blockchain_info::BlockChainInfo, verification::VerificationQueueInfo as BlockQueueInfo, snapshot::RestorationStatus, }; use snapshot::SnapshotService as SS; use snapshot::service::Service as SnapshotService; use sync::{LightSyncProvider, LightSync, SyncProvider, ManageNetwork}; use io::{TimerToken, IoContext, IoHandler}; use light::Cache as LightDataCache; use light::client::{LightChainClient, LightChainNotify}; use number_prefix::{binary_prefix, Standalone, Prefixed}; use parity_rpc::is_major_importing_or_waiting; use parity_rpc::informant::RpcStats; use ethereum_types::H256; use parking_lot::{RwLock, Mutex}; /// Format byte counts to standard denominations. pub fn format_bytes(b: usize) -> String { match binary_prefix(b as f64) { Standalone(bytes) => format!("{} bytes", bytes), Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix), } } #[derive(Default)] struct CacheSizes { sizes: ::std::collections::BTreeMap<&'static str, usize>, } impl CacheSizes { fn insert(&mut self, key: &'static str, bytes: usize) { self.sizes.insert(key, bytes); } fn display(&self, style: Style, paint: F) -> String where F: Fn(Style, String) -> String { use std::fmt::Write; let mut buf = String::new(); for (name, &size) in &self.sizes { write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name) .expect("writing to string won't fail unless OOM; qed") } buf } } pub struct SyncInfo { last_imported_block_number: BlockNumber, last_imported_old_block_number: Option, num_peers: usize, max_peers: u32, snapshot_sync: bool, } pub struct Report { importing: bool, chain_info: BlockChainInfo, client_report: ClientReport, queue_info: BlockQueueInfo, cache_sizes: CacheSizes, sync_info: Option, } /// Something which can provide data to the informant. pub trait InformantData: Send + Sync { /// Whether it executes transactions fn executes_transactions(&self) -> bool; /// Whether it is currently importing (also included in `Report`) fn is_major_importing(&self) -> bool; /// Generate a report of blockchain status, memory usage, and sync info. fn report(&self) -> Report; } /// Informant data for a full node. pub struct FullNodeInformantData { pub client: Arc, pub sync: Option>, pub net: Option>, } impl InformantData for FullNodeInformantData { fn executes_transactions(&self) -> bool { true } fn is_major_importing(&self) -> bool { let state = self.sync.as_ref().map(|sync| sync.status().state); is_major_importing_or_waiting(state, self.client.queue_info(), false) } fn report(&self) -> Report { let (client_report, queue_info, blockchain_cache_info) = (self.client.report(), self.client.queue_info(), self.client.blockchain_cache_info()); let chain_info = self.client.chain_info(); let mut cache_sizes = CacheSizes::default(); cache_sizes.insert("db", client_report.state_db_mem); cache_sizes.insert("queue", queue_info.mem_used); cache_sizes.insert("chain", blockchain_cache_info.total()); let importing = self.is_major_importing(); let sync_info = match (self.sync.as_ref(), self.net.as_ref()) { (Some(sync), Some(net)) => { let status = sync.status(); let num_peers_range = net.num_peers_range(); debug_assert!(num_peers_range.end() >= num_peers_range.start()); cache_sizes.insert("sync", status.mem_used); Some(SyncInfo { last_imported_block_number: status.last_imported_block_number.unwrap_or(chain_info.best_block_number), last_imported_old_block_number: status.last_imported_old_block_number, num_peers: status.num_peers, max_peers: status.current_max_peers(*num_peers_range.start(), *num_peers_range.end()), snapshot_sync: status.is_snapshot_syncing(), }) } _ => None }; Report { importing, chain_info, client_report, queue_info, cache_sizes, sync_info, } } } /// Informant data for a light node -- note that the network is required. pub struct LightNodeInformantData { pub client: Arc, pub sync: Arc, pub cache: Arc>, } impl InformantData for LightNodeInformantData { fn executes_transactions(&self) -> bool { false } fn is_major_importing(&self) -> bool { self.sync.is_major_importing() } fn report(&self) -> Report { let (client_report, queue_info, chain_info) = (self.client.report(), self.client.queue_info(), self.client.chain_info()); let mut cache_sizes = CacheSizes::default(); cache_sizes.insert("queue", queue_info.mem_used); cache_sizes.insert("cache", self.cache.lock().mem_used()); let peer_numbers = self.sync.peer_numbers(); let sync_info = Some(SyncInfo { last_imported_block_number: chain_info.best_block_number, last_imported_old_block_number: None, num_peers: peer_numbers.connected, max_peers: peer_numbers.max as u32, snapshot_sync: false, }); Report { importing: self.sync.is_major_importing(), chain_info, client_report, queue_info, cache_sizes, sync_info, } } } pub struct Informant { last_tick: RwLock, with_color: bool, target: T, snapshot: Option>>, rpc_stats: Option>, last_import: Mutex, skipped: AtomicUsize, skipped_txs: AtomicUsize, in_shutdown: AtomicBool, last_report: Mutex, } impl Informant { /// Make a new instance potentially `with_color` output. pub fn new( target: T, snapshot: Option>>, rpc_stats: Option>, with_color: bool, ) -> Self { Informant { last_tick: RwLock::new(Instant::now()), with_color, target, snapshot, rpc_stats, last_import: Mutex::new(Instant::now()), skipped: AtomicUsize::new(0), skipped_txs: AtomicUsize::new(0), in_shutdown: AtomicBool::new(false), last_report: Mutex::new(Default::default()), } } /// Signal that we're shutting down; no more output necessary. pub fn shutdown(&self) { self.in_shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst); } pub fn tick(&self) { let now = Instant::now(); let elapsed = now.duration_since(*self.last_tick.read()); let (client_report, full_report) = { let last_report = self.last_report.lock(); let full_report = self.target.report(); let diffed = full_report.client_report.clone() - &*last_report; (diffed, full_report) }; let Report { importing, chain_info, queue_info, cache_sizes, sync_info, .. } = full_report; let rpc_stats = self.rpc_stats.as_ref(); let snapshot_sync = sync_info.as_ref().map_or(false, |s| s.snapshot_sync) && self.snapshot.as_ref().map_or(false, |s| match s.status() { RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => true, _ => false, } ); if !importing && !snapshot_sync && elapsed < Duration::from_secs(30) { return; } *self.last_tick.write() = now; *self.last_report.lock() = full_report.client_report.clone(); let paint = |c: Style, t: String| match self.with_color && atty::is(atty::Stream::Stdout) { true => format!("{}", c.paint(t)), false => t, }; info!(target: "import", "{} {} {} {}", match importing { true => match snapshot_sync { false => format!("Syncing {} {} {} {}+{} Qed", paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))), paint(White.bold(), format!("{}", chain_info.best_block_hash)), if self.target.executes_transactions() { format!("{} blk/s {} tx/s {} Mgas/s", paint(Yellow.bold(), format!("{:7.2}", (client_report.blocks_imported * 1000) as f64 / elapsed.as_millis() as f64)), paint(Yellow.bold(), format!("{:6.1}", (client_report.transactions_applied * 1000) as f64 / elapsed.as_millis() as f64)), paint(Yellow.bold(), format!("{:6.1}", (client_report.gas_processed / 1000).low_u64() as f64 / elapsed.as_millis() as f64)) ) } else { format!("{} hdr/s", paint(Yellow.bold(), format!("{:6.1}", (client_report.blocks_imported * 1000) as f64 / elapsed.as_millis() as f64)) ) }, paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)), paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size)) ), true => { self.snapshot.as_ref().map_or(String::new(), |s| match s.status() { RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => { format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks) }, RestorationStatus::Initializing { chunks_done, state_chunks, block_chunks } => { let total_chunks = state_chunks + block_chunks; // Note that the percentage here can be slightly misleading when // they have chunks already on disk: we'll import the local // chunks first and then download the rest. format!("Snapshot initializing ({}/{} chunks restored, {:.0}%)", chunks_done, total_chunks, (chunks_done as f32 / total_chunks as f32) * 100.0) }, RestorationStatus::Finalizing => { format!("Snapshot finalization under way") } _ => String::new(), } ) }, }, false => String::new(), }, match sync_info.as_ref() { Some(ref sync_info) => format!("{}{}/{} peers", match importing { true => format!("{}", if self.target.executes_transactions() { paint(Green.bold(), format!("{:>8} ", format!("#{}", sync_info.last_imported_block_number))) } else { String::new() } ), false => match sync_info.last_imported_old_block_number { Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))), None => String::new(), } }, paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)), paint(Cyan.bold(), format!("{:2}", sync_info.max_peers)), ), _ => String::new(), }, cache_sizes.display(Blue.bold(), &paint), match rpc_stats { Some(ref rpc_stats) => format!( "RPC: {} conn, {} req/s, {} µs", paint(Blue.bold(), format!("{:2}", rpc_stats.sessions())), paint(Blue.bold(), format!("{:4}", rpc_stats.requests_rate())), paint(Blue.bold(), format!("{:4}", rpc_stats.approximated_roundtrip())), ), _ => String::new(), }, ); } } impl ChainNotify for Informant { fn new_blocks(&self, new_blocks: NewBlocks) { if new_blocks.has_more_blocks_to_import { return } let mut last_import = self.last_import.lock(); let client = &self.target.client; let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; let txs_imported = new_blocks.imported.iter() .take(new_blocks.imported.len().saturating_sub(if ripe { 1 } else { 0 })) .filter_map(|h| client.block(BlockId::Hash(*h))) .map(|b| b.transactions_count()) .sum(); if ripe { if let Some(block) = new_blocks.imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { let header_view = block.header_view(); let size = block.rlp().as_raw().len(); let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + new_blocks.imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", Colour::White.bold().paint(format!("#{}", header_view.number())), Colour::White.bold().paint(format!("{}", header_view.hash())), Colour::Yellow.bold().paint(format!("{}", block.transactions_count())), Colour::Yellow.bold().paint(format!("{:.2}", header_view.gas_used().low_u64() as f32 / 1000000f32)), Colour::Purple.bold().paint(format!("{}", new_blocks.duration.as_millis())), Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), if skipped > 0 { format!(" + another {} block(s) containing {} tx(s)", Colour::Red.bold().paint(format!("{}", skipped)), Colour::Red.bold().paint(format!("{}", skipped_txs)) ) } else { String::new() } ); self.skipped.store(0, AtomicOrdering::Relaxed); self.skipped_txs.store(0, AtomicOrdering::Relaxed); *last_import = Instant::now(); } } else { self.skipped.fetch_add(new_blocks.imported.len(), AtomicOrdering::Relaxed); self.skipped_txs.fetch_add(txs_imported, AtomicOrdering::Relaxed); } } } impl LightChainNotify for Informant { fn new_headers(&self, good: &[H256]) { let mut last_import = self.last_import.lock(); let client = &self.target.client; let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; if ripe { if let Some(header) = good.last().and_then(|h| client.block_header(BlockId::Hash(*h))) { info!(target: "import", "Imported {} {} ({} Mgas){}", Colour::White.bold().paint(format!("#{}", header.number())), Colour::White.bold().paint(format!("{}", header.hash())), Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used().low_u64() as f32 / 1000000f32)), if good.len() > 1 { format!(" + another {} header(s)", Colour::Red.bold().paint(format!("{}", good.len() - 1))) } else { String::new() } ); *last_import = Instant::now(); } } } } const INFO_TIMER: TimerToken = 0; impl IoHandler> for Informant where T: InformantData, C: client_traits::Tick + 'static, { fn initialize(&self, io: &IoContext>) { io.register_timer(INFO_TIMER, Duration::from_secs(5)).expect("Error registering timer"); } fn timeout(&self, _io: &IoContext>, timer: TimerToken) { if timer == INFO_TIMER && !self.in_shutdown.load(AtomicOrdering::SeqCst) { self.tick(); } } }