From b007770ba8d1490e743e41e3b28cadd2c3ac3905 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 20 Jul 2016 12:36:20 +0200 Subject: [PATCH] Moved syncing log out of the client (#1670) --- ethcore/src/client/chain_notify.rs | 3 +- ethcore/src/client/client.rs | 94 ++++++++++-------------------- ethcore/src/service.rs | 4 +- parity/informant.rs | 91 ++++++++++++++++++++--------- parity/io_handler.rs | 6 +- parity/main.rs | 15 +++-- sync/src/api.rs | 3 +- sync/src/chain.rs | 7 +++ 8 files changed, 120 insertions(+), 103 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 71c076e4c..1d6b51803 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -28,7 +28,8 @@ pub trait ChainNotify : Send + Sync { _invalid: Vec, _enacted: Vec, _retracted: Vec, - _sealed: Vec) { + _sealed: Vec, + _duration: u64) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ba830b634..7cc909fa7 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -20,11 +20,11 @@ use std::sync::{Arc, Weak}; use std::path::{Path, PathBuf}; use std::fmt; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; -use std::time::{Instant, Duration}; +use std::time::{Instant}; use time::precise_time_ns; // util -use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock, Colour}; +use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock}; use util::journaldb::JournalDB; use util::rlp::{RlpStream, Rlp, UntrustedRlp}; use util::numbers::*; @@ -135,10 +135,8 @@ pub struct Client { sleep_state: Mutex, liveness: AtomicBool, io_channel: IoChannel, - notify: RwLock>>, + notify: RwLock>>, queue_transactions: AtomicUsize, - skipped: AtomicUsize, - last_import: Mutex, last_hashes: RwLock>, } @@ -229,24 +227,24 @@ impl Client { trie_factory: TrieFactory::new(config.trie_spec), miner: miner, io_channel: message_channel, - notify: RwLock::new(None), + notify: RwLock::new(Vec::new()), queue_transactions: AtomicUsize::new(0), - skipped: AtomicUsize::new(0), - last_import: Mutex::new(Instant::now()), last_hashes: RwLock::new(VecDeque::new()), }; Ok(Arc::new(client)) } - /// Sets the actor to be notified on certain events - pub fn set_notify(&self, target: &Arc) { - let mut write_lock = self.notify.write(); - *write_lock = Some(Arc::downgrade(target)); + /// Adds an actor to be notified on certain events + pub fn add_notify(&self, target: &Arc) { + self.notify.write().push(Arc::downgrade(target)); } - fn notify(&self) -> Option> { - let read_lock = self.notify.read(); - read_lock.as_ref().and_then(|weak| weak.upgrade()) + fn notify(&self, f: F) where F: Fn(&ChainNotify) { + for np in self.notify.read().iter() { + if let Some(n) = np.upgrade() { + f(&*n); + } + } } /// Flush the block import queue. @@ -357,28 +355,24 @@ impl Client { /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self) -> usize { let max_blocks_to_import = 64; - let (imported_blocks, import_results, invalid_blocks, original_best, imported) = { + let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); let _timer = PerfTimer::new("import_verified_blocks"); + let start = precise_time_ns(); let blocks = self.block_queue.drain(max_blocks_to_import); let original_best = self.chain_info().best_block_hash; for block in blocks { let header = &block.header; - let start = precise_time_ns(); - if invalid_blocks.contains(&header.parent_hash) { invalid_blocks.insert(header.hash()); continue; } - let tx_count = block.transactions.len(); - let size = block.bytes.len(); - let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { invalid_blocks.insert(header.hash()); @@ -392,30 +386,6 @@ impl Client { import_results.push(route); self.report.write().accrue_block(&block); - - let duration_ns = precise_time_ns() - start; - - let mut last_import = self.last_import.lock(); - if Instant::now() > *last_import + Duration::from_secs(1) { - let queue_info = self.queue_info(); - let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; - if !importing { - let skipped = self.skipped.load(AtomicOrdering::Relaxed); - info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", - Colour::White.bold().paint(format!("#{}", header.number())), - Colour::White.bold().paint(format!("{}", header.hash())), - Colour::Yellow.bold().paint(format!("{}", tx_count)), - Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)), - Colour::Purple.bold().paint(format!("{:.2}", duration_ns as f32 / 1000000f32)), - Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), - if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() } - ); - *last_import = Instant::now(); - } - self.skipped.store(0, AtomicOrdering::Relaxed); - } else { - self.skipped.fetch_add(1, AtomicOrdering::Relaxed); - } } let imported = imported_blocks.len(); @@ -429,7 +399,8 @@ impl Client { self.block_queue.mark_as_good(&imported_blocks); } } - (imported_blocks, import_results, invalid_blocks, original_best, imported) + let duration_ns = precise_time_ns() - start; + (imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns) }; { @@ -440,15 +411,16 @@ impl Client { self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted); } - if let Some(notify) = self.notify() { + self.notify(|notify| { notify.new_blocks( - imported_blocks, - invalid_blocks, - enacted, - retracted, + imported_blocks.clone(), + invalid_blocks.clone(), + enacted.clone(), + retracted.clone(), Vec::new(), + duration, ); - } + }); } } @@ -640,9 +612,7 @@ impl Client { fn wake_up(&self) { if !self.liveness.load(AtomicOrdering::Relaxed) { self.liveness.store(true, AtomicOrdering::Relaxed); - if let Some(notify) = self.notify() { - notify.start(); - } + self.notify(|n| n.start()); trace!(target: "mode", "wake_up: Waking."); } } @@ -652,9 +622,7 @@ impl Client { // only sleep if the import queue is mostly empty. if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON { self.liveness.store(false, AtomicOrdering::Relaxed); - if let Some(notify) = self.notify() { - notify.stop(); - } + self.notify(|n| n.stop()); trace!(target: "mode", "sleep: Sleeping."); } else { trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing."); @@ -1029,6 +997,7 @@ impl MiningBlockChainClient for Client { fn import_sealed_block(&self, block: SealedBlock) -> ImportResult { let _import_lock = self.import_lock.lock(); let _timer = PerfTimer::new("import_sealed_block"); + let start = precise_time_ns(); let original_best = self.chain_info().best_block_hash; @@ -1043,15 +1012,16 @@ impl MiningBlockChainClient for Client { let (enacted, retracted) = self.calculate_enacted_retracted(&[route]); self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted); - if let Some(notify) = self.notify() { + self.notify(|notify| { notify.new_blocks( vec![h.clone()], vec![], - enacted, - retracted, + enacted.clone(), + retracted.clone(), vec![h.clone()], + precise_time_ns() - start, ); - } + }); } if self.chain_info().best_block_hash != original_best { diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 1567e77e4..3ea1dd114 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -90,8 +90,8 @@ impl ClientService { } /// Set the actor to be notified on certain chain events - pub fn set_notify(&self, notify: &Arc) { - self.client.set_notify(notify); + pub fn add_notify(&self, notify: &Arc) { + self.client.add_notify(notify); } } diff --git a/parity/informant.rs b/parity/informant.rs index 1235842d2..2568ca50c 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -18,12 +18,15 @@ extern crate ansi_term; use self::ansi_term::Colour::{White, Yellow, Green, Cyan, Blue}; use self::ansi_term::Style; +use std::sync::{Arc}; +use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use std::time::{Instant, Duration}; use std::ops::{Deref, DerefMut}; use isatty::{stdout_isatty}; -use ethsync::{SyncStatus, NetworkConfiguration}; -use util::{Uint, RwLock}; +use ethsync::{SyncProvider, ManageNetwork}; +use util::{Uint, RwLock, Mutex, H256, Colour}; use ethcore::client::*; +use ethcore::views::BlockView; use number_prefix::{binary_prefix, Standalone, Prefixed}; pub struct Informant { @@ -32,18 +35,11 @@ pub struct Informant { report: RwLock>, last_tick: RwLock, with_color: bool, -} - -impl Default for Informant { - fn default() -> Self { - Informant { - chain_info: RwLock::new(None), - cache_info: RwLock::new(None), - report: RwLock::new(None), - last_tick: RwLock::new(Instant::now()), - with_color: true, - } - } + client: Arc, + sync: Option>, + net: Option>, + last_import: Mutex, + skipped: AtomicUsize, } trait MillisecondDuration { @@ -58,13 +54,18 @@ impl MillisecondDuration for Duration { impl Informant { /// Make a new instance potentially `with_color` output. - pub fn new(with_color: bool) -> Self { + pub fn new(client: Arc, sync: Option>, net: Option>, with_color: bool) -> Self { Informant { chain_info: RwLock::new(None), cache_info: RwLock::new(None), report: RwLock::new(None), last_tick: RwLock::new(Instant::now()), with_color: with_color, + client: client, + sync: sync, + net: net, + last_import: Mutex::new(Instant::now()), + skipped: AtomicUsize::new(0), } } @@ -77,17 +78,20 @@ impl Informant { #[cfg_attr(feature="dev", allow(match_bool))] - pub fn tick(&self, client: &Client, maybe_status: Option<(SyncStatus, NetworkConfiguration)>) { + pub fn tick(&self) { let elapsed = self.last_tick.read().elapsed(); if elapsed < Duration::from_secs(5) { return; } - let chain_info = client.chain_info(); - let queue_info = client.queue_info(); - let cache_info = client.blockchain_cache_info(); + let chain_info = self.client.chain_info(); + let queue_info = self.client.queue_info(); + let cache_info = self.client.blockchain_cache_info(); + let network_config = self.net.as_ref().map(|n| n.network_config()); + let sync_status = self.sync.as_ref().map(|s| s.status()); - let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 + || self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing()); if !importing && elapsed < Duration::from_secs(30) { return; } @@ -95,7 +99,7 @@ impl Informant { *self.last_tick.write() = Instant::now(); let mut write_report = self.report.write(); - let report = client.report(); + let report = self.client.report(); let paint = |c: Style, t: String| match self.with_color && stdout_isatty() { true => format!("{}", c.paint(t)), @@ -120,8 +124,8 @@ impl Informant { ), false => String::new(), }, - match maybe_status { - Some((ref sync_info, ref net_config)) => format!("{}{}/{}/{} peers", + match (&sync_status, &network_config) { + (&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{}/{} peers", match importing { true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))), false => String::new(), @@ -130,14 +134,14 @@ impl Informant { paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)), paint(Cyan.bold(), format!("{:2}", net_config.ideal_peers)) ), - None => String::new(), + _ => String::new(), }, - format!("{} db {} chain {} queue{}", + format!("{} db {} chain {} queue{}", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(report.state_db_mem))), paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(cache_info.total()))), paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(queue_info.mem_used))), - match maybe_status { - Some((ref sync_info, _)) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))), + match sync_status { + Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))), _ => String::new(), } ) @@ -149,3 +153,36 @@ impl Informant { } } +impl ChainNotify for Informant { + fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, duration: u64) { + let mut last_import = self.last_import.lock(); + if Instant::now() > *last_import + Duration::from_secs(1) { + let queue_info = self.client.queue_info(); + let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 + || self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing()); + if !importing { + if let Some(block) = enacted.last().and_then(|h| self.client.block(BlockID::Hash(h.clone()))) { + let view = BlockView::new(&block); + let header = view.header(); + let tx_count = view.transactions_count(); + let size = block.len(); + let skipped = self.skipped.load(AtomicOrdering::Relaxed); + info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", + Colour::White.bold().paint(format!("#{}", header.number())), + Colour::White.bold().paint(format!("{}", header.hash())), + Colour::Yellow.bold().paint(format!("{}", tx_count)), + Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)), + Colour::Purple.bold().paint(format!("{:.2}", duration as f32 / 1000000f32)), + Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), + if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() } + ); + *last_import = Instant::now(); + } + } + self.skipped.store(0, AtomicOrdering::Relaxed); + } else { + self.skipped.fetch_add(enacted.len(), AtomicOrdering::Relaxed); + } + } +} + diff --git a/parity/io_handler.rs b/parity/io_handler.rs index d0b33a470..7d8e53eea 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -30,7 +30,7 @@ pub struct ClientIoHandler { pub sync: Arc, pub net: Arc, pub accounts: Arc, - pub info: Informant, + pub info: Arc, } impl IoHandler for ClientIoHandler { @@ -40,9 +40,7 @@ impl IoHandler for ClientIoHandler { fn timeout(&self, _io: &IoContext, timer: TimerToken) { if let INFO_TIMER = timer { - let sync_status = self.sync.status(); - let network_config = self.net.network_config(); - self.info.tick(&self.client, Some((sync_status, network_config))); + self.info.tick(); } } } diff --git a/parity/main.rs b/parity/main.rs index fe5107d66..94db419cf 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -86,7 +86,7 @@ use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; use util::{H256, ToPretty, PayloadInfo, Bytes, Colour, version, journaldb, RotatingLogger}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; -use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, Mode}; +use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, Mode, ChainNotify}; use ethcore::error::{ImportError}; use ethcore::service::ClientService; use ethcore::spec::Spec; @@ -246,7 +246,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig, let (sync_provider, manage_network, chain_notify) = modules::sync(sync_config, NetworkConfiguration::from(net_settings), client.clone()) .unwrap_or_else(|e| die_with_error("Sync", e)); - service.set_notify(&chain_notify); + service.add_notify(&chain_notify); // if network is active by default if match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } { @@ -310,10 +310,13 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig, apis: deps_for_rpc_apis.clone(), }); + let informant = Arc::new(Informant::new(service.client(), Some(sync_provider.clone()), Some(manage_network.clone()), conf.have_color())); + let info_notify: Arc = informant.clone(); + service.add_notify(&info_notify); // Register IO handler let io_handler = Arc::new(ClientIoHandler { client: service.client(), - info: Informant::new(conf.have_color()), + info: informant, sync: sync_provider.clone(), net: manage_network.clone(), accounts: account_service.clone(), @@ -439,7 +442,7 @@ fn execute_import(conf: Configuration, panic_handler: Arc) { } }; - let informant = Informant::new(conf.have_color()); + let informant = Informant::new(client.clone(), None, None, conf.have_color()); let do_import = |bytes| { while client.queue_info().is_full() { sleep(Duration::from_secs(1)); } @@ -448,7 +451,7 @@ fn execute_import(conf: Configuration, panic_handler: Arc) { Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { trace!("Skipping block already in chain."); } Err(e) => die!("Cannot import block: {:?}", e) } - informant.tick(&*client, None); + informant.tick(); }; match format { @@ -476,7 +479,7 @@ fn execute_import(conf: Configuration, panic_handler: Arc) { } while !client.queue_info().is_empty() { sleep(Duration::from_secs(1)); - informant.tick(&*client, None); + informant.tick(); } client.flush_queue(); } diff --git a/sync/src/api.rs b/sync/src/api.rs index 79970a913..22b3d05a1 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -124,7 +124,8 @@ impl ChainNotify for EthSync { invalid: Vec, enacted: Vec, retracted: Vec, - sealed: Vec) + sealed: Vec, + _duration: u64) { self.network.with_context(ETH_PROTOCOL, |context| { let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index c21e1c8f1..28c42cd6a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -178,6 +178,13 @@ pub struct SyncStatus { pub mem_used: usize, } +impl SyncStatus { + /// Indicates if initial sync is still in progress. + pub fn is_major_syncing(&self) -> bool { + self.state != SyncState::Idle && self.state != SyncState::NewBlocks + } +} + #[derive(PartialEq, Eq, Debug, Clone)] /// Peer data type requested enum PeerAsking {