diff --git a/sync/src/chain.rs b/sync/src/chain.rs index db2ec339e..ef05ec9d1 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -396,7 +396,7 @@ impl ChainSync { return Ok(()); } let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count); + trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}", peer_id, item_count, self.state); if self.state == SyncState::Idle { trace!(target: "sync", "Ignored unexpected block headers"); self.continue_sync(io); @@ -655,6 +655,7 @@ impl ChainSync { fn continue_sync(&mut self, io: &mut SyncIo) { let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); for (p, _) in peers { if self.active_peers.contains(&p) { self.sync_peer(io, p, false); @@ -737,6 +738,8 @@ impl ChainSync { None => { // TODO: get hash by number from the block queue trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + // just wait for full sync to complete + self.pause_sync(); } } } @@ -1128,14 +1131,19 @@ impl ChainSync { }) } - pub fn maintain_peers(&self, io: &mut SyncIo) { + pub fn maintain_peers(&mut self, io: &mut SyncIo) { let tick = time::precise_time_s(); + let mut aborting = Vec::new(); for (peer_id, peer) in &self.peers { if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { trace!(target:"sync", "Timeout {}", peer_id); io.disconnect_peer(*peer_id); + aborting.push(*peer_id); } } + for p in aborting { + self.on_peer_aborting(io, p); + } } fn check_resume(&mut self, io: &mut SyncIo) { @@ -1313,7 +1321,7 @@ impl ChainSync { self.check_resume(io); } - /// called when block is imported to chain, updates transactions queue and propagates the blocks + /// called when block is imported to chain, updates transactions queue and propagates the blocks pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { if io.is_chain_queue_empty() { // Propagate latests blocks diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index f4c4c2a8d..9ee045c87 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -139,6 +139,7 @@ impl GenericConnection { }, Ok(Some(size)) if (buf.position() as usize) == send_size => { self.stats.inc_send(size); + trace!(target:"network", "{}: Wrote {} bytes", self.token, send_size); Ok(WriteStatus::Complete) }, Ok(Some(_)) => { panic!("Wrote past buffer");}, diff --git a/util/src/network/host.rs b/util/src/network/host.rs index e2cf32aa0..d859db1c3 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -250,7 +250,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::Disconnect(peer)); } - /// Sheck if the session is till active. + /// Check if the session is still active. pub fn is_expired(&self) -> bool { self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false) } @@ -664,7 +664,6 @@ impl Host where Message: Send + Sync + Clone { Err(e) => { trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); match e { - UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) | UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => { if let Some(id) = s.id() { self.nodes.write().unwrap().mark_as_useless(id);