From 67c1f71b6ea673bc748c9802d2871dea35b658fa Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 10 Jul 2017 13:21:11 +0200 Subject: [PATCH] Proper light client informant and more verification of imported headers (#5897) * do more validation of imported headers in light client * generalize informant with traits * informant implementation for light client * make comment into TODO * fix broken test * disable full checking of headers in light client in sync tests --- ethcore/light/src/cache.rs | 18 +- ethcore/light/src/client/mod.rs | 77 ++++- ethcore/src/client/client.rs | 16 ++ ethcore/src/types/verification_queue_info.rs | 2 +- parity/blockchain.rs | 14 +- parity/informant.rs | 279 ++++++++++++++----- parity/run.rs | 63 +++-- sync/src/light_sync/tests/test_net.rs | 6 +- 8 files changed, 374 insertions(+), 101 deletions(-) diff --git a/ethcore/light/src/cache.rs b/ethcore/light/src/cache.rs index 67ba11ce0..9a5a3638f 100644 --- a/ethcore/light/src/cache.rs +++ b/ethcore/light/src/cache.rs @@ -26,7 +26,7 @@ use ethcore::receipt::Receipt; use stats::Corpus; use time::{SteadyTime, Duration}; -use util::{U256, H256}; +use util::{U256, H256, HeapSizeOf}; use util::cache::MemoryLruCache; /// Configuration for how much data to cache. @@ -153,6 +153,22 @@ impl Cache { pub fn set_gas_price_corpus(&mut self, corpus: Corpus) { self.corpus = Some((corpus, SteadyTime::now())) } + + /// Get the memory used. + pub fn mem_used(&self) -> usize { + self.heap_size_of_children() + } +} + +impl HeapSizeOf for Cache { + fn heap_size_of_children(&self) -> usize { + self.headers.current_size() + + self.canon_hashes.current_size() + + self.bodies.current_size() + + self.receipts.current_size() + + self.chain_score.current_size() + // TODO: + corpus + } } #[cfg(test)] diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index 57cd61cec..ba580b905 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -44,7 +44,7 @@ mod header_chain; mod service; /// Configuration for the light client. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct Config { /// Verification queue config. pub queue: queue::Config, @@ -56,6 +56,21 @@ pub struct Config { pub db_compaction: CompactionProfile, /// Should db have WAL enabled? pub db_wal: bool, + /// Should it do full verification of blocks? + pub verify_full: bool, +} + +impl Default for Config { + fn default() -> Config { + Config { + queue: Default::default(), + chain_column: None, + db_cache_size: None, + db_compaction: CompactionProfile::default(), + db_wal: true, + verify_full: true, + } + } } /// Trait for interacting with the header chain abstractly. @@ -109,6 +124,9 @@ pub trait LightChainClient: Send + Sync { /// Get the EIP-86 transition block number. fn eip86_transition(&self) -> u64; + + /// Get a report of import activity since the last call. + fn report(&self) -> ClientReport; } /// An actor listening to light chain events. @@ -141,6 +159,7 @@ pub struct Client { import_lock: Mutex<()>, db: Arc, listeners: RwLock>>, + verify_full: bool, } impl Client { @@ -156,6 +175,7 @@ impl Client { import_lock: Mutex::new(()), db: db, listeners: RwLock::new(vec![]), + verify_full: config.verify_full, }) } @@ -263,6 +283,14 @@ impl Client { for verified_header in self.queue.drain(MAX) { let (num, hash) = (verified_header.number(), verified_header.hash()); + if self.verify_full && !self.check_header(&mut bad, &verified_header) { + continue + } + + // TODO: `epoch_end_signal`, `is_epoch_end`. + // proofs we get from the network would be _complete_, whereas we need + // _incomplete_ signals + let mut tx = self.db.transaction(); let pending = match self.chain.insert(&mut tx, verified_header) { Ok(pending) => { @@ -273,14 +301,16 @@ impl Client { Err(e) => { debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e); bad.push(hash); - break; + continue; } }; + self.db.write_buffered(tx); self.chain.apply_pending(pending); - if let Err(e) = self.db.flush() { - panic!("Database flush failed: {}. Check disk health and space.", e); - } + } + + if let Err(e) = self.db.flush() { + panic!("Database flush failed: {}. Check disk health and space.", e); } self.queue.mark_as_bad(&bad); @@ -291,7 +321,7 @@ impl Client { /// Get a report about blocks imported. pub fn report(&self) -> ClientReport { - ::std::mem::replace(&mut *self.report.write(), ClientReport::default()) + self.report.read().clone() } /// Get blockchain mem usage in bytes. @@ -350,6 +380,37 @@ impl Client { } } } + + // return true if should skip, false otherwise. may push onto bad if + // should skip. + fn check_header(&self, bad: &mut Vec, verified_header: &Header) -> bool { + let hash = verified_header.hash(); + let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) { + Some(header) => header, + None => return false, // skip import of block with missing parent. + }; + + // Verify Block Family + let verify_family_result = self.engine.verify_block_family(&verified_header, &parent_header.decode(), None); + if let Err(e) = verify_family_result { + warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", + verified_header.number(), verified_header.hash(), e); + bad.push(hash); + return false; + }; + + // "external" verification. + let verify_external_result = self.engine.verify_block_external(&verified_header, None); + if let Err(e) = verify_external_result { + warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", + verified_header.number(), verified_header.hash(), e); + + bad.push(hash); + return false; + }; + + true + } } impl LightChainClient for Client { @@ -414,4 +475,8 @@ impl LightChainClient for Client { fn eip86_transition(&self) -> u64 { self.engine().params().eip86_transition } + + fn report(&self) -> ClientReport { + Client::report(self) + } } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index dcb9ab9f8..d0eb10210 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -112,6 +112,22 @@ impl ClientReport { } } +impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport { + type Output = Self; + + fn sub(mut self, other: &'a ClientReport) -> Self { + let higher_mem = ::std::cmp::max(self.state_db_mem, other.state_db_mem); + let lower_mem = ::std::cmp::min(self.state_db_mem, other.state_db_mem); + + self.blocks_imported -= other.blocks_imported; + self.transactions_applied -= other.transactions_applied; + self.gas_processed = self.gas_processed - other.gas_processed; + self.state_db_mem = higher_mem - lower_mem; + + self + } +} + struct SleepState { last_activity: Option, last_autosleep: Option, diff --git a/ethcore/src/types/verification_queue_info.rs b/ethcore/src/types/verification_queue_info.rs index 566e37280..570277f09 100644 --- a/ethcore/src/types/verification_queue_info.rs +++ b/ethcore/src/types/verification_queue_info.rs @@ -17,7 +17,7 @@ //! Verification queue info types /// Verification queue status -#[derive(Debug)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "ipc", binary)] pub struct VerificationQueueInfo { /// Number of queued items pending verification diff --git a/parity/blockchain.rs b/parity/blockchain.rs index e31f89e45..fae9a3bfc 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -29,7 +29,7 @@ use ethcore::error::ImportError; use ethcore::miner::Miner; use ethcore::verification::queue::VerifierSettings; use cache::CacheConfig; -use informant::{Informant, MillisecondDuration}; +use informant::{Informant, FullNodeInformantData, MillisecondDuration}; use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool}; use helpers::{to_client_config, execute_upgrades}; use dir::Directories; @@ -238,7 +238,17 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> { } }; - let informant = Arc::new(Informant::new(client.clone(), None, None, None, None, cmd.with_color)); + let informant = Arc::new(Informant::new( + FullNodeInformantData { + client: client.clone(), + sync: None, + net: None, + }, + None, + None, + cmd.with_color, + )); + service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?; let do_import = |bytes| { diff --git a/parity/informant.rs b/parity/informant.rs index 3dfd5147d..7e1e4ed4d 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -21,32 +21,21 @@ use self::ansi_term::Style; use std::sync::{Arc}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::time::{Instant, Duration}; + +use ethcore::client::*; +use ethcore::header::BlockNumber; +use ethcore::service::ClientIoMessage; +use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; +use ethcore::snapshot::service::Service as SnapshotService; +use ethsync::{LightSyncProvider, LightSync, SyncProvider, ManageNetwork}; use io::{TimerToken, IoContext, IoHandler}; use isatty::{stdout_isatty}; -use ethsync::{SyncProvider, ManageNetwork}; -use util::{RwLock, Mutex, H256, Colour, Bytes}; -use ethcore::client::*; -use ethcore::service::ClientIoMessage; -use ethcore::snapshot::service::Service as SnapshotService; -use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; +use light::Cache as LightDataCache; +use light::client::LightChainClient; use number_prefix::{binary_prefix, Standalone, Prefixed}; use parity_rpc::{is_major_importing}; use parity_rpc::informant::RpcStats; - -pub struct Informant { - report: RwLock>, - last_tick: RwLock, - with_color: bool, - client: Arc, - snapshot: Option>, - sync: Option>, - net: Option>, - rpc_stats: Option>, - last_import: Mutex, - skipped: AtomicUsize, - skipped_txs: AtomicUsize, - in_shutdown: AtomicBool, -} +use util::{RwLock, Mutex, H256, Colour, Bytes}; /// Format byte counts to standard denominations. pub fn format_bytes(b: usize) -> String { @@ -68,29 +57,188 @@ impl MillisecondDuration for Duration { } } -impl Informant { +#[derive(Default)] +struct CacheSizes { + sizes: ::std::collections::BTreeMap<&'static str, usize>, +} + +impl CacheSizes { + fn insert(&mut self, key: &'static str, bytes: usize) { + self.sizes.insert(key, bytes); + } + + fn display(&self, style: Style, paint: F) -> String + where F: Fn(Style, String) -> String + { + use std::fmt::Write; + + let mut buf = String::new(); + for (name, &size) in &self.sizes { + + write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name) + .expect("writing to string won't fail unless OOM; qed") + } + + buf + } +} + +pub struct SyncInfo { + last_imported_block_number: BlockNumber, + last_imported_old_block_number: Option, + num_peers: usize, + max_peers: u32, +} + +pub struct Report { + importing: bool, + chain_info: BlockChainInfo, + client_report: ClientReport, + queue_info: BlockQueueInfo, + cache_sizes: CacheSizes, + sync_info: Option, +} + +/// Something which can provide data to the informant. +pub trait InformantData: Send + Sync { + /// Whether it executes transactions + fn executes_transactions(&self) -> bool; + + /// Whether it is currently importing (also included in `Report`) + fn is_major_importing(&self) -> bool; + + /// Generate a report of blockchain status, memory usage, and sync info. + fn report(&self) -> Report; +} + +/// Informant data for a full node. +pub struct FullNodeInformantData { + pub client: Arc, + pub sync: Option>, + pub net: Option>, +} + +impl InformantData for FullNodeInformantData { + fn executes_transactions(&self) -> bool { true } + + fn is_major_importing(&self) -> bool { + let state = self.sync.as_ref().map(|sync| sync.status().state); + is_major_importing(state, self.client.queue_info()) + } + + fn report(&self) -> Report { + let (client_report, queue_info, blockchain_cache_info) = + (self.client.report(), self.client.queue_info(), self.client.blockchain_cache_info()); + + let chain_info = self.client.chain_info(); + + let mut cache_sizes = CacheSizes::default(); + cache_sizes.insert("db", client_report.state_db_mem); + cache_sizes.insert("queue", queue_info.mem_used); + cache_sizes.insert("chain", blockchain_cache_info.total()); + + let (importing, sync_info) = match (self.sync.as_ref(), self.net.as_ref()) { + (Some(sync), Some(net)) => { + let status = sync.status(); + let net_config = net.network_config(); + + cache_sizes.insert("sync", status.mem_used); + + let importing = is_major_importing(Some(status.state), queue_info.clone()); + (importing, Some(SyncInfo { + last_imported_block_number: status.last_imported_block_number.unwrap_or(chain_info.best_block_number), + last_imported_old_block_number: status.last_imported_old_block_number, + num_peers: status.num_peers, + max_peers: status.current_max_peers(net_config.min_peers, net_config.max_peers), + })) + } + _ => (is_major_importing(None, queue_info.clone()), None), + }; + + Report { + importing, + chain_info, + client_report, + queue_info, + cache_sizes, + sync_info, + } + } +} + +/// Informant data for a light node -- note that the network is required. +pub struct LightNodeInformantData { + pub client: Arc, + pub sync: Arc, + pub cache: Arc>, +} + +impl InformantData for LightNodeInformantData { + fn executes_transactions(&self) -> bool { false } + + fn is_major_importing(&self) -> bool { + self.sync.is_major_importing() + } + + fn report(&self) -> Report { + let (client_report, queue_info, chain_info) = + (self.client.report(), self.client.queue_info(), self.client.chain_info()); + + let mut cache_sizes = CacheSizes::default(); + cache_sizes.insert("queue", queue_info.mem_used); + cache_sizes.insert("cache", self.cache.lock().mem_used()); + + let peer_numbers = self.sync.peer_numbers(); + let sync_info = Some(SyncInfo { + last_imported_block_number: chain_info.best_block_number, + last_imported_old_block_number: None, + num_peers: peer_numbers.connected, + max_peers: peer_numbers.max as u32, + }); + + Report { + importing: self.sync.is_major_importing(), + chain_info, + client_report, + queue_info, + cache_sizes, + sync_info, + } + } +} + +pub struct Informant { + last_tick: RwLock, + with_color: bool, + target: T, + snapshot: Option>, + rpc_stats: Option>, + last_import: Mutex, + skipped: AtomicUsize, + skipped_txs: AtomicUsize, + in_shutdown: AtomicBool, + last_report: Mutex, +} + +impl Informant { /// Make a new instance potentially `with_color` output. pub fn new( - client: Arc, - sync: Option>, - net: Option>, + target: T, snapshot: Option>, rpc_stats: Option>, with_color: bool, ) -> Self { Informant { - report: RwLock::new(None), last_tick: RwLock::new(Instant::now()), with_color: with_color, - client: client, + target: target, snapshot: snapshot, - sync: sync, - net: net, rpc_stats: rpc_stats, last_import: Mutex::new(Instant::now()), skipped: AtomicUsize::new(0), skipped_txs: AtomicUsize::new(0), in_shutdown: AtomicBool::new(false), + last_report: Mutex::new(Default::default()), } } @@ -106,14 +254,24 @@ impl Informant { return; } - let chain_info = self.client.chain_info(); - let queue_info = self.client.queue_info(); - let cache_info = self.client.blockchain_cache_info(); - let network_config = self.net.as_ref().map(|n| n.network_config()); - let sync_status = self.sync.as_ref().map(|s| s.status()); + let Report { + importing, + chain_info, + client_report, + queue_info, + cache_sizes, + sync_info, + } = self.target.report(); + + let client_report = { + let mut last_report = self.last_report.lock(); + let diffed = client_report.clone() - &*last_report; + *last_report = client_report.clone(); + diffed + }; + let rpc_stats = self.rpc_stats.as_ref(); - let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info()); let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s| match s.status() { RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => @@ -128,9 +286,6 @@ impl Informant { *self.last_tick.write() = Instant::now(); - let mut write_report = self.report.write(); - let report = self.client.report(); - let paint = |c: Style, t: String| match self.with_color && stdout_isatty() { true => format!("{}", c.paint(t)), false => t, @@ -142,13 +297,16 @@ impl Informant { false => format!("Syncing {} {} {} {}+{} Qed", paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))), paint(White.bold(), format!("{}", chain_info.best_block_hash)), - { - let last_report = match *write_report { Some(ref last_report) => last_report.clone(), _ => ClientReport::default() }; + if self.target.executes_transactions() { format!("{} blk/s {} tx/s {} Mgas/s", - paint(Yellow.bold(), format!("{:4}", ((report.blocks_imported - last_report.blocks_imported) * 1000) as u64 / elapsed.as_milliseconds())), - paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())), - paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64())) - ) + paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds())), + paint(Yellow.bold(), format!("{:4}", (client_report.transactions_applied * 1000) as u64 / elapsed.as_milliseconds())), + paint(Yellow.bold(), format!("{:3}", (client_report.gas_processed / From::from(elapsed.as_milliseconds() * 1000)).low_u64())) + ) + } else { + format!("{} hdr/s", + paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds())) + ) }, paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)), paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size)) @@ -157,29 +315,21 @@ impl Informant { }, false => String::new(), }, - match (&sync_status, &network_config) { - (&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{} peers", + match sync_info.as_ref() { + Some(ref sync_info) => format!("{}{}/{} peers", match importing { - true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))), + true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number)))), false => match sync_info.last_imported_old_block_number { Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))), None => String::new(), } }, paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)), - paint(Cyan.bold(), format!("{:2}", sync_info.current_max_peers(net_config.min_peers, net_config.max_peers))), + paint(Cyan.bold(), format!("{:2}", sync_info.max_peers)), ), _ => String::new(), }, - format!("{} db {} chain {} queue{}", - paint(Blue.bold(), format!("{:>8}", format_bytes(report.state_db_mem))), - paint(Blue.bold(), format!("{:>8}", format_bytes(cache_info.total()))), - paint(Blue.bold(), format!("{:>8}", format_bytes(queue_info.mem_used))), - match sync_status { - Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))), - _ => String::new(), - } - ), + cache_sizes.display(Blue.bold(), &paint), match rpc_stats { Some(ref rpc_stats) => format!( "RPC: {} conn, {} req/s, {} µs", @@ -190,25 +340,24 @@ impl Informant { _ => String::new(), }, ); - - *write_report = Some(report); } } -impl ChainNotify for Informant { +impl ChainNotify for Informant { fn new_blocks(&self, imported: Vec, _invalid: Vec, _enacted: Vec, _retracted: Vec, _sealed: Vec, _proposed: Vec, duration: u64) { let mut last_import = self.last_import.lock(); - let sync_state = self.sync.as_ref().map(|s| s.status().state); - let importing = is_major_importing(sync_state, self.client.queue_info()); + let client = &self.target.client; + + let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; let txs_imported = imported.iter() .take(imported.len().saturating_sub(if ripe { 1 } else { 0 })) - .filter_map(|h| self.client.block(BlockId::Hash(*h))) + .filter_map(|h| client.block(BlockId::Hash(*h))) .map(|b| b.transactions_count()) .sum(); if ripe { - if let Some(block) = imported.last().and_then(|h| self.client.block(BlockId::Hash(*h))) { + if let Some(block) = imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { let header_view = block.header_view(); let size = block.rlp().as_raw().len(); let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); @@ -241,7 +390,7 @@ impl ChainNotify for Informant { const INFO_TIMER: TimerToken = 0; -impl IoHandler for Informant { +impl IoHandler for Informant { fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); } diff --git a/parity/run.rs b/parity/run.rs index cc2547c3f..3cdfd19f5 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -16,26 +16,27 @@ use std::sync::Arc; use std::net::{TcpListener}; + use ctrlc::CtrlC; -use fdlimit::raise_fd_limit; -use parity_rpc::{NetworkSettings, informant, is_major_importing}; -use ethsync::NetworkConfiguration; -use util::{Colour, version, Mutex, Condvar}; use ethcore_logger::{Config as LogConfig, RotatingLogger}; -use ethcore::miner::{StratumOptions, Stratum}; -use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient}; -use ethcore::service::ClientService; use ethcore::account_provider::{AccountProvider, AccountProviderSettings}; +use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient}; +use ethcore::ethstore::ethkey; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; +use ethcore::miner::{StratumOptions, Stratum}; +use ethcore::service::ClientService; use ethcore::snapshot; use ethcore::verification::queue::VerifierSettings; -use ethcore::ethstore::ethkey; -use light::Cache as LightDataCache; +use ethsync::NetworkConfiguration; use ethsync::SyncConfig; -use informant::Informant; -use updater::{UpdatePolicy, Updater}; -use parity_reactor::EventLoop; +use fdlimit::raise_fd_limit; use hash_fetch::fetch::{Fetch, Client as FetchClient}; +use informant::{Informant, LightNodeInformantData, FullNodeInformantData}; +use light::Cache as LightDataCache; +use parity_reactor::EventLoop; +use parity_rpc::{NetworkSettings, informant, is_major_importing}; +use updater::{UpdatePolicy, Updater}; +use util::{Colour, version, Mutex, Condvar}; use params::{ SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch, @@ -209,6 +210,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024), db_compaction: compaction, db_wal: cmd.wal, + verify_full: true, }; config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; @@ -300,7 +302,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> logger: logger, settings: Arc::new(cmd.net_settings), on_demand: on_demand, - cache: cache, + cache: cache.clone(), transaction_queue: txq, dapps_service: dapps_service, dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()), @@ -322,16 +324,25 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?; - // minimal informant thread. Just prints block number every 5 seconds. - // TODO: integrate with informant.rs - let informant_client = service.client().clone(); - ::std::thread::spawn(move || loop { - info!("#{}", informant_client.best_block_header().number()); - ::std::thread::sleep(::std::time::Duration::from_secs(5)); - }); + // the informant + let informant = Arc::new(Informant::new( + LightNodeInformantData { + client: service.client().clone(), + sync: light_sync.clone(), + cache: cache, + }, + None, + Some(rpc_stats), + cmd.logger_config.color, + )); - // wait for ctrl-c. - Ok(wait_for_exit(None, None, can_restart)) + service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?; + + // wait for ctrl-c and then shut down the informant. + let res = wait_for_exit(None, None, can_restart); + informant.shutdown(); + + Ok(res) } pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { @@ -672,9 +683,11 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R // the informant let informant = Arc::new(Informant::new( - service.client(), - Some(sync_provider.clone()), - Some(manage_network.clone()), + FullNodeInformantData { + client: service.client(), + sync: Some(sync_provider.clone()), + net: Some(manage_network.clone()), + }, Some(snapshot_service.clone()), Some(rpc_stats.clone()), cmd.logger_config.color, diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs index 525216a7e..f6d5eddf0 100644 --- a/sync/src/light_sync/tests/test_net.rs +++ b/sync/src/light_sync/tests/test_net.rs @@ -211,8 +211,12 @@ impl TestNet { pub fn light(n_light: usize, n_full: usize) -> Self { let mut peers = Vec::with_capacity(n_light + n_full); for _ in 0..n_light { + let mut config = ::light::client::Config::default(); + + // skip full verification because the blocks are bad. + config.verify_full = false; let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6)))); - let client = LightClient::in_memory(Default::default(), &Spec::new_test(), IoChannel::disconnected(), cache); + let client = LightClient::in_memory(config, &Spec::new_test(), IoChannel::disconnected(), cache); peers.push(Arc::new(Peer::new_light(Arc::new(client)))) }