diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 90e18d8e8..ae58c4513 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -405,6 +405,7 @@ impl ChainSync { } if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) { self.deactivate_peer(io, peer_id); //TODO: is this too harsh? + self.continue_sync(io); return Ok(()); } @@ -452,7 +453,12 @@ impl ChainSync { // Disable the peer for this syncing round if it gives invalid chain if !valid_response { - trace!(target: "sync", "{} Disabled for invalid headers response", peer_id); + trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id); + self.deactivate_peer(io, peer_id); + } + if headers.is_empty() { + // Peer does not have any new subchain heads, deactivate it nd try with another + trace!(target: "sync", "{} Deactivated for no data", peer_id); self.deactivate_peer(io, peer_id); } match self.state { diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index 90e3bc67d..179309a6f 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -128,7 +128,6 @@ impl Handshake { /// Readable IO handler. Drives the state change. pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { if !self.expired() { - io.clear_timer(self.connection.token).ok(); match self.state { HandshakeState::New => {} HandshakeState::ReadingAuth => { @@ -151,7 +150,9 @@ impl Handshake { try!(self.read_ack_eip8(host.secret(), &data)); }; }, - HandshakeState::StartSession => {}, + HandshakeState::StartSession => { + io.clear_timer(self.connection.token).ok(); + }, } } Ok(()) diff --git a/util/src/network/host.rs b/util/src/network/host.rs index b37538c9c..de6908200 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -523,11 +523,13 @@ impl Host where Message: Send + Sync + Clone { } let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() }; + let mut started: usize = 0; for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id)) .take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { self.connect_peer(&id, io); + started += 1; } - debug!(target: "network", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count()); + debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", self.session_count(), self.handshake_count(), started); } #[cfg_attr(feature="dev", allow(single_match))] @@ -650,6 +652,7 @@ impl Host where Message: Send + Sync + Clone { break; }, Ok(SessionData::Ready) => { + self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; @@ -667,7 +670,6 @@ impl Host where Message: Send + Sync + Clone { } } } - self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); for (p, _) in self.handlers.read().unwrap().iter() { if s.have_capability(p) { ready_data.push(p); @@ -828,6 +830,7 @@ impl IoHandler> for Host where Messa io.update_registration(DISCOVERY).expect("Error updating discovery registration"); }, NODE_TABLE => { + trace!(target: "network", "Refreshing node table"); self.nodes.write().unwrap().clear_useless(); }, _ => match self.timers.read().unwrap().get(&token).cloned() {