diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6ec0884b5..3b608610d 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1097,7 +1097,7 @@ impl ChainSync { Ok(Some((RECEIPTS_PACKET, rlp_result))) } - fn return_rlp(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> + fn return_rlp(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult, FError : FnOnce(UtilError) -> String { @@ -1114,13 +1114,41 @@ impl ChainSync { } /// Dispatch incoming requests and responses - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + let result = match packet_id { + GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_block_bodies, + |e| format!("Error sending block bodies: {:?}", e)), + GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_block_headers, + |e| format!("Error sending block headers: {:?}", e)), + + GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_receipts, + |e| format!("Error sending receipts: {:?}", e)), + + GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_node_data, + |e| format!("Error sending nodes: {:?}", e)), + + _ => { + sync.write().unwrap().on_packet(io, peer, packet_id, data); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } + + pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); return; } + let rlp = UntrustedRlp::new(data); let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), @@ -1128,23 +1156,6 @@ impl ChainSync { BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - - GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_block_bodies, - |e| format!("Error sending block bodies: {:?}", e)), - - GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_block_headers, - |e| format!("Error sending block headers: {:?}", e)), - - GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_receipts, - |e| format!("Error sending receipts: {:?}", e)), - - GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_node_data, - |e| format!("Error sending nodes: {:?}", e)), - _ => { debug!(target: "sync", "Unknown packet {}", packet_id); Ok(()) @@ -1424,7 +1435,7 @@ mod tests { fn return_receipts() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(H256::new(), &client); + let sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut receipt_list = RlpStream::new_list(4); @@ -1445,7 +1456,7 @@ mod tests { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); + ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } @@ -1517,7 +1528,7 @@ mod tests { fn return_nodes() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(H256::new(), &client); + let sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut node_list = RlpStream::new_list(3); @@ -1537,7 +1548,8 @@ mod tests { assert_eq!(34, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); + + ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 86f70ff0a..27961e867 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -169,7 +169,7 @@ impl NetworkProtocolHandler for EthSync { } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); + ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 7c7d70dde..2f2bde171 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -47,7 +47,7 @@ fn status_after_sync() { net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); - let status = net.peer(0).sync.status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::Idle); } @@ -107,14 +107,14 @@ fn restart() { assert!(net.peer(0).chain.chain_info().best_block_number > 100); net.restart_peer(0); - let status = net.peer(0).sync.status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::ChainHead); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::Idle); + assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle); } #[test] diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 6496a43d5..831976048 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -78,7 +78,7 @@ pub struct TestPacket { pub struct TestPeer { pub chain: TestBlockChainClient, - pub sync: ChainSync, + pub sync: RwLock, pub queue: VecDeque, } @@ -97,7 +97,7 @@ impl TestNet { let chain = TestBlockChainClient::new(); let sync = ChainSync::new(SyncConfig::default(), &chain); net.peers.push(TestPeer { - sync: sync, + sync: RwLock::new(sync), chain: chain, queue: VecDeque::new(), }); @@ -118,7 +118,7 @@ impl TestNet { for client in 0..self.peers.len() { if peer != client { let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); } } } @@ -129,22 +129,22 @@ impl TestNet { if let Some(packet) = self.peers[peer].queue.pop_front() { let mut p = self.peers.get_mut(packet.recipient).unwrap(); trace!("--- {} -> {} ---", peer, packet.recipient); - p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); + ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); trace!("----------------"); } let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); } } pub fn sync_step_peer(&mut self, peer_num: usize) { let mut peer = self.peer_mut(peer_num); - peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn restart_peer(&mut self, i: usize) { let peer = self.peer_mut(i); - peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn sync(&mut self) -> u32 { @@ -173,6 +173,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); + peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); } }