From fba63de974afc719b8280acb62f4071f4ce9081d Mon Sep 17 00:00:00 2001 From: Seun LanLege Date: Tue, 2 Apr 2019 16:13:55 +0100 Subject: [PATCH] RPC: Implements eth_subscribe("syncing") (#10311) * implement eth_subscibe syncing * fix imports * add is_major_syncing to SyncProvider * add eth_subscribe(syncing) support for light clients * tests * fix TestSyncClient * correct LightFetch impl * added currentBlock, startingBlock, highestBlock to PubSubSyncStatus * fixed tests * fix PR grumbles * improve code style * is_syncing -> syncing * code style * use ethereum types --- Cargo.lock | 2 + ethcore/light/src/client/mod.rs | 16 ++++--- ethcore/src/client/client.rs | 8 ++-- ethcore/src/client/test_client.rs | 26 +++++------ ethcore/src/client/traits.rs | 6 +-- ethcore/sync/Cargo.toml | 2 + ethcore/sync/src/api.rs | 52 ++++++++++++++++++++- ethcore/sync/src/chain/mod.rs | 56 ++++++++++++++++++----- ethcore/sync/src/lib.rs | 2 + ethcore/sync/src/light_sync/mod.rs | 32 +++++++++++++ parity/informant.rs | 2 +- parity/modules.rs | 3 ++ parity/rpc_apis.rs | 35 +++++++++++++- parity/run.rs | 6 +-- rpc/src/lib.rs | 3 +- rpc/src/v1/helpers/block_import.rs | 35 -------------- rpc/src/v1/helpers/light_fetch.rs | 1 - rpc/src/v1/impls/eth.rs | 3 +- rpc/src/v1/impls/eth_pubsub.rs | 50 ++++++++++++++++++-- rpc/src/v1/impls/parity.rs | 4 +- rpc/src/v1/mod.rs | 1 + rpc/src/v1/tests/helpers/sync_provider.rs | 20 +++++++- rpc/src/v1/tests/mocked/eth_pubsub.rs | 5 +- rpc/src/v1/types/pubsub.rs | 11 +++++ secret-store/src/trusted_client.rs | 4 +- updater/src/updater.rs | 4 +- 26 files changed, 289 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37af7f020..6dff31936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1142,6 +1142,7 @@ dependencies = [ "ethkey 0.3.0", "ethstore 0.2.1", "fastmap 0.1.0", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", "keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1152,6 +1153,7 @@ dependencies = [ "macros 0.1.0", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-runtime 0.1.0", "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index 8205ae2ab..d20617f4f 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -34,6 +34,7 @@ use common_types::blockchain_info::BlockChainInfo; use common_types::encoded; use common_types::header::Header; use common_types::ids::BlockId; +use common_types::verification_queue_info::VerificationQueueInfo as BlockQueueInfo; use kvdb::KeyValueDB; @@ -91,6 +92,9 @@ pub trait LightChainClient: Send + Sync { /// Attempt to get a block hash by block id. fn block_hash(&self, id: BlockId) -> Option; + /// Get block queue information. + fn queue_info(&self) -> BlockQueueInfo; + /// Attempt to get block header by block id. fn block_header(&self, id: BlockId) -> Option; @@ -125,9 +129,6 @@ pub trait LightChainClient: Send + Sync { /// Flush the queue. fn flush_queue(&self); - /// Get queue info. - fn queue_info(&self) -> queue::QueueInfo; - /// Get the `i`th CHT root. fn cht_root(&self, i: usize) -> Option; @@ -534,6 +535,7 @@ impl Client { } } + impl LightChainClient for Client { fn add_listener(&self, listener: Weak) { Client::add_listener(self, listener) @@ -541,6 +543,10 @@ impl LightChainClient for Client { fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) } + fn queue_info(&self) -> queue::QueueInfo { + self.queue.queue_info() + } + fn queue_header(&self, header: Header) -> EthcoreResult { self.import_header(header) } @@ -600,10 +606,6 @@ impl LightChainClient for Client { Client::flush_queue(self); } - fn queue_info(&self) -> queue::QueueInfo { - self.queue.queue_info() - } - fn cht_root(&self, i: usize) -> Option { Client::cht_root(self, i) } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 0b07d7103..8e151e125 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1672,6 +1672,10 @@ impl BlockChainClient for Client { r } + fn queue_info(&self) -> BlockQueueInfo { + self.importer.block_queue.queue_info() + } + fn disable(&self) { self.set_mode(Mode::Off); self.enabled.store(false, AtomicOrdering::Relaxed); @@ -1934,10 +1938,6 @@ impl BlockChainClient for Client { self.chain.read().block_receipts(hash) } - fn queue_info(&self) -> BlockQueueInfo { - self.importer.block_queue.queue_info() - } - fn is_queue_empty(&self) -> bool { self.importer.block_queue.is_empty() } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 502a9ee72..1ae1a501c 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -56,7 +56,7 @@ use client::{ TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics, ProvingBlockChainClient, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock, Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, IoClient, - BadBlocks, + BadBlocks }; use engines::EthEngine; use error::{Error, EthcoreResult}; @@ -68,7 +68,7 @@ use spec::Spec; use state::StateInfo; use state_db::StateDB; use trace::LocalizedTrace; -use verification::queue::QueueInfo; +use verification::queue::QueueInfo as BlockQueueInfo; use verification::queue::kind::blocks::Unverified; /// Test client. @@ -649,6 +649,17 @@ impl BlockChainClient for TestBlockChainClient { self.execution_result.read().clone().unwrap() } + fn queue_info(&self) -> BlockQueueInfo { + BlockQueueInfo { + verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed), + unverified_queue_size: 0, + verifying_queue_size: 0, + max_queue_size: 0, + max_mem_use: 0, + mem_used: 0, + } + } + fn replay_block_transactions(&self, _block: BlockId, _analytics: CallAnalytics) -> Result>, CallError> { Ok(Box::new(self.traces.read().clone().unwrap().into_iter().map(|t| t.transaction_hash.unwrap_or(H256::new())).zip(self.execution_result.read().clone().unwrap().into_iter()))) } @@ -817,17 +828,6 @@ impl BlockChainClient for TestBlockChainClient { None } - fn queue_info(&self) -> QueueInfo { - QueueInfo { - verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed), - unverified_queue_size: 0, - verifying_queue_size: 0, - max_queue_size: 0, - max_mem_use: 0, - mem_used: 0, - } - } - fn clear_queue(&self) { } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 8e4abc01c..bcfa7417c 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -237,6 +237,9 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra .expect("code will return Some if given BlockId::Latest; qed") } + /// Get block queue information. + fn queue_info(&self) -> BlockQueueInfo; + /// Get address code hash at given block's state. /// Get value of the storage at given position at the given block's state. @@ -285,9 +288,6 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get block receipts data by block header hash. fn block_receipts(&self, hash: &H256) -> Option; - /// Get block queue information. - fn queue_info(&self) -> BlockQueueInfo; - /// Returns true if block queue is empty. fn is_queue_empty(&self) -> bool { self.queue_info().is_empty() diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index bc7a80e67..3cbb5b983 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -32,6 +32,8 @@ rand = "0.4" rlp = { version = "0.3.0", features = ["ethereum"] } trace-time = "0.1" triehash-ethereum = {version = "0.2", path = "../../util/triehash-ethereum" } +futures = "0.1" +parity-runtime = { path = "../../util/runtime" } [dev-dependencies] env_logger = "0.5" diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 4a66f468d..ddb7542b7 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -28,6 +28,8 @@ use network::client_version::ClientVersion; use types::pruning_info::PruningInfo; use ethereum_types::{H256, H512, U256}; +use futures::sync::mpsc as futures_mpsc; +use futures::Stream; use io::{TimerToken}; use ethkey::Secret; use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainMessageType}; @@ -39,7 +41,7 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, SyncState}; use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket}; use light::client::AsLightClient; use light::Provider; @@ -47,6 +49,8 @@ use light::net::{ self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext, SampleStore, }; +use parity_runtime::Executor; +use std::sync::atomic::{AtomicBool, Ordering}; use network::IpFilter; use private_tx::PrivateTxHandler; use types::transaction::UnverifiedTransaction; @@ -131,6 +135,9 @@ impl Default for SyncConfig { } } +/// receiving end of a futures::mpsc channel +pub type Notification = futures_mpsc::UnboundedReceiver; + /// Current sync status pub trait SyncProvider: Send + Sync { /// Get sync status @@ -142,8 +149,14 @@ pub trait SyncProvider: Send + Sync { /// Get the enode if available. fn enode(&self) -> Option; + /// gets sync status notifications + fn sync_notification(&self) -> Notification; + /// Returns propagation count for pending transactions. fn transactions_stats(&self) -> BTreeMap; + + /// are we in the middle of a major sync? + fn is_major_syncing(&self) -> bool; } /// Transaction stats @@ -266,6 +279,8 @@ impl PriorityTask { pub struct Params { /// Configuration. pub config: SyncConfig, + /// Runtime executor + pub executor: Executor, /// Blockchain client. pub chain: Arc, /// Snapshot service. @@ -296,6 +311,8 @@ pub struct EthSync { light_subprotocol_name: [u8; 3], /// Priority tasks notification channel priority_tasks: Mutex>, + /// for state tracking + is_major_syncing: Arc } fn light_params( @@ -355,6 +372,30 @@ impl EthSync { params.private_tx_handler.as_ref().cloned(), priority_tasks_rx, ); + + let is_major_syncing = Arc::new(AtomicBool::new(false)); + + { + // spawn task that constantly updates EthSync.is_major_sync + let notifications = sync.write().sync_notifications(); + let moved_client = Arc::downgrade(¶ms.chain); + let moved_is_major_syncing = is_major_syncing.clone(); + + params.executor.spawn(notifications.for_each(move |sync_status| { + if let Some(queue_info) = moved_client.upgrade().map(|client| client.queue_info()) { + let is_syncing_state = match sync_status { + SyncState::Idle | SyncState::NewBlocks => false, + _ => true + }; + let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + moved_is_major_syncing.store(is_verifying || is_syncing_state, Ordering::SeqCst); + return Ok(()) + } + + // client has been dropped + return Err(()) + })); + } let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; let sync = Arc::new(EthSync { @@ -370,6 +411,7 @@ impl EthSync { light_subprotocol_name: params.config.light_subprotocol_name, attached_protos: params.attached_protos, priority_tasks: Mutex::new(priority_tasks_tx), + is_major_syncing }); Ok(sync) @@ -420,6 +462,14 @@ impl SyncProvider for EthSync { fn transactions_stats(&self) -> BTreeMap { self.eth_handler.sync.transactions_stats() } + + fn sync_notification(&self) -> Notification { + self.eth_handler.sync.write().sync_notifications() + } + + fn is_major_syncing(&self) -> bool { + self.is_major_syncing.load(Ordering::SeqCst) + } } const PEERS_TIMER: TimerToken = 0; diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 81f1ccffe..ecbc27db7 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -99,6 +99,8 @@ use std::cmp; use std::time::{Duration, Instant}; use hash::keccak; use heapsize::HeapSizeOf; +use futures::sync::mpsc as futures_mpsc; +use api::Notification; use ethereum_types::{H256, U256}; use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; @@ -618,6 +620,8 @@ pub struct ChainSync { private_tx_handler: Option>, /// Enable warp sync. warp_sync: WarpSync, + + status_sinks: Vec> } impl ChainSync { @@ -649,6 +653,7 @@ impl ChainSync { transactions_stats: TransactionsStats::default(), private_tx_handler, warp_sync: config.warp_sync, + status_sinks: Vec::new() }; sync.update_targets(chain); sync @@ -707,6 +712,29 @@ impl ChainSync { self.peers.clear(); } + /// returns the receiving end of a future::mpsc channel that can + /// be polled for changes to node's SyncState. + pub fn sync_notifications(&mut self) -> Notification { + let (sender, receiver) = futures_mpsc::unbounded(); + self.status_sinks.push(sender); + receiver + } + + /// notify all subscibers of a new SyncState + fn notify_sync_state(&mut self, state: SyncState) { + // remove any sender whose receiving end has been dropped + self.status_sinks.retain(|sender| { + sender.unbounded_send(state).is_ok() + }); + } + + /// sets a new SyncState + fn set_state(&mut self, state: SyncState) { + self.notify_sync_state(state); + + self.state = state; + } + /// Reset sync. Clear all downloaded data but keep the queue. /// Set sync state to the given state or to the initial state if `None` is provided. fn reset(&mut self, io: &mut SyncIo, state: Option) { @@ -721,7 +749,10 @@ impl ChainSync { } } } - self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain())); + + let warp_sync = self.warp_sync; + + self.set_state(state.unwrap_or_else(|| Self::get_init_state(warp_sync, io.chain()))); // Reactivate peers only if some progress has been made // since the last sync round of if starting fresh. self.active_peers = self.peers.keys().cloned().collect(); @@ -808,7 +839,7 @@ impl ChainSync { } } else if timeout && !self.warp_sync.is_warp_only() { trace!(target: "sync", "No snapshots found, starting full sync"); - self.state = SyncState::Idle; + self.set_state(SyncState::Idle); self.continue_sync(io); } } @@ -820,10 +851,10 @@ impl ChainSync { SyncRequester::request_snapshot_manifest(self, io, *p); } } - self.state = SyncState::SnapshotManifest; + self.set_state(SyncState::SnapshotManifest); trace!(target: "sync", "New snapshot sync with {:?}", peers); } else { - self.state = SyncState::SnapshotData; + self.set_state(SyncState::SnapshotData); trace!(target: "sync", "Resumed snapshot sync with {:?}", peers); } } @@ -904,7 +935,7 @@ impl ChainSync { /// Enter waiting state fn pause_sync(&mut self) { trace!(target: "sync", "Block queue full, pausing sync"); - self.state = SyncState::Waiting; + self.set_state(SyncState::Waiting); } /// Find something to do for a peer. Called for a new peer or when a peer is done with its task. @@ -955,7 +986,7 @@ impl ChainSync { if let Some(request) = self.new_blocks.request_blocks(peer_id, io, num_active_peers) { SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::NewBlocks); if self.state == SyncState::Idle { - self.state = SyncState::Blocks; + self.set_state(SyncState::Blocks); } return; } @@ -987,7 +1018,7 @@ impl ChainSync { self.snapshot.initialize(io.snapshot_service()); if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD { trace!(target: "sync", "Snapshot queue full, pausing sync"); - self.state = SyncState::SnapshotWaiting; + self.set_state(SyncState::SnapshotWaiting); return; } }, @@ -1171,12 +1202,12 @@ impl ChainSync { fn check_resume(&mut self, io: &mut SyncIo) { match self.state { SyncState::Waiting if !io.chain().queue_info().is_full() => { - self.state = SyncState::Blocks; + self.set_state(SyncState::Blocks); self.continue_sync(io); }, SyncState::SnapshotData => match io.snapshot_service().status() { RestorationStatus::Inactive | RestorationStatus::Failed => { - self.state = SyncState::SnapshotWaiting; + self.set_state(SyncState::SnapshotWaiting); }, RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (), }, @@ -1192,13 +1223,13 @@ impl ChainSync { RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => { if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD { trace!(target:"sync", "Resuming snapshot sync"); - self.state = SyncState::SnapshotData; + self.set_state(SyncState::SnapshotData); self.continue_sync(io); } }, RestorationStatus::Failed => { trace!(target: "sync", "Snapshot restoration aborted"); - self.state = SyncState::WaitingPeers; + self.set_state(SyncState::WaitingPeers); self.snapshot.clear(); self.continue_sync(io); }, @@ -1421,7 +1452,8 @@ pub mod tests { } pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync { - let mut sync = ChainSync::new(SyncConfig::default(), client, None); + + let mut sync = ChainSync::new(SyncConfig::default(), client, None,); insert_dummy_peer(&mut sync, 0, peer_latest_hash); sync } diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index 8a1e19569..712a47801 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -32,10 +32,12 @@ extern crate ethstore; extern crate fastmap; extern crate keccak_hash as hash; extern crate parity_bytes as bytes; +extern crate parity_runtime; extern crate parking_lot; extern crate rand; extern crate rlp; extern crate triehash_ethereum; +extern crate futures; extern crate ethcore_light as light; diff --git a/ethcore/sync/src/light_sync/mod.rs b/ethcore/sync/src/light_sync/mod.rs index dae05c318..c9406b3bc 100644 --- a/ethcore/sync/src/light_sync/mod.rs +++ b/ethcore/sync/src/light_sync/mod.rs @@ -45,13 +45,16 @@ use light::net::{ EventContext, Capabilities, ReqId, Status, Error as NetError, }; +use chain::SyncState as ChainSyncState; use light::request::{self, CompleteHeadersRequest as HeadersRequest}; use network::PeerId; use ethereum_types::{H256, U256}; use parking_lot::{Mutex, RwLock}; use rand::{Rng, OsRng}; +use futures::sync::mpsc; use self::sync_round::{AbortReason, SyncRound, ResponseContext}; +use api::Notification; mod response; mod sync_round; @@ -275,6 +278,7 @@ pub struct LightSync { client: Arc, rng: Mutex, state: Mutex, + senders: RwLock>>, // We duplicate this state tracking to avoid deadlocks in `is_major_importing`. is_idle: Mutex, } @@ -454,9 +458,21 @@ impl LightSync { /// Sets the LightSync's state, and update /// `is_idle` fn set_state(&self, state: &mut SyncStateWrapper, next_state: SyncState) { + + match next_state { + SyncState::Idle => self.notify_senders(ChainSyncState::Idle), + _ => self.notify_senders(ChainSyncState::Blocks) + }; + state.set(next_state, &mut self.is_idle.lock()); } + fn notify_senders(&self, state: ChainSyncState) { + self.senders.write().retain(|sender| { + sender.unbounded_send(state).is_ok() + }) + } + // Begins a search for the common ancestor and our best block. // does not lock state, instead has a mutable reference to it passed. fn begin_search(&self, state: &mut SyncStateWrapper) { @@ -667,6 +683,14 @@ impl LightSync { self.set_state(&mut state, next_state); } } + + // returns receiving end of futures::mpsc::unbounded channel + // poll the channel for changes to sync state. + fn sync_notification(&self) -> Notification { + let (sender, receiver) = futures::sync::mpsc::unbounded(); + self.senders.write().push(sender); + receiver + } } // public API @@ -683,6 +707,7 @@ impl LightSync { pending_reqs: Mutex::new(HashMap::new()), client: client, rng: Mutex::new(OsRng::new()?), + senders: RwLock::new(Vec::new()), state: Mutex::new(SyncStateWrapper::idle()), is_idle: Mutex::new(true), }) @@ -699,6 +724,10 @@ pub trait SyncInfo { /// Whether major sync is underway. fn is_major_importing(&self) -> bool; + + /// returns the receieving end of a futures::mpsc unbounded channel + /// poll the channel for changes to sync state + fn sync_notification(&self) -> Notification; } impl SyncInfo for LightSync { @@ -720,4 +749,7 @@ impl SyncInfo for LightSync { is_verifying || is_syncing } + fn sync_notification(&self) -> Notification { + self.sync_notification() + } } diff --git a/parity/informant.rs b/parity/informant.rs index 78d055686..e0ba66f07 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -24,7 +24,7 @@ use std::time::{Instant, Duration}; use atty; use ethcore::client::{ - BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo, + BlockId, ChainInfo, BlockInfo, BlockChainInfo, BlockChainClient, BlockQueueInfo, ChainNotify, NewBlocks, ClientReport, Client, ClientIoMessage }; use types::BlockNumber; diff --git a/parity/modules.rs b/parity/modules.rs index 9f5d25a11..d5dc3a0de 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -20,6 +20,7 @@ use ethcore::client::BlockChainClient; use sync::{self, AttachedProtocol, SyncConfig, NetworkConfiguration, Params, ConnectionFilter}; use ethcore::snapshot::SnapshotService; use light::Provider; +use parity_runtime::Executor; pub use sync::{EthSync, SyncProvider, ManageNetwork, PrivateTxHandler}; pub use ethcore::client::ChainNotify; @@ -34,6 +35,7 @@ pub type SyncModules = ( pub fn sync( config: SyncConfig, + executor: Executor, network_config: NetworkConfiguration, chain: Arc, snapshot_service: Arc, @@ -45,6 +47,7 @@ pub fn sync( ) -> Result { let eth_sync = EthSync::new(Params { config, + executor, chain, provider, snapshot_service, diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 288f85ab4..cc85af71f 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -25,6 +25,8 @@ use account_utils::{self, AccountProvider}; use ethcore::client::Client; use ethcore::miner::Miner; use ethcore::snapshot::SnapshotService; +use ethcore::client::BlockChainClient; +use sync::SyncState; use ethcore_logger::RotatingLogger; use ethcore_private_tx::Provider as PrivateTransactionManager; use ethcore_service::PrivateTxService; @@ -320,8 +322,22 @@ impl FullDependencies { } Api::EthPubSub => { if !for_generic_pubsub { - let client = + let mut client = EthPubSubClient::new(self.client.clone(), self.executor.clone()); + let weak_client = Arc::downgrade(&self.client); + + client.add_sync_notifier(self.sync.sync_notification(), move |state| { + let client = weak_client.upgrade()?; + let queue_info = client.queue_info(); + + let is_syncing_state = match state { SyncState::Idle | SyncState::NewBlocks => false, _ => true }; + let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + + Some(PubSubSyncStatus { + syncing: is_verifying || is_syncing_state, + }) + }); + let h = client.handler(); self.miner .add_transactions_listener(Box::new(move |hashes| { @@ -553,7 +569,7 @@ impl LightDependencies { } } Api::EthPubSub => { - let client = EthPubSubClient::light( + let mut client = EthPubSubClient::light( self.client.clone(), self.on_demand.clone(), self.sync.clone(), @@ -561,6 +577,21 @@ impl LightDependencies { self.executor.clone(), self.gas_price_percentile, ); + + let weak_client = Arc::downgrade(&self.client); + + client.add_sync_notifier(self.sync.sync_notification(), move |state| { + let client = weak_client.upgrade()?; + let queue_info = client.queue_info(); + + let is_syncing_state = match state { SyncState::Idle | SyncState::NewBlocks => false, _ => true }; + let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + + Some(PubSubSyncStatus { + syncing: is_verifying || is_syncing_state, + }) + }); + self.client.add_listener(client.handler() as Weak<_>); let h = client.handler(); self.transaction_queue diff --git a/parity/run.rs b/parity/run.rs index c4a3c7512..8160c930d 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -41,7 +41,7 @@ use node_filter::NodeFilter; use parity_runtime::Runtime; use sync::{self, SyncConfig, PrivateTxHandler}; use parity_rpc::{ - Origin, Metadata, NetworkSettings, informant, is_major_importing, PubSubSession, FutureResult, FutureResponse, FutureOutput + Origin, Metadata, NetworkSettings, informant, PubSubSession, FutureResult, FutureResponse, FutureOutput }; use updater::{UpdatePolicy, Updater}; use parity_version::version; @@ -661,6 +661,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // create sync object let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, + runtime.executor(), net_conf.clone().into(), client.clone(), snapshot_service.clone(), @@ -815,10 +816,9 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: true => None, false => { let sync = sync_provider.clone(); - let client = client.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), - move || is_major_importing(Some(sync.status().state), client.queue_info()), + move || sync.is_major_syncing(), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY, diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index a537cb294..e34aab673 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -134,7 +134,8 @@ pub use http::{ }; pub use v1::{NetworkSettings, Metadata, Origin, informant, dispatch, signer}; -pub use v1::block_import::{is_major_importing, is_major_importing_or_waiting}; +pub use v1::block_import::{is_major_importing_or_waiting}; +pub use v1::PubSubSyncStatus; pub use v1::extractors::{RpcExtractor, WsExtractor, WsStats, WsDispatcher}; pub use authcodes::{AuthCodes, TimeProvider}; pub use http_common::HttpMetaExtractor; diff --git a/rpc/src/v1/helpers/block_import.rs b/rpc/src/v1/helpers/block_import.rs index 3fd5d9fff..8e8dba28f 100644 --- a/rpc/src/v1/helpers/block_import.rs +++ b/rpc/src/v1/helpers/block_import.rs @@ -30,38 +30,3 @@ pub fn is_major_importing_or_waiting(sync_state: Option, queue_info: let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; is_verifying || is_syncing_state } - -/// Check if client is during major sync or during block import. -pub fn is_major_importing(sync_state: Option, queue_info: BlockQueueInfo) -> bool { - is_major_importing_or_waiting(sync_state, queue_info, true) -} - -#[cfg(test)] -mod tests { - use ethcore::client::BlockQueueInfo; - use sync::SyncState; - use super::is_major_importing; - - fn queue_info(unverified: usize, verified: usize) -> BlockQueueInfo { - BlockQueueInfo { - unverified_queue_size: unverified, - verified_queue_size: verified, - verifying_queue_size: 0, - max_queue_size: 1000, - max_mem_use: 1000, - mem_used: 500 - } - } - - #[test] - fn is_still_verifying() { - assert!(!is_major_importing(None, queue_info(2, 1))); - assert!(is_major_importing(None, queue_info(2, 2))); - } - - #[test] - fn is_synced_state() { - assert!(is_major_importing(Some(SyncState::Blocks), queue_info(0, 0))); - assert!(!is_major_importing(Some(SyncState::Idle), queue_info(0, 0))); - } -} diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index e18695fcb..540772a5d 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -106,7 +106,6 @@ where } } - /// Extract a transaction at given index. pub fn extract_transaction_at_index(block: encoded::Block, index: usize) -> Option { block.transactions().into_iter().nth(index) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 32f9c1c57..bb744367d 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -42,7 +42,6 @@ use jsonrpc_core::futures::future; use v1::helpers::{self, errors, limit_logs, fake_sign}; use v1::helpers::deprecated::{self, DeprecationNotice}; use v1::helpers::dispatch::{FullDispatcher, default_gas_price}; -use v1::helpers::block_import::is_major_importing; use v1::traits::Eth; use v1::types::{ RichBlock, Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, @@ -510,7 +509,7 @@ impl Eth for EthClient< _ => (false, None, None), }; - if warping || is_major_importing(Some(status.state), client.queue_info()) { + if warping || self.sync.is_major_syncing() { let chain_info = client.chain_info(); let current_block = U256::from(chain_info.best_block_number); let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number)); diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 450728157..7dc860748 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -20,15 +20,17 @@ use std::sync::{Arc, Weak}; use std::collections::BTreeMap; use jsonrpc_core::{BoxFuture, Result, Error}; -use jsonrpc_core::futures::{self, Future, IntoFuture}; -use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}}; +use jsonrpc_core::futures::{self, Future, IntoFuture, Stream}; +use jsonrpc_pubsub::typed::{Sink, Subscriber}; +use jsonrpc_pubsub::SubscriptionId; -use v1::helpers::{errors, limit_logs, Subscribers}; +use v1::helpers::{errors, limit_logs, Subscribers, }; use v1::helpers::light_fetch::LightFetch; use v1::metadata::Metadata; use v1::traits::EthPubSub; use v1::types::{pubsub, RichHeader, Log}; +use sync::{SyncState, Notification}; use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainRouteType, BlockId}; use ethereum_types::H256; use light::cache::Cache; @@ -50,6 +52,30 @@ pub struct EthPubSubClient { heads_subscribers: Arc>>, logs_subscribers: Arc>>, transactions_subscribers: Arc>>, + sync_subscribers: Arc>>, +} + +impl EthPubSubClient + where + C: 'static + Send + Sync +{ + /// adds a sync notification channel to the pubsub client + pub fn add_sync_notifier(&mut self, receiver: Notification, f: F) + where + F: 'static + Fn(SyncState) -> Option + Send + { + let handler = self.handler.clone(); + + self.handler.executor.spawn( + receiver.for_each(move |state| { + if let Some(status) = f(state) { + handler.notify_syncing(status); + return Ok(()) + } + Err(()) + }) + ) + } } impl EthPubSubClient { @@ -58,6 +84,7 @@ impl EthPubSubClient { let heads_subscribers = Arc::new(RwLock::new(Subscribers::default())); let logs_subscribers = Arc::new(RwLock::new(Subscribers::default())); let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let sync_subscribers = Arc::new(RwLock::new(Subscribers::default())); EthPubSubClient { handler: Arc::new(ChainNotificationHandler { @@ -66,7 +93,9 @@ impl EthPubSubClient { heads_subscribers: heads_subscribers.clone(), logs_subscribers: logs_subscribers.clone(), transactions_subscribers: transactions_subscribers.clone(), + sync_subscribers: sync_subscribers.clone(), }), + sync_subscribers, heads_subscribers, logs_subscribers, transactions_subscribers, @@ -80,6 +109,7 @@ impl EthPubSubClient { *client.heads_subscribers.write() = Subscribers::new_test(); *client.logs_subscribers.write() = Subscribers::new_test(); *client.transactions_subscribers.write() = Subscribers::new_test(); + *client.sync_subscribers.write() = Subscribers::new_test(); client } @@ -121,6 +151,7 @@ pub struct ChainNotificationHandler { heads_subscribers: Arc>>, logs_subscribers: Arc>>, transactions_subscribers: Arc>>, + sync_subscribers: Arc>>, } impl ChainNotificationHandler { @@ -143,6 +174,12 @@ impl ChainNotificationHandler { } } + fn notify_syncing(&self, sync_status: pubsub::PubSubSyncStatus) { + for subscriber in self.sync_subscribers.read().values() { + Self::notify(&self.executor, subscriber, pubsub::Result::SyncState(sync_status.clone())); + } + } + fn notify_logs(&self, enacted: &[(H256, Ex)], logs: F) where F: Fn(EthFilter, &Ex) -> T, Ex: Send, @@ -274,6 +311,10 @@ impl EthPubSub for EthPubSubClient { self.heads_subscribers.write().push(subscriber); return; }, + (pubsub::Kind::Syncing, None) => { + self.sync_subscribers.write().push(subscriber); + return; + }, (pubsub::Kind::NewHeads, _) => { errors::invalid_params("newHeads", "Expected no parameters.") }, @@ -308,7 +349,8 @@ impl EthPubSub for EthPubSubClient { let res = self.heads_subscribers.write().remove(&id).is_some(); let res2 = self.logs_subscribers.write().remove(&id).is_some(); let res3 = self.transactions_subscribers.write().remove(&id).is_some(); + let res4 = self.sync_subscribers.write().remove(&id).is_some(); - Ok(res || res2 || res3) + Ok(res || res2 || res3 || res4) } } diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 27a703795..796b3f9f5 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -35,7 +35,6 @@ use types::ids::BlockId; use updater::{Service as UpdateService}; use version::version_data; -use v1::helpers::block_import::is_major_importing; use v1::helpers::{self, errors, fake_sign, ipfs, NetworkSettings, verify_signature}; use v1::helpers::external_signer::{SigningQueue, SignerService}; use v1::metadata::Metadata; @@ -445,8 +444,7 @@ impl Parity for ParityClient where _ => false, }; let is_not_syncing = - !is_warping && - !is_major_importing(Some(self.sync.status().state), self.client.queue_info()); + !is_warping && !self.sync.is_major_syncing(); if has_peers && is_not_syncing { Ok(()) diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 8b8afacdb..368dded68 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -46,6 +46,7 @@ pub use self::impls::*; pub use self::helpers::{NetworkSettings, block_import, dispatch}; pub use self::metadata::Metadata; pub use self::types::Origin; +pub use self::types::pubsub::PubSubSyncStatus; pub use self::extractors::{RpcExtractor, WsExtractor, WsStats, WsDispatcher}; /// Signer utilities diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 37c2f9355..d044fd564 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -19,8 +19,9 @@ use std::collections::BTreeMap; use ethereum_types::H256; use parking_lot::RwLock; -use sync::{SyncProvider, EthProtocolInfo, SyncStatus, SyncState, PeerInfo, TransactionStats}; use network::client_version::ClientVersion; +use futures::sync::mpsc; +use sync::{SyncProvider, EthProtocolInfo, SyncStatus, PeerInfo, TransactionStats, SyncState}; /// TestSyncProvider config. pub struct Config { @@ -34,6 +35,8 @@ pub struct Config { pub struct TestSyncProvider { /// Sync status. pub status: RwLock, + /// is major importing? + is_importing: RwLock, } impl TestSyncProvider { @@ -56,12 +59,14 @@ impl TestSyncProvider { snapshot_chunks_done: 0, last_imported_old_block_number: None, }), + is_importing: RwLock::new(false) } } /// Simulate importing blocks. pub fn increase_imported_block_number(&self, count: u64) { let mut status = self.status.write(); + *self.is_importing.write() = true; let current_number = status.last_imported_block_number.unwrap_or(0); status.last_imported_block_number = Some(current_number + count); } @@ -123,4 +128,17 @@ impl SyncProvider for TestSyncProvider { } ] } + + fn sync_notification(&self) -> mpsc::UnboundedReceiver { + unimplemented!() + } + + fn is_major_syncing(&self) -> bool { + match (self.status.read().state, *self.is_importing.read()) { + (SyncState::Idle, _) => false, + (SyncState::Blocks, _) => true, + (_, true) => true, + _ => false + } + } } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 6fd7394a4..21f413530 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -23,7 +23,6 @@ use jsonrpc_pubsub::Session; use std::time::Duration; use v1::{EthPubSub, EthPubSubClient, Metadata}; - use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, NewBlocks, ChainRoute, ChainRouteType}; use parity_runtime::Runtime; @@ -201,7 +200,7 @@ fn should_subscribe_to_pending_transactions() { } #[test] -fn should_return_unimplemented() { +fn eth_subscribe_syncing() { // given let el = Runtime::with_thread_count(1); let client = TestBlockChainClient::new(); @@ -216,7 +215,7 @@ fn should_return_unimplemented() { metadata.session = Some(Arc::new(Session::new(sender))); // Subscribe - let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#; let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/pubsub.rs b/rpc/src/v1/types/pubsub.rs index 1586b115c..1b3c8fdd8 100644 --- a/rpc/src/v1/types/pubsub.rs +++ b/rpc/src/v1/types/pubsub.rs @@ -31,6 +31,16 @@ pub enum Result { Log(Box), /// Transaction hash TransactionHash(H256), + /// SyncStatus + SyncState(PubSubSyncStatus) +} + +/// PubSbub sync status +#[derive(Debug, Serialize, Eq, PartialEq, Clone)] +#[serde(rename_all="camelCase")] +pub struct PubSubSyncStatus { + /// is_major_syncing? + pub syncing: bool, } impl Serialize for Result { @@ -41,6 +51,7 @@ impl Serialize for Result { Result::Header(ref header) => header.serialize(serializer), Result::Log(ref log) => log.serialize(serializer), Result::TransactionHash(ref hash) => hash.serialize(serializer), + Result::SyncState(ref sync) => sync.serialize(serializer), } } } diff --git a/secret-store/src/trusted_client.rs b/secret-store/src/trusted_client.rs index a20373ad0..6fb9626f6 100644 --- a/secret-store/src/trusted_client.rs +++ b/secret-store/src/trusted_client.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use call_contract::RegistryInfo; use common_types::transaction::{Transaction, SignedTransaction, Action}; use ethereum_types::Address; -use ethcore::client::{Client, BlockChainClient, ChainInfo, Nonce, BlockId}; +use ethcore::client::{Client, ChainInfo, Nonce, BlockId}; use ethcore::miner::{Miner, MinerService}; use sync::SyncProvider; use helpers::{get_confirmed_block_hash, REQUEST_CONFIRMATIONS_REQUIRED}; @@ -54,7 +54,7 @@ impl TrustedClient { self.client.upgrade() .and_then(|client| self.sync.upgrade().map(|sync| (client, sync))) .and_then(|(client, sync)| { - let is_synced = !sync.status().is_syncing(client.queue_info()); + let is_synced = !sync.is_major_syncing(); let is_trusted = client.chain_info().security_level().is_full(); match is_synced && is_trusted { true => Some(client), diff --git a/updater/src/updater.rs b/updater/src/updater.rs index 70adc9f3b..072d0584a 100644 --- a/updater/src/updater.rs +++ b/updater/src/updater.rs @@ -670,8 +670,8 @@ impl Updater self.poll(), + match self.sync.as_ref().and_then(Weak::upgrade) { + Some(ref s) if !s.is_major_syncing() => self.poll(), _ => {}, } }