From c66178e3f7c90b483f4f5a9fceeda0752f888235 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 26 Feb 2016 11:37:06 +0100 Subject: [PATCH] Fixed a race condition when a connecting peer disconnects immediately --- sync/src/chain.rs | 13 ++++++------- util/src/network/host.rs | 12 ++++++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f76978bda..930518007 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -555,7 +555,10 @@ impl ChainSync { /// Called when a new peer is connected pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Connected {}", peer); - self.send_status(io, peer); + if let Err(e) = self.send_status(io) { + warn!(target:"sync", "Error sending status request: {:?}", e); + io.disable_peer(peer); + } } /// Resume downloading @@ -887,7 +890,7 @@ impl ChainSync { } /// Send Status message - fn send_status(&mut self, io: &mut SyncIo, peer_id: PeerId) { + fn send_status(&mut self, io: &mut SyncIo) -> Result<(), UtilError> { let mut packet = RlpStream::new_list(5); let chain = io.chain().chain_info(); packet.append(&(PROTOCOL_VERSION as u32)); @@ -895,11 +898,7 @@ impl ChainSync { packet.append(&chain.total_difficulty); packet.append(&chain.best_block_hash); packet.append(&chain.genesis_hash); - //TODO: handle timeout for status request - if let Err(e) = io.send(peer_id, STATUS_PACKET, packet.out()) { - warn!(target:"sync", "Error sending status request: {:?}", e); - io.disable_peer(peer_id); - } + io.respond(STATUS_PACKET, packet.out()) } /// Respond to GetBlockHeaders request diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 8dd9eb9cc..d50f2d86d 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -579,7 +579,7 @@ impl Host where Message: Send + Sync + Clone { } } if kill { - self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection + self.kill_connection(token, io, true); return; } else if create_session { self.start_session(token, io); @@ -621,7 +621,7 @@ impl Host where Message: Send + Sync + Clone { } } if kill { - self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection + self.kill_connection(token, io, true); } for p in ready_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); @@ -685,6 +685,7 @@ impl Host where Message: Send + Sync + Clone { fn kill_connection(&self, token: StreamToken, io: &IoContext>, remote: bool) { let mut to_disconnect: Vec = Vec::new(); let mut failure_id = None; + let mut deregister = false; match token { FIRST_HANDSHAKE ... LAST_HANDSHAKE => { let handshakes = self.handshakes.write().unwrap(); @@ -693,7 +694,7 @@ impl Host where Message: Send + Sync + Clone { if !handshake.expired() { handshake.set_expired(); failure_id = Some(handshake.id().clone()); - io.deregister_stream(token).expect("Error deregistering stream"); + deregister = true; } } }, @@ -711,7 +712,7 @@ impl Host where Message: Send + Sync + Clone { } s.set_expired(); failure_id = Some(s.id().clone()); - io.deregister_stream(token).expect("Error deregistering stream"); + deregister = true; } } }, @@ -726,6 +727,9 @@ impl Host where Message: Send + Sync + Clone { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); h.disconnected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token); } + if deregister { + io.deregister_stream(token).expect("Error deregistering stream"); + } } fn update_nodes(&self, io: &IoContext>, node_changes: TableUpdates) {