// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of OpenEthereum. // OpenEthereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // OpenEthereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with OpenEthereum. If not, see . extern crate ansi_term; use self::ansi_term::{ Colour, Colour::{Blue, Cyan, Green, White, Yellow}, Style, }; use std::{ sync::{ atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}, Arc, }, time::{Duration, Instant}, }; use atty; use ethcore::{ client::{ BlockChainClient, BlockChainInfo, BlockId, BlockInfo, BlockQueueInfo, ChainInfo, ChainNotify, Client, ClientIoMessage, ClientReport, NewBlocks, }, snapshot::{service::Service as SnapshotService, RestorationStatus, SnapshotService as SS}, }; use io::{IoContext, IoHandler, TimerToken}; use number_prefix::{binary_prefix, Prefixed, Standalone}; use parity_rpc::{informant::RpcStats, is_major_importing_or_waiting}; use parking_lot::{Mutex, RwLock}; use sync::{ManageNetwork, SyncProvider}; use types::BlockNumber; /// Format byte counts to standard denominations. pub fn format_bytes(b: usize) -> String { match binary_prefix(b as f64) { Standalone(bytes) => format!("{} bytes", bytes), Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix), } } /// Something that can be converted to milliseconds. pub trait MillisecondDuration { /// Get the value in milliseconds. fn as_milliseconds(&self) -> u64; } impl MillisecondDuration for Duration { fn as_milliseconds(&self) -> u64 { self.as_secs() * 1000 + self.subsec_nanos() as u64 / 1_000_000 } } #[derive(Default)] struct CacheSizes { sizes: ::std::collections::BTreeMap<&'static str, usize>, } impl CacheSizes { fn insert(&mut self, key: &'static str, bytes: usize) { self.sizes.insert(key, bytes); } fn display(&self, style: Style, paint: F) -> String where F: Fn(Style, String) -> String, { use std::fmt::Write; let mut buf = String::new(); for (name, &size) in &self.sizes { write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name) .expect("writing to string won't fail unless OOM; qed") } buf } } pub struct SyncInfo { last_imported_block_number: BlockNumber, last_imported_ancient_number: Option, num_peers: usize, max_peers: u32, snapshot_sync: bool, } pub struct Report { importing: bool, chain_info: BlockChainInfo, client_report: ClientReport, queue_info: BlockQueueInfo, cache_sizes: CacheSizes, sync_info: Option, } /// Something which can provide data to the informant. pub trait InformantData: Send + Sync { /// Whether it executes transactions fn executes_transactions(&self) -> bool; /// Whether it is currently importing (also included in `Report`) fn is_major_importing(&self) -> bool; /// Generate a report of blockchain status, memory usage, and sync info. fn report(&self) -> Report; } /// Informant data for a full node. pub struct FullNodeInformantData { pub client: Arc, pub sync: Option>, pub net: Option>, } impl InformantData for FullNodeInformantData { fn executes_transactions(&self) -> bool { true } fn is_major_importing(&self) -> bool { let state = self.sync.as_ref().map(|sync| sync.status().state); is_major_importing_or_waiting(state, self.client.queue_info(), false) } fn report(&self) -> Report { let (client_report, queue_info, blockchain_cache_info) = ( self.client.report(), self.client.queue_info(), self.client.blockchain_cache_info(), ); let chain_info = self.client.chain_info(); let mut cache_sizes = CacheSizes::default(); cache_sizes.insert("queue", queue_info.mem_used); cache_sizes.insert("chain", blockchain_cache_info.total()); let importing = self.is_major_importing(); let sync_info = match (self.sync.as_ref(), self.net.as_ref()) { (Some(sync), Some(net)) => { let status = sync.status(); let num_peers_range = net.num_peers_range(); debug_assert!(num_peers_range.end() >= num_peers_range.start()); Some(SyncInfo { last_imported_block_number: status .last_imported_block_number .unwrap_or(chain_info.best_block_number), last_imported_ancient_number: status.last_imported_old_block_number, num_peers: status.num_peers, max_peers: status .current_max_peers(*num_peers_range.start(), *num_peers_range.end()), snapshot_sync: status.is_snapshot_syncing(), }) } _ => None, }; Report { importing, chain_info, client_report, queue_info, cache_sizes, sync_info, } } } pub struct Informant { last_tick: RwLock, with_color: bool, target: T, snapshot: Option>, rpc_stats: Option>, last_import: Mutex, skipped: AtomicUsize, skipped_txs: AtomicUsize, in_shutdown: AtomicBool, last_report: Mutex, } impl Informant { /// Make a new instance potentially `with_color` output. pub fn new( target: T, snapshot: Option>, rpc_stats: Option>, with_color: bool, ) -> Self { Informant { last_tick: RwLock::new(Instant::now()), with_color: with_color, target: target, snapshot: snapshot, rpc_stats: rpc_stats, last_import: Mutex::new(Instant::now()), skipped: AtomicUsize::new(0), skipped_txs: AtomicUsize::new(0), in_shutdown: AtomicBool::new(false), last_report: Mutex::new(Default::default()), } } /// Signal that we're shutting down; no more output necessary. pub fn shutdown(&self) { self.in_shutdown .store(true, ::std::sync::atomic::Ordering::SeqCst); } pub fn tick(&self) { let now = Instant::now(); let elapsed = now.duration_since(*self.last_tick.read()); let (client_report, full_report) = { let last_report = self.last_report.lock(); let full_report = self.target.report(); let diffed = full_report.client_report.clone() - &*last_report; (diffed, full_report) }; let Report { importing, chain_info, queue_info, cache_sizes, sync_info, .. } = full_report; let rpc_stats = self.rpc_stats.as_ref(); let snapshot_sync = sync_info.as_ref().map_or(false, |s| s.snapshot_sync) && self .snapshot .as_ref() .map_or(false, |s| match s.restoration_status() { RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => { true } _ => false, }); if !importing && !snapshot_sync && elapsed < Duration::from_secs(30) { return; } *self.last_tick.write() = now; *self.last_report.lock() = full_report.client_report.clone(); let paint = |c: Style, t: String| match self.with_color && atty::is(atty::Stream::Stdout) { true => format!("{}", c.paint(t)), false => t, }; info!(target: "import", "{}{} {} {} {}", match importing { true => match snapshot_sync { false => format!("Syncing {} {} {} {}+{} Qed", paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))), paint(White.bold(), format!("{}", chain_info.best_block_hash)), if self.target.executes_transactions() { format!("{} blk/s {} tx/s {} Mgas/s", paint(Yellow.bold(), format!("{:7.2}", (client_report.blocks_imported * 1000) as f64 / elapsed.as_milliseconds() as f64)), paint(Yellow.bold(), format!("{:6.1}", (client_report.transactions_applied * 1000) as f64 / elapsed.as_milliseconds() as f64)), paint(Yellow.bold(), format!("{:6.1}", (client_report.gas_processed / 1000).low_u64() as f64 / elapsed.as_milliseconds() as f64)) ) } else { format!("{} hdr/s", paint(Yellow.bold(), format!("{:6.1}", (client_report.blocks_imported * 1000) as f64 / elapsed.as_milliseconds() as f64)) ) }, paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)), paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size)) ), true => { self.snapshot.as_ref().map_or(String::new(), |s| match s.restoration_status() { RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done, .. } => { format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks) }, RestorationStatus::Initializing { chunks_done } => { format!("Snapshot initializing ({} chunks restored)", chunks_done) }, _ => String::new(), } ) }, }, false => String::new(), }, match chain_info.ancient_block_number { Some(ancient_number) => format!(" Ancient:#{}", ancient_number), None => String::new(), }, match sync_info.as_ref() { Some(ref sync_info) => format!("{}{}/{} peers", match importing { true => format!("{}", if self.target.executes_transactions() { paint(Green.bold(), format!("{:>8} ", format!("LI:#{}", sync_info.last_imported_block_number))) } else { String::new() } ), false => match sync_info.last_imported_ancient_number { Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("AB:#{}", number)))), None => String::new(), } }, paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)), paint(Cyan.bold(), format!("{:2}", sync_info.max_peers)), ), _ => String::new(), }, cache_sizes.display(Blue.bold(), &paint), match rpc_stats { Some(ref rpc_stats) => format!( "RPC: {} conn, {} req/s, {} µs", paint(Blue.bold(), format!("{:2}", rpc_stats.sessions())), paint(Blue.bold(), format!("{:4}", rpc_stats.requests_rate())), paint(Blue.bold(), format!("{:4}", rpc_stats.approximated_roundtrip())), ), _ => String::new(), }, ); } } impl ChainNotify for Informant { // t_nb 11.2 Informant. Prints new block inclusiong to console/log. fn new_blocks(&self, new_blocks: NewBlocks) { if new_blocks.has_more_blocks_to_import { return; } let mut last_import = self.last_import.lock(); let client = &self.target.client; let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; let txs_imported = new_blocks .imported .iter() .take( new_blocks .imported .len() .saturating_sub(if ripe { 1 } else { 0 }), ) .filter_map(|h| client.block(BlockId::Hash(*h))) .map(|b| b.transactions_count()) .sum(); if ripe { if let Some(block) = new_blocks .imported .last() .and_then(|h| client.block(BlockId::Hash(*h))) { let header_view = block.header_view(); let size = block.rlp().as_raw().len(); let (skipped, skipped_txs) = ( self.skipped.load(AtomicOrdering::Relaxed) + new_blocks.imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported, ); info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", Colour::White.bold().paint(format!("#{}", header_view.number())), Colour::White.bold().paint(format!("{}", header_view.hash())), Colour::Yellow.bold().paint(format!("{}", block.transactions_count())), Colour::Yellow.bold().paint(format!("{:.2}", header_view.gas_used().low_u64() as f32 / 1000000f32)), Colour::Purple.bold().paint(format!("{}", new_blocks.duration.as_milliseconds())), Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), if skipped > 0 { format!(" + another {} block(s) containing {} tx(s)", Colour::Red.bold().paint(format!("{}", skipped)), Colour::Red.bold().paint(format!("{}", skipped_txs)) ) } else { String::new() } ); self.skipped.store(0, AtomicOrdering::Relaxed); self.skipped_txs.store(0, AtomicOrdering::Relaxed); *last_import = Instant::now(); } } else { self.skipped .fetch_add(new_blocks.imported.len(), AtomicOrdering::Relaxed); self.skipped_txs .fetch_add(txs_imported, AtomicOrdering::Relaxed); } } } const INFO_TIMER: TimerToken = 0; impl IoHandler for Informant { fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, Duration::from_secs(5)) .expect("Error registering timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { if timer == INFO_TIMER && !self.in_shutdown.load(AtomicOrdering::SeqCst) { self.tick(); } } }