From 5e9dc185a5a703bca35515ab864dafeec7f54e7d Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Wed, 28 Nov 2018 12:19:35 +0100 Subject: [PATCH] Fix unstable peers and slowness in sync (#9967) * Don't sync all peers after each response * Update formating * Fix tests: add `continue_sync` to `Sync_step` * Update ethcore/sync/src/chain/mod.rs Co-Authored-By: ngotchac --- ethcore/sync/src/api.rs | 13 +++++---- ethcore/sync/src/chain/handler.rs | 2 -- ethcore/sync/src/chain/mod.rs | 46 +++++++++++++++---------------- ethcore/sync/src/tests/helpers.rs | 8 ++++-- 4 files changed, 35 insertions(+), 34 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 6fef887a0..7474d79b3 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -420,9 +420,10 @@ impl SyncProvider for EthSync { } const PEERS_TIMER: TimerToken = 0; -const SYNC_TIMER: TimerToken = 1; -const TX_TIMER: TimerToken = 2; -const PRIORITY_TIMER: TimerToken = 3; +const MAINTAIN_SYNC_TIMER: TimerToken = 1; +const CONTINUE_SYNC_TIMER: TimerToken = 2; +const TX_TIMER: TimerToken = 3; +const PRIORITY_TIMER: TimerToken = 4; pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250); @@ -441,7 +442,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); - io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer"); @@ -474,7 +476,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), - SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io), TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), PRIORITY_TIMER => self.sync.process_priority_queue(&mut io), _ => warn!("Unknown timer {} triggered.", timer), diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 104a80320..e1518d4f5 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -97,8 +97,6 @@ impl SyncHandler { sync.sync_peer(io, peer, false); }, } - // give tasks to other peers - sync.continue_sync(io); } /// Called when peer sends us new consensus packet diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index cdedd5630..a01b25528 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -866,37 +866,35 @@ impl ChainSync { } /// Resume downloading - fn continue_sync(&mut self, io: &mut SyncIo) { - // Collect active peers that can sync - let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| - if peer.can_sync() { - Some((*peer_id, peer.protocol_version)) - } else { - None - } - ).collect(); - - trace!( - target: "sync", - "Syncing with peers: {} active, {} confirmed, {} total", - self.active_peers.len(), confirmed_peers.len(), self.peers.len() - ); - + pub fn continue_sync(&mut self, io: &mut SyncIo) { if self.state == SyncState::Waiting { trace!(target: "sync", "Waiting for the block queue"); } else if self.state == SyncState::SnapshotWaiting { trace!(target: "sync", "Waiting for the snapshot restoration"); } else { - let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)| - self.active_peers.contains(&peer_id) - ).map(|v| *v).collect(); + // Collect active peers that can sync + let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| + if peer.can_sync() && peer.asking == PeerAsking::Nothing && self.active_peers.contains(&peer_id) { + Some((*peer_id, peer.protocol_version)) + } else { + None + } + ).collect(); - random::new().shuffle(&mut peers); //TODO: sort by rating - // prefer peers with higher protocol version - peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); + if peers.len() > 0 { + trace!( + target: "sync", + "Syncing with peers: {} active, {} available, {} total", + self.active_peers.len(), peers.len(), self.peers.len() + ); - for (peer_id, _) in peers { - self.sync_peer(io, peer_id, false); + random::new().shuffle(&mut peers); // TODO (#646): sort by rating + // prefer peers with higher protocol version + peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); + + for (peer_id, _) in peers { + self.sync_peer(io, peer_id, false); + } } } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index d83597e52..3eac91a0d 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -286,10 +286,12 @@ impl Peer for EthPeer { } fn sync_step(&self) { + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); self.chain.flush(); - self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); - self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); - self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + self.sync.write().maintain_peers(&mut io); + self.sync.write().maintain_sync(&mut io); + self.sync.write().continue_sync(&mut io); + self.sync.write().propagate_new_transactions(&mut io); } fn restart_sync(&self) {