From 866ab9c7a3fd0e9c0b44b2d1a29fb1fc6250fe70 Mon Sep 17 00:00:00 2001 From: keorn Date: Thu, 20 Oct 2016 22:36:18 +0100 Subject: [PATCH] Check queue to determine major importing (#2763) * simplify major sync detection * fix typos * fix merge * more realistic EthTester * add new synced state * remove Blocks synced state * move is_major_importing to rpc crate and check queue * add tests --- dapps/src/apps/fetcher.rs | 4 +- dapps/src/lib.rs | 4 +- ethcore/src/client/client.rs | 3 +- ethcore/src/snapshot/watcher.rs | 8 ++-- parity/dapps.rs | 4 +- parity/informant.rs | 6 ++- parity/run.rs | 6 +-- rpc/src/lib.rs | 1 + rpc/src/v1/helpers/block_import.rs | 60 ++++++++++++++++++++++++++++++ rpc/src/v1/helpers/mod.rs | 1 + rpc/src/v1/impls/eth.rs | 6 ++- rpc/src/v1/mod.rs | 2 +- sync/src/chain.rs | 11 ------ 13 files changed, 87 insertions(+), 29 deletions(-) create mode 100644 rpc/src/v1/helpers/block_import.rs diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index 2e1328858..9d66276ea 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -85,7 +85,7 @@ impl ContentFetcher { // fallback to resolver if let Ok(content_id) = content_id.from_hex() { // if app_id is valid, but we are syncing always return true. - if self.sync.is_major_syncing() { + if self.sync.is_major_importing() { return true; } // else try to resolve the app_id @@ -99,7 +99,7 @@ impl ContentFetcher { let mut cache = self.cache.lock(); let content_id = path.app_id.clone(); - if self.sync.is_major_syncing() { + if self.sync.is_major_importing() { return Box::new(ContentHandler::error( StatusCode::ServiceUnavailable, "Sync In Progress", diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 95fbbb191..8fd826ff9 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -95,11 +95,11 @@ static DAPPS_DOMAIN : &'static str = ".parity"; /// Indicates sync status pub trait SyncStatus: Send + Sync { /// Returns true if there is a major sync happening. - fn is_major_syncing(&self) -> bool; + fn is_major_importing(&self) -> bool; } impl SyncStatus for F where F: Fn() -> bool + Send + Sync { - fn is_major_syncing(&self) -> bool { self() } + fn is_major_importing(&self) -> bool { self() } } /// Webapps HTTP+RPC server build. diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 6f5558708..1204cd12f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -46,7 +46,7 @@ use transaction::{LocalizedTransaction, SignedTransaction, Action}; use blockchain::extras::TransactionAddress; use types::filter::Filter; use log_entry::LocalizedLogEntry; -use verification::queue::{BlockQueue, QueueInfo as BlockQueueInfo}; +use verification::queue::BlockQueue; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; use client::{ BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient, @@ -71,6 +71,7 @@ use state_db::StateDB; pub use types::blockchain_info::BlockChainInfo; pub use types::block_status::BlockStatus; pub use blockchain::CacheSize as BlockChainCacheSize; +pub use verification::queue::QueueInfo as BlockQueueInfo; const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index d37be7c81..498f00737 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -30,7 +30,7 @@ use std::sync::Arc; trait Oracle: Send + Sync { fn to_number(&self, hash: H256) -> Option; - fn is_major_syncing(&self) -> bool; + fn is_major_importing(&self) -> bool; } struct StandardOracle where F: 'static + Send + Sync + Fn() -> bool { @@ -45,7 +45,7 @@ impl Oracle for StandardOracle self.client.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) } - fn is_major_syncing(&self) -> bool { + fn is_major_importing(&self) -> bool { (self.sync_status)() } } @@ -108,7 +108,7 @@ impl ChainNotify for Watcher { _: Vec, _duration: u64) { - if self.oracle.is_major_syncing() { return } + if self.oracle.is_major_importing() { return } trace!(target: "snapshot_watcher", "{} imported", imported.len()); @@ -143,7 +143,7 @@ mod tests { self.0.get(&hash).cloned() } - fn is_major_syncing(&self) -> bool { false } + fn is_major_importing(&self) -> bool { false } } struct TestBroadcast(Option); diff --git a/parity/dapps.rs b/parity/dapps.rs index ede57ae5d..d7bb0c07a 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -108,6 +108,7 @@ mod server { use ethcore::client::{Client, BlockChainClient, BlockID}; use rpc_apis; + use ethcore_rpc::is_major_importing; use ethcore_dapps::ContractClient; pub use ethcore_dapps::Server as WebappServer; @@ -127,7 +128,8 @@ mod server { Arc::new(Registrar { client: deps.client.clone() }) ); let sync = deps.sync.clone(); - server.with_sync_status(Arc::new(move || sync.status().is_major_syncing())); + let client = deps.client.clone(); + server.with_sync_status(Arc::new(move || is_major_importing(Some(sync.status().state), client.queue_info()))); server.with_signer_port(signer_port); let server = rpc_apis::setup_rpc(server, deps.apis.clone(), rpc_apis::ApiSet::UnsafeContext); diff --git a/parity/informant.rs b/parity/informant.rs index b3cd5bfb6..9d1679615 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -29,6 +29,7 @@ use ethcore::views::BlockView; use ethcore::snapshot::service::Service as SnapshotService; use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; use number_prefix::{binary_prefix, Standalone, Prefixed}; +use ethcore_rpc::is_major_importing; pub struct Informant { chain_info: RwLock>, @@ -95,7 +96,7 @@ impl Informant { let network_config = self.net.as_ref().map(|n| n.network_config()); let sync_status = self.sync.as_ref().map(|s| s.status()); - let importing = self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing()); + let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info()); let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s| match s.status() { RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => @@ -174,7 +175,8 @@ impl Informant { impl ChainNotify for Informant { fn new_blocks(&self, imported: Vec, _invalid: Vec, _enacted: Vec, _retracted: Vec, _sealed: Vec, duration: u64) { let mut last_import = self.last_import.lock(); - let importing = self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing()); + let sync_state = self.sync.as_ref().map(|s| s.status().state); + let importing = is_major_importing(sync_state, self.client.queue_info()); if Instant::now() > *last_import + Duration::from_secs(1) && !importing { if let Some(block) = imported.last().and_then(|h| self.client.block(BlockID::Hash(*h))) { let view = BlockView::new(&block); diff --git a/parity/run.rs b/parity/run.rs index 47b071734..4d6b92600 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -18,11 +18,11 @@ use std::sync::{Arc, Mutex, Condvar}; use ctrlc::CtrlC; use fdlimit::raise_fd_limit; use ethcore_logger::{Config as LogConfig, setup_log}; -use ethcore_rpc::NetworkSettings; +use ethcore_rpc::{NetworkSettings, is_major_importing}; use ethsync::NetworkConfiguration; use util::{Colour, version, U256}; use io::{MayPanic, ForwardPanic, PanicHandler}; -use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, ChainNotify}; +use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, ChainNotify, BlockChainClient}; use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; @@ -315,7 +315,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { let sync = sync_provider.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), - move || ::ethsync::SyncProvider::status(&*sync).is_major_syncing(), + move || is_major_importing(Some(sync.status().state), client.queue_info()), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY, diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 01ba44941..3b67ae7f0 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -56,6 +56,7 @@ use self::jsonrpc_core::{IoHandler, IoDelegate}; pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError}; pub mod v1; pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings}; +pub use v1::block_import::is_major_importing; /// An object that can be extended with `IoDelegates` pub trait Extendable { diff --git a/rpc/src/v1/helpers/block_import.rs b/rpc/src/v1/helpers/block_import.rs new file mode 100644 index 000000000..4bb2920ed --- /dev/null +++ b/rpc/src/v1/helpers/block_import.rs @@ -0,0 +1,60 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Block import analysis functions. + +use ethcore::client::BlockQueueInfo; +use ethsync::SyncState; + +/// Check if client is during major sync or during block import. +pub fn is_major_importing(sync_state: Option, queue_info: BlockQueueInfo) -> bool { + let is_syncing_state = sync_state.map_or(false, |s| + s != SyncState::Idle && s != SyncState::NewBlocks + ); + let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + is_verifying || is_syncing_state +} + +#[cfg(test)] +mod tests { + use ethcore::client::BlockQueueInfo; + use ethsync::SyncState; + use super::is_major_importing; + + + fn queue_info(unverified: usize, verified: usize) -> BlockQueueInfo { + BlockQueueInfo { + unverified_queue_size: unverified, + verified_queue_size: verified, + verifying_queue_size: 0, + max_queue_size: 1000, + max_mem_use: 1000, + mem_used: 500 + } + } + + #[test] + fn is_still_verifying() { + assert!(!is_major_importing(None, queue_info(2, 1))); + assert!(is_major_importing(None, queue_info(2, 2))); + } + + #[test] + fn is_synced_state() { + assert!(is_major_importing(Some(SyncState::Blocks), queue_info(0, 0))); + assert!(!is_major_importing(Some(SyncState::Idle), queue_info(0, 0))); + } +} diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index e6ada3379..d5f64dde9 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -22,6 +22,7 @@ pub mod errors; pub mod dispatch; pub mod params; +pub mod block_import; mod poll_manager; mod poll_filter; diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index e48e50f13..e414fe765 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -49,6 +49,7 @@ use v1::types::{ }; use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; use v1::helpers::dispatch::{default_gas_price, dispatch_transaction}; +use v1::helpers::block_import::is_major_importing; use v1::helpers::auto_args::Trailing; /// Eth RPC options @@ -254,8 +255,9 @@ impl Eth for EthClient where fn syncing(&self) -> Result { try!(self.active()); let status = take_weak!(self.sync).status(); - if status.is_major_syncing() { - let current_block = U256::from(take_weak!(self.client).chain_info().best_block_number); + let client = take_weak!(self.client); + if is_major_importing(Some(status.state), client.queue_info()) { + let current_block = U256::from(client.chain_info().best_block_number); let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number)); let info = SyncInfo { starting_block: status.start_block_number.into(), diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 889b7840b..5ba302cea 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -28,4 +28,4 @@ pub mod types; pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Personal, PersonalSigner, Net, Ethcore, EthcoreSet, Traces, Rpc}; pub use self::impls::*; -pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings}; +pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import}; diff --git a/sync/src/chain.rs b/sync/src/chain.rs index b667603ad..181d8dda3 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -202,17 +202,6 @@ pub struct SyncStatus { } impl SyncStatus { - /// Indicates if initial sync is still in progress. - pub fn is_major_syncing(&self) -> bool { - let is_synced_state = match self.state { - SyncState::Idle | SyncState::NewBlocks | SyncState::Blocks => true, - _ => false, - }; - let is_current_block = self.highest_block_number.unwrap_or(self.start_block_number) < self.last_imported_block_number.unwrap_or(0) + BlockNumber::from(4u64); - // If not synced then is major syncing. - !(is_synced_state && is_current_block) - } - /// Indicates if snapshot download is in progress pub fn is_snapshot_syncing(&self) -> bool { self.state == SyncState::SnapshotManifest