Fix unstable peers and slowness in sync (#9967)

* Don't sync all peers after each response

* Update formating

* Fix tests: add `continue_sync` to `Sync_step`

* Update ethcore/sync/src/chain/mod.rs

Co-Authored-By: ngotchac <ngotchac@gmail.com>
This commit is contained in:
Nicolas Gotchac 2018-11-28 12:19:35 +01:00 committed by Afri Schoedon
parent 0e94ac0111
commit 5e9dc185a5
4 changed files with 35 additions and 34 deletions

View File

@ -420,9 +420,10 @@ impl SyncProvider for EthSync {
} }
const PEERS_TIMER: TimerToken = 0; const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1; const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2; const CONTINUE_SYNC_TIMER: TimerToken = 2;
const PRIORITY_TIMER: TimerToken = 3; const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;
pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250); pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);
@ -441,7 +442,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) { fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer"); io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
@ -474,7 +476,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
match timer { match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io), PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io), MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io), PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer), _ => warn!("Unknown timer {} triggered.", timer),

View File

@ -97,8 +97,6 @@ impl SyncHandler {
sync.sync_peer(io, peer, false); sync.sync_peer(io, peer, false);
}, },
} }
// give tasks to other peers
sync.continue_sync(io);
} }
/// Called when peer sends us new consensus packet /// Called when peer sends us new consensus packet

View File

@ -866,32 +866,29 @@ impl ChainSync {
} }
/// Resume downloading /// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) { pub fn continue_sync(&mut self, io: &mut SyncIo) {
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
// Collect active peers that can sync // Collect active peers that can sync
let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
if peer.can_sync() { if peer.can_sync() && peer.asking == PeerAsking::Nothing && self.active_peers.contains(&peer_id) {
Some((*peer_id, peer.protocol_version)) Some((*peer_id, peer.protocol_version))
} else { } else {
None None
} }
).collect(); ).collect();
if peers.len() > 0 {
trace!( trace!(
target: "sync", target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total", "Syncing with peers: {} active, {} available, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len() self.active_peers.len(), peers.len(), self.peers.len()
); );
if self.state == SyncState::Waiting { random::new().shuffle(&mut peers); // TODO (#646): sort by rating
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();
random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version // prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
@ -899,6 +896,7 @@ impl ChainSync {
self.sync_peer(io, peer_id, false); self.sync_peer(io, peer_id, false);
} }
} }
}
if if
(self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) &&

View File

@ -286,10 +286,12 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
} }
fn sync_step(&self) { fn sync_step(&self) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
self.chain.flush(); self.chain.flush();
self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); self.sync.write().maintain_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); self.sync.write().continue_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
} }
fn restart_sync(&self) { fn restart_sync(&self) {