RPC: Implements eth_subscribe("syncing") (#10311)

* implement eth_subscibe syncing

* fix imports

* add is_major_syncing to SyncProvider

* add eth_subscribe(syncing) support for light clients

* tests

* fix TestSyncClient

* correct LightFetch impl

* added currentBlock, startingBlock, highestBlock to PubSubSyncStatus

* fixed tests

* fix PR grumbles

* improve code style

* is_syncing -> syncing

* code style

* use ethereum types
This commit is contained in:
Seun LanLege
2019-04-02 16:13:55 +01:00
committed by Talha Cross
parent 288d73789a
commit fba63de974
26 changed files with 289 additions and 100 deletions

View File

@@ -99,6 +99,8 @@ use std::cmp;
use std::time::{Duration, Instant};
use hash::keccak;
use heapsize::HeapSizeOf;
use futures::sync::mpsc as futures_mpsc;
use api::Notification;
use ethereum_types::{H256, U256};
use fastmap::{H256FastMap, H256FastSet};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
@@ -618,6 +620,8 @@ pub struct ChainSync {
private_tx_handler: Option<Arc<PrivateTxHandler>>,
/// Enable warp sync.
warp_sync: WarpSync,
status_sinks: Vec<futures_mpsc::UnboundedSender<SyncState>>
}
impl ChainSync {
@@ -649,6 +653,7 @@ impl ChainSync {
transactions_stats: TransactionsStats::default(),
private_tx_handler,
warp_sync: config.warp_sync,
status_sinks: Vec::new()
};
sync.update_targets(chain);
sync
@@ -707,6 +712,29 @@ impl ChainSync {
self.peers.clear();
}
/// returns the receiving end of a future::mpsc channel that can
/// be polled for changes to node's SyncState.
pub fn sync_notifications(&mut self) -> Notification<SyncState> {
let (sender, receiver) = futures_mpsc::unbounded();
self.status_sinks.push(sender);
receiver
}
/// notify all subscibers of a new SyncState
fn notify_sync_state(&mut self, state: SyncState) {
// remove any sender whose receiving end has been dropped
self.status_sinks.retain(|sender| {
sender.unbounded_send(state).is_ok()
});
}
/// sets a new SyncState
fn set_state(&mut self, state: SyncState) {
self.notify_sync_state(state);
self.state = state;
}
/// Reset sync. Clear all downloaded data but keep the queue.
/// Set sync state to the given state or to the initial state if `None` is provided.
fn reset(&mut self, io: &mut SyncIo, state: Option<SyncState>) {
@@ -721,7 +749,10 @@ impl ChainSync {
}
}
}
self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain()));
let warp_sync = self.warp_sync;
self.set_state(state.unwrap_or_else(|| Self::get_init_state(warp_sync, io.chain())));
// Reactivate peers only if some progress has been made
// since the last sync round of if starting fresh.
self.active_peers = self.peers.keys().cloned().collect();
@@ -808,7 +839,7 @@ impl ChainSync {
}
} else if timeout && !self.warp_sync.is_warp_only() {
trace!(target: "sync", "No snapshots found, starting full sync");
self.state = SyncState::Idle;
self.set_state(SyncState::Idle);
self.continue_sync(io);
}
}
@@ -820,10 +851,10 @@ impl ChainSync {
SyncRequester::request_snapshot_manifest(self, io, *p);
}
}
self.state = SyncState::SnapshotManifest;
self.set_state(SyncState::SnapshotManifest);
trace!(target: "sync", "New snapshot sync with {:?}", peers);
} else {
self.state = SyncState::SnapshotData;
self.set_state(SyncState::SnapshotData);
trace!(target: "sync", "Resumed snapshot sync with {:?}", peers);
}
}
@@ -904,7 +935,7 @@ impl ChainSync {
/// Enter waiting state
fn pause_sync(&mut self) {
trace!(target: "sync", "Block queue full, pausing sync");
self.state = SyncState::Waiting;
self.set_state(SyncState::Waiting);
}
/// Find something to do for a peer. Called for a new peer or when a peer is done with its task.
@@ -955,7 +986,7 @@ impl ChainSync {
if let Some(request) = self.new_blocks.request_blocks(peer_id, io, num_active_peers) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::NewBlocks);
if self.state == SyncState::Idle {
self.state = SyncState::Blocks;
self.set_state(SyncState::Blocks);
}
return;
}
@@ -987,7 +1018,7 @@ impl ChainSync {
self.snapshot.initialize(io.snapshot_service());
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
self.state = SyncState::SnapshotWaiting;
self.set_state(SyncState::SnapshotWaiting);
return;
}
},
@@ -1171,12 +1202,12 @@ impl ChainSync {
fn check_resume(&mut self, io: &mut SyncIo) {
match self.state {
SyncState::Waiting if !io.chain().queue_info().is_full() => {
self.state = SyncState::Blocks;
self.set_state(SyncState::Blocks);
self.continue_sync(io);
},
SyncState::SnapshotData => match io.snapshot_service().status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
self.state = SyncState::SnapshotWaiting;
self.set_state(SyncState::SnapshotWaiting);
},
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (),
},
@@ -1192,13 +1223,13 @@ impl ChainSync {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
self.state = SyncState::SnapshotData;
self.set_state(SyncState::SnapshotData);
self.continue_sync(io);
}
},
RestorationStatus::Failed => {
trace!(target: "sync", "Snapshot restoration aborted");
self.state = SyncState::WaitingPeers;
self.set_state(SyncState::WaitingPeers);
self.snapshot.clear();
self.continue_sync(io);
},
@@ -1421,7 +1452,8 @@ pub mod tests {
}
pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default(), client, None);
let mut sync = ChainSync::new(SyncConfig::default(), client, None,);
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
sync
}