Check for peer registration

This commit is contained in:
arkpar 2016-02-12 14:20:18 +01:00
parent f74c5dc921
commit 34b465a125

View File

@ -449,7 +449,7 @@ impl ChainSync {
let header: BlockHeader = try!(header_rlp.as_val()); let header: BlockHeader = try!(header_rlp.as_val());
let mut unknown = false; let mut unknown = false;
{ {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = header.hash(); peer.latest = header.hash();
} }
// TODO: Decompose block and add to self.headers and self.bodies instead // TODO: Decompose block and add to self.headers and self.bodies instead
@ -481,7 +481,7 @@ impl ChainSync {
trace!(target: "sync", "New block unknown {:?}", h); trace!(target: "sync", "New block unknown {:?}", h);
//TODO: handle too many unknown blocks //TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1)); let difficulty: U256 = try!(r.val_at(1));
let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; let peer_difficulty = self.peers.get_mut(&peer_id).unwrap().difficulty;
if difficulty > peer_difficulty { if difficulty > peer_difficulty {
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
self.sync_peer(io, peer_id, true); self.sync_peer(io, peer_id, true);
@ -492,7 +492,7 @@ impl ChainSync {
/// Handles NewHashes packet. Initiates headers download for any unknown hashes. /// Handles NewHashes packet. Initiates headers download for any unknown hashes.
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { if self.peers.get_mut(&peer_id).unwrap().asking != PeerAsking::Nothing {
trace!(target: "sync", "Ignoring new hashes since we're already downloading."); trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
return Ok(()); return Ok(());
} }
@ -515,7 +515,7 @@ impl ChainSync {
BlockStatus::Unknown => { BlockStatus::Unknown => {
if d > max_height { if d > max_height {
trace!(target: "sync", "New unknown block hash {:?}", h); trace!(target: "sync", "New unknown block hash {:?}", h);
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = h.clone(); peer.latest = h.clone();
max_height = d; max_height = d;
} }
@ -575,7 +575,7 @@ impl ChainSync {
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
let (peer_latest, peer_difficulty) = { let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing {
return; return;
} }
@ -595,7 +595,7 @@ impl ChainSync {
self.state = SyncState::Blocks; self.state = SyncState::Blocks;
} }
trace!(target: "sync", "Starting sync with better chain"); trace!(target: "sync", "Starting sync with better chain");
self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking_hash = Some(peer_latest.clone()); self.peers.get_mut(&peer_id).unwrap().asking_hash = Some(peer_latest.clone());
self.downloading_hashes.insert(peer_latest.clone()); self.downloading_hashes.insert(peer_latest.clone());
self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false);
} }
@ -698,7 +698,7 @@ impl ChainSync {
/// Clear all blocks/headers marked as being downloaded by a peer. /// Clear all blocks/headers marked as being downloaded by a peer.
fn clear_peer_download(&mut self, peer_id: PeerId) { fn clear_peer_download(&mut self, peer_id: PeerId) {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
if let Some(hash) = peer.asking_hash.take() { if let Some(hash) = peer.asking_hash.take() {
self.downloading_hashes.remove(&hash); self.downloading_hashes.remove(&hash);
} }
@ -834,7 +834,7 @@ impl ChainSync {
/// Reset peer status after request is complete. /// Reset peer status after request is complete.
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != asking { if peer.asking != asking {
warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
} }
@ -846,7 +846,7 @@ impl ChainSync {
/// Generic request sender /// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
{ {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
} }
@ -1029,6 +1029,11 @@ impl ChainSync {
/// Dispatch incoming requests and responses /// Dispatch incoming requests and responses
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data); let rlp = UntrustedRlp::new(data);
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let result = match packet_id { let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp), STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
@ -1089,7 +1094,7 @@ impl ChainSync {
let mut rlp_stream = RlpStream::new_list(route.blocks.len()); let mut rlp_stream = RlpStream::new_list(route.blocks.len());
for block_hash in route.blocks { for block_hash in route.blocks {
let mut hash_rlp = RlpStream::new_list(2); let mut hash_rlp = RlpStream::new_list(2);
let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).expect("Mallformed block without a difficulty on the chain!"); let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).unwrap();
hash_rlp.append(&block_hash); hash_rlp.append(&block_hash);
hash_rlp.append(&difficulty); hash_rlp.append(&difficulty);
@ -1106,7 +1111,7 @@ impl ChainSync {
/// creates latest block rlp for the given client /// creates latest block rlp for the given client
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2); let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).expect("Creating latest block when there is none"), 1); rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).unwrap(), 1);
rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.append(&chain.chain_info().total_difficulty);
rlp_stream.out() rlp_stream.out()
} }
@ -1149,7 +1154,7 @@ impl ChainSync {
for peer_id in updated_peers { for peer_id in updated_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone(); self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone();
sent = sent + 1; sent = sent + 1;
} }
sent sent
@ -1160,10 +1165,10 @@ impl ChainSync {
let updated_peers = self.get_lagging_peers(io); let updated_peers = self.get_lagging_peers(io);
let mut sent = 0; let mut sent = 0;
for peer_id in updated_peers { for peer_id in updated_peers {
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) { sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) {
Some(rlp) => { Some(rlp) => {
{ {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest = local_best.clone(); peer.latest = local_best.clone();
} }
self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
@ -1276,9 +1281,9 @@ mod tests {
// the length of two rlp-encoded receipts // the length of two rlp-encoded receipts
assert_eq!(597, rlp_result.unwrap().1.out().len()); assert_eq!(597, rlp_result.unwrap().1.out().len());
let mut sync = ChainSync::new(); let mut sync = dummy_sync_with_peer(H256::new());
io.sender = Some(2usize); io.sender = Some(2usize);
sync.on_packet(&mut io, 1usize, super::GET_RECEIPTS_PACKET, &receipts_request); sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
} }
@ -1304,9 +1309,9 @@ mod tests {
// the length of one rlp-encoded hashe // the length of one rlp-encoded hashe
assert_eq!(34, rlp_result.unwrap().1.out().len()); assert_eq!(34, rlp_result.unwrap().1.out().len());
let mut sync = ChainSync::new(); let mut sync = dummy_sync_with_peer(H256::new());
io.sender = Some(2usize); io.sender = Some(2usize);
sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request); sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
} }