From 34b465a125621c8d5c1bdec0e122524894168ba2 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 12 Feb 2016 14:20:18 +0100 Subject: [PATCH] Check for peer registration --- sync/src/chain.rs | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6717c0814..f82162b79 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -449,7 +449,7 @@ impl ChainSync { let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = header.hash(); } // TODO: Decompose block and add to self.headers and self.bodies instead @@ -481,7 +481,7 @@ impl ChainSync { trace!(target: "sync", "New block unknown {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + let peer_difficulty = self.peers.get_mut(&peer_id).unwrap().difficulty; if difficulty > peer_difficulty { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); self.sync_peer(io, peer_id, true); @@ -492,7 +492,7 @@ impl ChainSync { /// Handles NewHashes packet. Initiates headers download for any unknown hashes. fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + if self.peers.get_mut(&peer_id).unwrap().asking != PeerAsking::Nothing { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); return Ok(()); } @@ -515,7 +515,7 @@ impl ChainSync { BlockStatus::Unknown => { if d > max_height { trace!(target: "sync", "New unknown block hash {:?}", h); - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = h.clone(); max_height = d; } @@ -575,7 +575,7 @@ impl ChainSync { /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { return; } @@ -595,7 +595,7 @@ impl ChainSync { self.state = SyncState::Blocks; } trace!(target: "sync", "Starting sync with better chain"); - self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking_hash = Some(peer_latest.clone()); + self.peers.get_mut(&peer_id).unwrap().asking_hash = Some(peer_latest.clone()); self.downloading_hashes.insert(peer_latest.clone()); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } @@ -698,7 +698,7 @@ impl ChainSync { /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(hash) = peer.asking_hash.take() { self.downloading_hashes.remove(&hash); } @@ -834,7 +834,7 @@ impl ChainSync { /// Reset peer status after request is complete. fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != asking { warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); } @@ -846,7 +846,7 @@ impl ChainSync { /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } @@ -1029,6 +1029,11 @@ impl ChainSync { /// Dispatch incoming requests and responses pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + + if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { + warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); + return; + } let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), @@ -1089,7 +1094,7 @@ impl ChainSync { let mut rlp_stream = RlpStream::new_list(route.blocks.len()); for block_hash in route.blocks { let mut hash_rlp = RlpStream::new_list(2); - let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).expect("Mallformed block without a difficulty on the chain!"); + let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).unwrap(); hash_rlp.append(&block_hash); hash_rlp.append(&difficulty); @@ -1106,7 +1111,7 @@ impl ChainSync { /// creates latest block rlp for the given client fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).expect("Creating latest block when there is none"), 1); + rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).unwrap(), 1); rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.out() } @@ -1149,7 +1154,7 @@ impl ChainSync { for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); sent = sent + 1; } sent @@ -1160,10 +1165,10 @@ impl ChainSync { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; for peer_id in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { Some(rlp) => { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = local_best.clone(); } self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); @@ -1276,9 +1281,9 @@ mod tests { // the length of two rlp-encoded receipts assert_eq!(597, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_RECEIPTS_PACKET, &receipts_request); + sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } @@ -1304,9 +1309,9 @@ mod tests { // the length of one rlp-encoded hashe assert_eq!(34, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request); + sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); }