From 2e5a6ea1ffbc8e02eeb0c2569a8d421ec0eb0fc3 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Mon, 8 Aug 2016 17:18:10 +0200 Subject: [PATCH] Send new block hashes to all peers (#1875) --- sync/src/chain.rs | 60 +++++++++++++++++++++++------------------ sync/src/tests/chain.rs | 9 ++++--- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index b7212a116..71d27127b 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1376,27 +1376,23 @@ impl ChainSync { .collect::>() } - fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> { + fn select_random_lagging_peers(&mut self, peers: &[(PeerId, BlockNumber)]) -> Vec<(PeerId, BlockNumber)> { use rand::Rng; - let mut lagging_peers = self.get_lagging_peers(chain_info, io); // take sqrt(x) peers + let mut peers = peers.to_vec(); let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; count = min(count, MAX_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION); - ::rand::thread_rng().shuffle(&mut lagging_peers); - lagging_peers.into_iter().take(count).collect::>() + ::rand::thread_rng().shuffle(&mut peers); + peers.truncate(count); + peers } /// propagates latest block to lagging peers - fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize { - let lucky_peers: Vec<_> = if sealed.is_empty() { - self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect() - } else { - self.peers.keys().cloned().collect() - }; - trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers); + fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[(PeerId, BlockNumber)]) -> usize { + trace!(target: "sync", "Sending NewBlocks to {:?}", peers); let mut sent = 0; - for peer_id in lucky_peers { + for &(peer_id, _) in peers { if sealed.is_empty() { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); @@ -1414,12 +1410,11 @@ impl ChainSync { } /// propagates new known hashes to all peers - fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { - let lucky_peers = self.select_lagging_peers(chain_info, io); - trace!(target: "sync", "Sending NewHashes to {:?}", lucky_peers); + fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[(PeerId, BlockNumber)]) -> usize { + trace!(target: "sync", "Sending NewHashes to {:?}", peers); let mut sent = 0; let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); - for (peer_id, peer_number) in lucky_peers { + for &(peer_id, peer_number) in peers { let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber { // If we think peer is too far behind just send one latest hash last_parent.clone() @@ -1485,11 +1480,19 @@ impl ChainSync { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let hashes = self.propagate_new_hashes(&chain_info, io); - let blocks = self.propagate_blocks(&chain_info, io, sealed); - if blocks != 0 || hashes != 0 { - trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); - } + let mut peers = self.get_lagging_peers(&chain_info, io); + if sealed.is_empty() { + let hashes = self.propagate_new_hashes(&chain_info, io, &peers); + peers = self.select_random_lagging_peers(&peers); + let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } else { + self.propagate_blocks(&chain_info, io, sealed, &peers); + self.propagate_new_hashes(&chain_info, io, &peers); + trace!(target: "sync", "Sent sealed block to all peers"); + }; } self.propagate_new_transactions(io); self.last_sent_block_number = chain_info.best_block_number; @@ -1757,7 +1760,8 @@ mod tests { let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagate_new_hashes(&chain_info, &mut io); + let peers = sync.get_lagging_peers(&chain_info, &mut io); + let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1775,7 +1779,8 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]); + let peers = sync.get_lagging_peers(&chain_info, &mut io); + let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1794,7 +1799,8 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]); + let peers = sync.get_lagging_peers(&chain_info, &mut io); + let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1900,7 +1906,8 @@ mod tests { let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagate_new_hashes(&chain_info, &mut io); + let peers = sync.get_lagging_peers(&chain_info, &mut io); + sync.propagate_new_hashes(&chain_info, &mut io, &peers); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(data)); @@ -1918,7 +1925,8 @@ mod tests { let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagate_blocks(&chain_info, &mut io, &[]); + let peers = sync.get_lagging_peers(&chain_info, &mut io); + sync.propagate_blocks(&chain_info, &mut io, &[], &peers); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data)); diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 2a84b0f99..94dcc2a9d 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -161,11 +161,11 @@ fn propagate_hashes() { net.trigger_chain_new_blocks(0); //first event just sets the marker net.trigger_chain_new_blocks(0); - // 5 peers to sync - assert_eq!(5, net.peer(0).queue.len()); + // 5 peers with NewHahses, 4 with blocks + assert_eq!(9, net.peer(0).queue.len()); let mut hashes = 0; let mut blocks = 0; - for i in 0..5 { + for i in 0..net.peer(0).queue.len() { if net.peer(0).queue[i].packet_id == 0x1 { hashes += 1; } @@ -173,7 +173,8 @@ fn propagate_hashes() { blocks += 1; } } - assert!(blocks + hashes == 5); + assert_eq!(blocks, 4); + assert_eq!(hashes, 5); } #[test]