diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 671e2241e..0de0bdcaf 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -155,7 +155,9 @@ struct PeerInfo { /// Peer network id network_id: U256, /// Peer best block hash - latest: H256, + latest_hash: H256, + /// Peer best block number if known + latest_number: Option, /// Peer total difficulty difficulty: U256, /// Type of data currenty being requested from peer. @@ -282,7 +284,8 @@ impl ChainSync { protocol_version: try!(r.val_at(0)), network_id: try!(r.val_at(1)), difficulty: try!(r.val_at(2)), - latest: try!(r.val_at(3)), + latest_hash: try!(r.val_at(3)), + latest_number: None, genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), @@ -290,7 +293,7 @@ impl ChainSync { ask_time: 0f64, }; - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); if self.peers.contains_key(&peer_id) { warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); @@ -450,7 +453,8 @@ impl ChainSync { let mut unknown = false; { let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = header.hash(); + peer.latest_hash = header.hash(); + peer.latest_number = Some(header.number()); } // TODO: Decompose block and add to self.headers and self.bodies instead if header.number == From::from(self.current_base_block() + 1) { @@ -516,7 +520,8 @@ impl ChainSync { if d > max_height { trace!(target: "sync", "New unknown block hash {:?}", h); let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = h.clone(); + peer.latest_hash = h.clone(); + peer.latest_number = Some(d); max_height = d; } }, @@ -583,7 +588,7 @@ impl ChainSync { trace!(target: "sync", "Waiting for block queue"); return; } - (peer.latest.clone(), peer.difficulty.clone()) + (peer.latest_hash.clone(), peer.difficulty.clone()) }; let td = io.chain().chain_info().pending_total_difficulty; @@ -1117,25 +1122,28 @@ impl ChainSync { } /// returns peer ids that have less blocks than our chain - fn get_lagging_peers(&self, io: &SyncIo) -> Vec { + fn get_lagging_peers(&mut self, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { let chain = io.chain(); let chain_info = chain.chain_info(); let latest_hash = chain_info.best_block_hash; let latest_number = chain_info.best_block_number; - self.peers.iter().filter(|&(_, peer_info)| - match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) { + self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| + match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { BlockStatus::InChain => { - let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number(); - peer_info.latest != latest_hash && latest_number > peer_number + if peer_info.latest_number.is_none() { + peer_info.latest_number = Some(HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest_hash.clone())).unwrap()).number()); + } + if peer_info.latest_hash != latest_hash && latest_number > peer_info.latest_number.unwrap() { + Some((id, peer_info.latest_number.unwrap())) + } else { None } }, - _ => false + _ => None }) - .map(|(peer_id, _)| peer_id) - .cloned().collect::>() + .collect::>() } /// propagades latest block to lagging peers - fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { + fn propagade_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { let updated_peers = { let lagging_peers = self.get_lagging_peers(io); @@ -1143,33 +1151,41 @@ impl ChainSync { let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let lucky_peers = match lagging_peers.len() { 0 ... MIN_PEERS_PROPAGATION => lagging_peers, - _ => lagging_peers.iter().filter(|_| ::rand::random::() < fraction).cloned().collect::>() + _ => lagging_peers.into_iter().filter(|_| ::rand::random::() < fraction).collect::>() }; // taking at max of MAX_PEERS_PROPAGATION - lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::>() + lucky_peers.iter().map(|&(id, _)| id.clone()).take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).collect::>() }; let mut sent = 0; for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).unwrap().latest = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest_hash = local_best.clone(); + self.peers.get_mut(&peer_id).unwrap().latest_number = Some(best_number); sent = sent + 1; } sent } /// propagades new known hashes to all peers - fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize { + fn propagade_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { let updated_peers = self.get_lagging_peers(io); let mut sent = 0; - for peer_id in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).unwrap().latest, &local_best) { + let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash(); + for (peer_id, peer_number) in updated_peers { + let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone(); + if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { + // If we think peer is too far behind just end one latest hash + peer_best = last_parent.clone(); + } + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) { Some(rlp) => { { let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest = local_best.clone(); + peer.latest_hash = local_best.clone(); + peer.latest_number = Some(best_number); } self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 @@ -1189,8 +1205,8 @@ impl ChainSync { pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { let chain = io.chain().chain_info(); if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagade_blocks(&chain.best_block_hash, io); - let hashes = self.propagade_new_hashes(&chain.best_block_hash, io); + let blocks = self.propagade_blocks(&chain.best_block_hash, chain.best_block_number, io); + let hashes = self.propagade_new_hashes(&chain.best_block_hash, chain.best_block_number, io); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); } @@ -1322,7 +1338,8 @@ mod tests { protocol_version: 0, genesis: H256::zero(), network_id: U256::zero(), - latest: peer_latest_hash, + latest_hash: peer_latest_hash, + latest_number: None, difficulty: U256::zero(), asking: PeerAsking::Nothing, asking_blocks: Vec::::new(), @@ -1337,7 +1354,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.add_blocks(100, false); let mut queue = VecDeque::new(); - let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let io = TestIo::new(&mut client, &mut queue, None); let lagging_peers = sync.get_lagging_peers(&io); @@ -1369,9 +1386,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_new_hashes(&best_hash, &mut io); + let peer_count = sync.propagade_new_hashes(&best_hash, best_number, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1388,9 +1406,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagade_blocks(&best_hash, &mut io); + let peer_count = sync.propagade_blocks(&best_hash, best_number, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1493,9 +1512,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_new_hashes(&best_hash, &mut io); + sync.propagade_new_hashes(&best_hash, best_number, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1511,9 +1531,10 @@ mod tests { let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); + let best_number = client.chain_info().best_block_number; let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagade_blocks(&best_hash, &mut io); + sync.propagade_blocks(&best_hash, best_number, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));