Check queue to determine major importing (#2763)
* simplify major sync detection * fix typos * fix merge * more realistic EthTester * add new synced state * remove Blocks synced state * move is_major_importing to rpc crate and check queue * add tests
This commit is contained in:
parent
236fb82886
commit
866ab9c7a3
@ -85,7 +85,7 @@ impl<R: URLHint> ContentFetcher<R> {
|
|||||||
// fallback to resolver
|
// fallback to resolver
|
||||||
if let Ok(content_id) = content_id.from_hex() {
|
if let Ok(content_id) = content_id.from_hex() {
|
||||||
// if app_id is valid, but we are syncing always return true.
|
// if app_id is valid, but we are syncing always return true.
|
||||||
if self.sync.is_major_syncing() {
|
if self.sync.is_major_importing() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// else try to resolve the app_id
|
// else try to resolve the app_id
|
||||||
@ -99,7 +99,7 @@ impl<R: URLHint> ContentFetcher<R> {
|
|||||||
let mut cache = self.cache.lock();
|
let mut cache = self.cache.lock();
|
||||||
let content_id = path.app_id.clone();
|
let content_id = path.app_id.clone();
|
||||||
|
|
||||||
if self.sync.is_major_syncing() {
|
if self.sync.is_major_importing() {
|
||||||
return Box::new(ContentHandler::error(
|
return Box::new(ContentHandler::error(
|
||||||
StatusCode::ServiceUnavailable,
|
StatusCode::ServiceUnavailable,
|
||||||
"Sync In Progress",
|
"Sync In Progress",
|
||||||
|
@ -95,11 +95,11 @@ static DAPPS_DOMAIN : &'static str = ".parity";
|
|||||||
/// Indicates sync status
|
/// Indicates sync status
|
||||||
pub trait SyncStatus: Send + Sync {
|
pub trait SyncStatus: Send + Sync {
|
||||||
/// Returns true if there is a major sync happening.
|
/// Returns true if there is a major sync happening.
|
||||||
fn is_major_syncing(&self) -> bool;
|
fn is_major_importing(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> SyncStatus for F where F: Fn() -> bool + Send + Sync {
|
impl<F> SyncStatus for F where F: Fn() -> bool + Send + Sync {
|
||||||
fn is_major_syncing(&self) -> bool { self() }
|
fn is_major_importing(&self) -> bool { self() }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Webapps HTTP+RPC server build.
|
/// Webapps HTTP+RPC server build.
|
||||||
|
@ -46,7 +46,7 @@ use transaction::{LocalizedTransaction, SignedTransaction, Action};
|
|||||||
use blockchain::extras::TransactionAddress;
|
use blockchain::extras::TransactionAddress;
|
||||||
use types::filter::Filter;
|
use types::filter::Filter;
|
||||||
use log_entry::LocalizedLogEntry;
|
use log_entry::LocalizedLogEntry;
|
||||||
use verification::queue::{BlockQueue, QueueInfo as BlockQueueInfo};
|
use verification::queue::BlockQueue;
|
||||||
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
|
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
|
||||||
use client::{
|
use client::{
|
||||||
BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient,
|
BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient,
|
||||||
@ -71,6 +71,7 @@ use state_db::StateDB;
|
|||||||
pub use types::blockchain_info::BlockChainInfo;
|
pub use types::blockchain_info::BlockChainInfo;
|
||||||
pub use types::block_status::BlockStatus;
|
pub use types::block_status::BlockStatus;
|
||||||
pub use blockchain::CacheSize as BlockChainCacheSize;
|
pub use blockchain::CacheSize as BlockChainCacheSize;
|
||||||
|
pub use verification::queue::QueueInfo as BlockQueueInfo;
|
||||||
|
|
||||||
const MAX_TX_QUEUE_SIZE: usize = 4096;
|
const MAX_TX_QUEUE_SIZE: usize = 4096;
|
||||||
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
|
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
|
||||||
|
@ -30,7 +30,7 @@ use std::sync::Arc;
|
|||||||
trait Oracle: Send + Sync {
|
trait Oracle: Send + Sync {
|
||||||
fn to_number(&self, hash: H256) -> Option<u64>;
|
fn to_number(&self, hash: H256) -> Option<u64>;
|
||||||
|
|
||||||
fn is_major_syncing(&self) -> bool;
|
fn is_major_importing(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct StandardOracle<F> where F: 'static + Send + Sync + Fn() -> bool {
|
struct StandardOracle<F> where F: 'static + Send + Sync + Fn() -> bool {
|
||||||
@ -45,7 +45,7 @@ impl<F> Oracle for StandardOracle<F>
|
|||||||
self.client.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number())
|
self.client.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_major_syncing(&self) -> bool {
|
fn is_major_importing(&self) -> bool {
|
||||||
(self.sync_status)()
|
(self.sync_status)()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -108,7 +108,7 @@ impl ChainNotify for Watcher {
|
|||||||
_: Vec<H256>,
|
_: Vec<H256>,
|
||||||
_duration: u64)
|
_duration: u64)
|
||||||
{
|
{
|
||||||
if self.oracle.is_major_syncing() { return }
|
if self.oracle.is_major_importing() { return }
|
||||||
|
|
||||||
trace!(target: "snapshot_watcher", "{} imported", imported.len());
|
trace!(target: "snapshot_watcher", "{} imported", imported.len());
|
||||||
|
|
||||||
@ -143,7 +143,7 @@ mod tests {
|
|||||||
self.0.get(&hash).cloned()
|
self.0.get(&hash).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_major_syncing(&self) -> bool { false }
|
fn is_major_importing(&self) -> bool { false }
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TestBroadcast(Option<u64>);
|
struct TestBroadcast(Option<u64>);
|
||||||
|
@ -108,6 +108,7 @@ mod server {
|
|||||||
use ethcore::client::{Client, BlockChainClient, BlockID};
|
use ethcore::client::{Client, BlockChainClient, BlockID};
|
||||||
|
|
||||||
use rpc_apis;
|
use rpc_apis;
|
||||||
|
use ethcore_rpc::is_major_importing;
|
||||||
use ethcore_dapps::ContractClient;
|
use ethcore_dapps::ContractClient;
|
||||||
|
|
||||||
pub use ethcore_dapps::Server as WebappServer;
|
pub use ethcore_dapps::Server as WebappServer;
|
||||||
@ -127,7 +128,8 @@ mod server {
|
|||||||
Arc::new(Registrar { client: deps.client.clone() })
|
Arc::new(Registrar { client: deps.client.clone() })
|
||||||
);
|
);
|
||||||
let sync = deps.sync.clone();
|
let sync = deps.sync.clone();
|
||||||
server.with_sync_status(Arc::new(move || sync.status().is_major_syncing()));
|
let client = deps.client.clone();
|
||||||
|
server.with_sync_status(Arc::new(move || is_major_importing(Some(sync.status().state), client.queue_info())));
|
||||||
server.with_signer_port(signer_port);
|
server.with_signer_port(signer_port);
|
||||||
|
|
||||||
let server = rpc_apis::setup_rpc(server, deps.apis.clone(), rpc_apis::ApiSet::UnsafeContext);
|
let server = rpc_apis::setup_rpc(server, deps.apis.clone(), rpc_apis::ApiSet::UnsafeContext);
|
||||||
|
@ -29,6 +29,7 @@ use ethcore::views::BlockView;
|
|||||||
use ethcore::snapshot::service::Service as SnapshotService;
|
use ethcore::snapshot::service::Service as SnapshotService;
|
||||||
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
|
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
|
||||||
use number_prefix::{binary_prefix, Standalone, Prefixed};
|
use number_prefix::{binary_prefix, Standalone, Prefixed};
|
||||||
|
use ethcore_rpc::is_major_importing;
|
||||||
|
|
||||||
pub struct Informant {
|
pub struct Informant {
|
||||||
chain_info: RwLock<Option<BlockChainInfo>>,
|
chain_info: RwLock<Option<BlockChainInfo>>,
|
||||||
@ -95,7 +96,7 @@ impl Informant {
|
|||||||
let network_config = self.net.as_ref().map(|n| n.network_config());
|
let network_config = self.net.as_ref().map(|n| n.network_config());
|
||||||
let sync_status = self.sync.as_ref().map(|s| s.status());
|
let sync_status = self.sync.as_ref().map(|s| s.status());
|
||||||
|
|
||||||
let importing = self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
|
let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info());
|
||||||
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
|
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
|
||||||
match s.status() {
|
match s.status() {
|
||||||
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
|
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
|
||||||
@ -174,7 +175,8 @@ impl Informant {
|
|||||||
impl ChainNotify for Informant {
|
impl ChainNotify for Informant {
|
||||||
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, duration: u64) {
|
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, duration: u64) {
|
||||||
let mut last_import = self.last_import.lock();
|
let mut last_import = self.last_import.lock();
|
||||||
let importing = self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
|
let sync_state = self.sync.as_ref().map(|s| s.status().state);
|
||||||
|
let importing = is_major_importing(sync_state, self.client.queue_info());
|
||||||
if Instant::now() > *last_import + Duration::from_secs(1) && !importing {
|
if Instant::now() > *last_import + Duration::from_secs(1) && !importing {
|
||||||
if let Some(block) = imported.last().and_then(|h| self.client.block(BlockID::Hash(*h))) {
|
if let Some(block) = imported.last().and_then(|h| self.client.block(BlockID::Hash(*h))) {
|
||||||
let view = BlockView::new(&block);
|
let view = BlockView::new(&block);
|
||||||
|
@ -18,11 +18,11 @@ use std::sync::{Arc, Mutex, Condvar};
|
|||||||
use ctrlc::CtrlC;
|
use ctrlc::CtrlC;
|
||||||
use fdlimit::raise_fd_limit;
|
use fdlimit::raise_fd_limit;
|
||||||
use ethcore_logger::{Config as LogConfig, setup_log};
|
use ethcore_logger::{Config as LogConfig, setup_log};
|
||||||
use ethcore_rpc::NetworkSettings;
|
use ethcore_rpc::{NetworkSettings, is_major_importing};
|
||||||
use ethsync::NetworkConfiguration;
|
use ethsync::NetworkConfiguration;
|
||||||
use util::{Colour, version, U256};
|
use util::{Colour, version, U256};
|
||||||
use io::{MayPanic, ForwardPanic, PanicHandler};
|
use io::{MayPanic, ForwardPanic, PanicHandler};
|
||||||
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, ChainNotify};
|
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, ChainNotify, BlockChainClient};
|
||||||
use ethcore::service::ClientService;
|
use ethcore::service::ClientService;
|
||||||
use ethcore::account_provider::AccountProvider;
|
use ethcore::account_provider::AccountProvider;
|
||||||
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
|
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
|
||||||
@ -315,7 +315,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
|
|||||||
let sync = sync_provider.clone();
|
let sync = sync_provider.clone();
|
||||||
let watcher = Arc::new(snapshot::Watcher::new(
|
let watcher = Arc::new(snapshot::Watcher::new(
|
||||||
service.client(),
|
service.client(),
|
||||||
move || ::ethsync::SyncProvider::status(&*sync).is_major_syncing(),
|
move || is_major_importing(Some(sync.status().state), client.queue_info()),
|
||||||
service.io().channel(),
|
service.io().channel(),
|
||||||
SNAPSHOT_PERIOD,
|
SNAPSHOT_PERIOD,
|
||||||
SNAPSHOT_HISTORY,
|
SNAPSHOT_HISTORY,
|
||||||
|
@ -56,6 +56,7 @@ use self::jsonrpc_core::{IoHandler, IoDelegate};
|
|||||||
pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError};
|
pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError};
|
||||||
pub mod v1;
|
pub mod v1;
|
||||||
pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings};
|
pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings};
|
||||||
|
pub use v1::block_import::is_major_importing;
|
||||||
|
|
||||||
/// An object that can be extended with `IoDelegates`
|
/// An object that can be extended with `IoDelegates`
|
||||||
pub trait Extendable {
|
pub trait Extendable {
|
||||||
|
60
rpc/src/v1/helpers/block_import.rs
Normal file
60
rpc/src/v1/helpers/block_import.rs
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Block import analysis functions.
|
||||||
|
|
||||||
|
use ethcore::client::BlockQueueInfo;
|
||||||
|
use ethsync::SyncState;
|
||||||
|
|
||||||
|
/// Check if client is during major sync or during block import.
|
||||||
|
pub fn is_major_importing(sync_state: Option<SyncState>, queue_info: BlockQueueInfo) -> bool {
|
||||||
|
let is_syncing_state = sync_state.map_or(false, |s|
|
||||||
|
s != SyncState::Idle && s != SyncState::NewBlocks
|
||||||
|
);
|
||||||
|
let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
|
||||||
|
is_verifying || is_syncing_state
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use ethcore::client::BlockQueueInfo;
|
||||||
|
use ethsync::SyncState;
|
||||||
|
use super::is_major_importing;
|
||||||
|
|
||||||
|
|
||||||
|
fn queue_info(unverified: usize, verified: usize) -> BlockQueueInfo {
|
||||||
|
BlockQueueInfo {
|
||||||
|
unverified_queue_size: unverified,
|
||||||
|
verified_queue_size: verified,
|
||||||
|
verifying_queue_size: 0,
|
||||||
|
max_queue_size: 1000,
|
||||||
|
max_mem_use: 1000,
|
||||||
|
mem_used: 500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_still_verifying() {
|
||||||
|
assert!(!is_major_importing(None, queue_info(2, 1)));
|
||||||
|
assert!(is_major_importing(None, queue_info(2, 2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_synced_state() {
|
||||||
|
assert!(is_major_importing(Some(SyncState::Blocks), queue_info(0, 0)));
|
||||||
|
assert!(!is_major_importing(Some(SyncState::Idle), queue_info(0, 0)));
|
||||||
|
}
|
||||||
|
}
|
@ -22,6 +22,7 @@ pub mod errors;
|
|||||||
|
|
||||||
pub mod dispatch;
|
pub mod dispatch;
|
||||||
pub mod params;
|
pub mod params;
|
||||||
|
pub mod block_import;
|
||||||
|
|
||||||
mod poll_manager;
|
mod poll_manager;
|
||||||
mod poll_filter;
|
mod poll_filter;
|
||||||
|
@ -49,6 +49,7 @@ use v1::types::{
|
|||||||
};
|
};
|
||||||
use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
|
use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
|
||||||
use v1::helpers::dispatch::{default_gas_price, dispatch_transaction};
|
use v1::helpers::dispatch::{default_gas_price, dispatch_transaction};
|
||||||
|
use v1::helpers::block_import::is_major_importing;
|
||||||
use v1::helpers::auto_args::Trailing;
|
use v1::helpers::auto_args::Trailing;
|
||||||
|
|
||||||
/// Eth RPC options
|
/// Eth RPC options
|
||||||
@ -254,8 +255,9 @@ impl<C, S: ?Sized, M, EM> Eth for EthClient<C, S, M, EM> where
|
|||||||
fn syncing(&self) -> Result<SyncStatus, Error> {
|
fn syncing(&self) -> Result<SyncStatus, Error> {
|
||||||
try!(self.active());
|
try!(self.active());
|
||||||
let status = take_weak!(self.sync).status();
|
let status = take_weak!(self.sync).status();
|
||||||
if status.is_major_syncing() {
|
let client = take_weak!(self.client);
|
||||||
let current_block = U256::from(take_weak!(self.client).chain_info().best_block_number);
|
if is_major_importing(Some(status.state), client.queue_info()) {
|
||||||
|
let current_block = U256::from(client.chain_info().best_block_number);
|
||||||
let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number));
|
let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number));
|
||||||
let info = SyncInfo {
|
let info = SyncInfo {
|
||||||
starting_block: status.start_block_number.into(),
|
starting_block: status.start_block_number.into(),
|
||||||
|
@ -28,4 +28,4 @@ pub mod types;
|
|||||||
|
|
||||||
pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Personal, PersonalSigner, Net, Ethcore, EthcoreSet, Traces, Rpc};
|
pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Personal, PersonalSigner, Net, Ethcore, EthcoreSet, Traces, Rpc};
|
||||||
pub use self::impls::*;
|
pub use self::impls::*;
|
||||||
pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings};
|
pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import};
|
||||||
|
@ -202,17 +202,6 @@ pub struct SyncStatus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SyncStatus {
|
impl SyncStatus {
|
||||||
/// Indicates if initial sync is still in progress.
|
|
||||||
pub fn is_major_syncing(&self) -> bool {
|
|
||||||
let is_synced_state = match self.state {
|
|
||||||
SyncState::Idle | SyncState::NewBlocks | SyncState::Blocks => true,
|
|
||||||
_ => false,
|
|
||||||
};
|
|
||||||
let is_current_block = self.highest_block_number.unwrap_or(self.start_block_number) < self.last_imported_block_number.unwrap_or(0) + BlockNumber::from(4u64);
|
|
||||||
// If not synced then is major syncing.
|
|
||||||
!(is_synced_state && is_current_block)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Indicates if snapshot download is in progress
|
/// Indicates if snapshot download is in progress
|
||||||
pub fn is_snapshot_syncing(&self) -> bool {
|
pub fn is_snapshot_syncing(&self) -> bool {
|
||||||
self.state == SyncState::SnapshotManifest
|
self.state == SyncState::SnapshotManifest
|
||||||
|
Loading…
Reference in New Issue
Block a user