diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f0c0347a9..f82162b79 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -162,6 +162,8 @@ struct PeerInfo { asking: PeerAsking, /// A set of block numbers being requested asking_blocks: Vec, + /// Holds requested header hash if currently requesting block header by hash + asking_hash: Option, /// Request timestamp ask_time: f64, } @@ -179,6 +181,8 @@ pub struct ChainSync { downloading_headers: HashSet, /// Set of block body numbers being downloaded downloading_bodies: HashSet, + /// Set of block headers being downloaded by hash + downloading_hashes: HashSet, /// Downloaded headers. headers: Vec<(BlockNumber, Vec
)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order /// Downloaded bodies @@ -195,6 +199,8 @@ pub struct ChainSync { syncing_difficulty: U256, /// True if common block for our and remote chain has been found have_common_block: bool, + /// Last propagated block number + last_send_block_number: BlockNumber, } type RlpResponseResult = Result, PacketDecodeError>; @@ -208,6 +214,7 @@ impl ChainSync { highest_block: None, downloading_headers: HashSet::new(), downloading_bodies: HashSet::new(), + downloading_hashes: HashSet::new(), headers: Vec::new(), bodies: Vec::new(), peers: HashMap::new(), @@ -216,6 +223,7 @@ impl ChainSync { last_imported_hash: None, syncing_difficulty: U256::from(0u64), have_common_block: false, + last_send_block_number: 0, } } @@ -248,6 +256,7 @@ impl ChainSync { self.bodies.clear(); for (_, ref mut p) in &mut self.peers { p.asking_blocks.clear(); + p.asking_hash = None; } self.header_ids.clear(); self.syncing_difficulty = From::from(0u64); @@ -277,11 +286,16 @@ impl ChainSync { genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), + asking_hash: None, ask_time: 0f64, }; trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + if self.peers.contains_key(&peer_id) { + warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); + return Ok(()); + } let chain_info = io.chain().chain_info(); if peer.genesis != chain_info.genesis_hash { io.disable_peer(peer_id); @@ -294,10 +308,7 @@ impl ChainSync { return Ok(()); } - let old = self.peers.insert(peer_id.clone(), peer); - if old.is_some() { - panic!("ChainSync: new peer already exists"); - } + self.peers.insert(peer_id.clone(), peer); info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); self.sync_peer(io, peer_id, false); Ok(()) @@ -437,6 +448,10 @@ impl ChainSync { trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; + { + let peer = self.peers.get_mut(&peer_id).unwrap(); + peer.latest = header.hash(); + } // TODO: Decompose block and add to self.headers and self.bodies instead if header.number == From::from(self.current_base_block() + 1) { match io.chain().import_block(block_rlp.as_raw().to_vec()) { @@ -466,13 +481,9 @@ impl ChainSync { trace!(target: "sync", "New block unknown {:?}", h); //TODO: handle too many unknown blocks let difficulty: U256 = try!(r.val_at(1)); - let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty; + let peer_difficulty = self.peers.get_mut(&peer_id).unwrap().difficulty; if difficulty > peer_difficulty { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = header.hash(); - } self.sync_peer(io, peer_id, true); } } @@ -481,16 +492,19 @@ impl ChainSync { /// Handles NewHashes packet. Initiates headers download for any unknown hashes. fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { + if self.peers.get_mut(&peer_id).unwrap().asking != PeerAsking::Nothing { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); - let mut max_height: U256 = From::from(0); + let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); + let mut max_height: BlockNumber = 0; for (rh, rd) in hashes { let h = try!(rh); let d = try!(rd); + if self.downloading_hashes.contains(&h) { + continue; + } match io.chain().block_status(BlockId::Hash(h.clone())) { BlockStatus::InChain => { trace!(target: "sync", "New block hash already in chain {:?}", h); @@ -499,9 +513,9 @@ impl ChainSync { trace!(target: "sync", "New hash block already queued {:?}", h); }, BlockStatus::Unknown => { - trace!(target: "sync", "New unknown block hash {:?}", h); if d > max_height { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + trace!(target: "sync", "New unknown block hash {:?}", h); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = h.clone(); max_height = d; } @@ -513,7 +527,7 @@ impl ChainSync { } } }; - if max_height != x!(0) { + if max_height != 0 { self.sync_peer(io, peer_id, true); } Ok(()) @@ -523,7 +537,7 @@ impl ChainSync { pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Disconnecting {}", peer); if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); + info!(target: "sync", "Disconnected {}", peer); self.clear_peer_download(peer); self.peers.remove(&peer); self.continue_sync(io); @@ -561,7 +575,7 @@ impl ChainSync { /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { return; } @@ -581,6 +595,8 @@ impl ChainSync { self.state = SyncState::Blocks; } trace!(target: "sync", "Starting sync with better chain"); + self.peers.get_mut(&peer_id).unwrap().asking_hash = Some(peer_latest.clone()); + self.downloading_hashes.insert(peer_latest.clone()); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown { @@ -673,6 +689,8 @@ impl ChainSync { } } else { + // continue search for common block + self.downloading_headers.insert(start as BlockNumber); self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); } } @@ -680,7 +698,10 @@ impl ChainSync { /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); + if let Some(hash) = peer.asking_hash.take() { + self.downloading_hashes.remove(&hash); + } for b in &peer.asking_blocks { self.downloading_headers.remove(&b); self.downloading_bodies.remove(&b); @@ -813,7 +834,7 @@ impl ChainSync { /// Reset peer status after request is complete. fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != asking { warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); } @@ -825,9 +846,9 @@ impl ChainSync { /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); + warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } } match sync.send(peer_id, packet_id, packet) { @@ -844,6 +865,14 @@ impl ChainSync { } } + /// Generic packet sender + fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + if let Err(e) = sync.send(peer_id, packet_id, packet) { + warn!(target:"sync", "Error sending packet: {:?}", e); + sync.disable_peer(peer_id); + self.on_peer_aborting(sync, peer_id); + } + } /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { Ok(()) @@ -1000,6 +1029,11 @@ impl ChainSync { /// Dispatch incoming requests and responses pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + + if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { + warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); + return; + } let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), @@ -1060,7 +1094,7 @@ impl ChainSync { let mut rlp_stream = RlpStream::new_list(route.blocks.len()); for block_hash in route.blocks { let mut hash_rlp = RlpStream::new_list(2); - let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).expect("Mallformed block without a difficulty on the chain!"); + let difficulty = chain.block_total_difficulty(BlockId::Hash(block_hash.clone())).unwrap(); hash_rlp.append(&block_hash); hash_rlp.append(&difficulty); @@ -1077,7 +1111,7 @@ impl ChainSync { /// creates latest block rlp for the given client fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).expect("Creating latest block when there is none"), 1); + rlp_stream.append_raw(&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).unwrap(), 1); rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.out() } @@ -1089,11 +1123,10 @@ impl ChainSync { let latest_hash = chain_info.best_block_hash; let latest_number = chain_info.best_block_number; self.peers.iter().filter(|&(_, peer_info)| - match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) - { + match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) { BlockStatus::InChain => { let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number(); - peer_info.latest != latest_hash && latest_number > peer_number && latest_number - peer_number < MAX_PEER_LAG_PROPAGATION + peer_info.latest != latest_hash && latest_number > peer_number }, _ => false }) @@ -1102,7 +1135,7 @@ impl ChainSync { } /// propagades latest block to lagging peers - fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize { + fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { let updated_peers = { let lagging_peers = self.get_lagging_peers(io); @@ -1118,29 +1151,27 @@ impl ChainSync { }; let mut sent = 0; - let local_best = io.chain().chain_info().best_block_hash; for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); - self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone(); + self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); sent = sent + 1; } sent } /// propagades new known hashes to all peers - fn propagade_new_hashes(&mut self, io: &mut SyncIo) -> usize { + fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; - let local_best = io.chain().chain_info().best_block_hash; for peer_id in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { Some(rlp) => { { - let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).unwrap(); peer.latest = local_best.clone(); } - self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp); + self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 }, None => 0 @@ -1152,15 +1183,19 @@ impl ChainSync { /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.check_resume(io); - - let peers = self.propagade_new_hashes(io); - trace!(target: "sync", "Sent new hashes to peers: {:?}", peers); } /// should be called once chain has new block, triggers the latest block propagation pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { - let peers = self.propagade_blocks(io); - trace!(target: "sync", "Sent latest block to peers: {:?}", peers); + let chain = io.chain().chain_info(); + if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { + let blocks = self.propagade_blocks(&chain.best_block_hash, io); + let hashes = self.propagade_new_hashes(&chain.best_block_hash, io); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } + self.last_send_block_number = chain.best_block_number; } } @@ -1246,9 +1281,9 @@ mod tests { // the length of two rlp-encoded receipts assert_eq!(597, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_RECEIPTS_PACKET, &receipts_request); + sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } @@ -1274,9 +1309,9 @@ mod tests { // the length of one rlp-encoded hashe assert_eq!(34, rlp_result.unwrap().1.out().len()); - let mut sync = ChainSync::new(); + let mut sync = dummy_sync_with_peer(H256::new()); io.sender = Some(2usize); - sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request); + sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } @@ -1291,6 +1326,7 @@ mod tests { difficulty: U256::zero(), asking: PeerAsking::Nothing, asking_blocks: Vec::::new(), + asking_hash: None, ask_time: 0f64, }); sync @@ -1332,9 +1368,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_new_hashes(&mut io); + let peer_count = sync.propagade_new_hashes(&best_hash, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1350,9 +1387,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_blocks(&mut io); + let peer_count = sync.propagade_blocks(&best_hash, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1454,9 +1492,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_new_hashes(&mut io); + sync.propagade_new_hashes(&best_hash, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1471,9 +1510,10 @@ mod tests { client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let best_hash = client.chain_info().best_block_hash.clone(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_blocks(&mut io); + sync.propagade_blocks(&best_hash, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index f560f4ca6..5ee5df831 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -109,7 +109,7 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(1000, false); + net.peer_mut(0).chain.add_blocks(100, false); net.peer_mut(1).chain.add_blocks(1, false); net.start(); @@ -122,18 +122,28 @@ fn status_packet() { #[test] fn propagade_hashes() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + let mut net = TestNet::new(6); + net.peer_mut(1).chain.add_blocks(10, false); net.sync(); net.peer_mut(0).chain.add_blocks(10, false); - net.sync_step_peer(0); + net.sync(); + net.trigger_block_verified(0); //first event just sets the marker + net.trigger_block_verified(0); - // 2 peers to sync - assert_eq!(2, net.peer(0).queue.len()); - // NEW_BLOCK_HASHES_PACKET - assert_eq!(0x01, net.peer(0).queue[0].packet_id); + // 5 peers to sync + assert_eq!(5, net.peer(0).queue.len()); + let mut hashes = 0; + let mut blocks = 0; + for i in 0..5 { + if net.peer(0).queue[i].packet_id == 0x1 { + hashes += 1; + } + if net.peer(0).queue[i].packet_id == 0x7 { + blocks += 1; + } + } + assert!(blocks + hashes == 5); } #[test] @@ -143,6 +153,7 @@ fn propagade_blocks() { net.sync(); net.peer_mut(0).chain.add_blocks(10, false); + net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); assert!(!net.peer(0).queue.is_empty()); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 2ad949642..05462be37 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -220,6 +220,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); }); //TODO: don't copy vector data + try!(self.io.update_registration(peer)); }, _ => warn!(target: "net", "Send: Peer is not connected yet") }