Send new block hashes to all peers (#1875)

This commit is contained in:
Arkadiy Paronyan 2016-08-08 17:18:10 +02:00 committed by Gav Wood
parent 6762447229
commit 2e5a6ea1ff
2 changed files with 39 additions and 30 deletions

View File

@ -1376,27 +1376,23 @@ impl ChainSync {
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> { fn select_random_lagging_peers(&mut self, peers: &[(PeerId, BlockNumber)]) -> Vec<(PeerId, BlockNumber)> {
use rand::Rng; use rand::Rng;
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
// take sqrt(x) peers // take sqrt(x) peers
let mut peers = peers.to_vec();
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; let mut count = (self.peers.len() as f64).powf(0.5).round() as usize;
count = min(count, MAX_PEERS_PROPAGATION); count = min(count, MAX_PEERS_PROPAGATION);
count = max(count, MIN_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION);
::rand::thread_rng().shuffle(&mut lagging_peers); ::rand::thread_rng().shuffle(&mut peers);
lagging_peers.into_iter().take(count).collect::<Vec<_>>() peers.truncate(count);
peers
} }
/// propagates latest block to lagging peers /// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize { fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[(PeerId, BlockNumber)]) -> usize {
let lucky_peers: Vec<_> = if sealed.is_empty() { trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect()
} else {
self.peers.keys().cloned().collect()
};
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
let mut sent = 0; let mut sent = 0;
for peer_id in lucky_peers { for &(peer_id, _) in peers {
if sealed.is_empty() { if sealed.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
@ -1414,12 +1410,11 @@ impl ChainSync {
} }
/// propagates new known hashes to all peers /// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[(PeerId, BlockNumber)]) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io); trace!(target: "sync", "Sending NewHashes to {:?}", peers);
trace!(target: "sync", "Sending NewHashes to {:?}", lucky_peers);
let mut sent = 0; let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
for (peer_id, peer_number) in lucky_peers { for &(peer_id, peer_number) in peers {
let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber { let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
// If we think peer is too far behind just send one latest hash // If we think peer is too far behind just send one latest hash
last_parent.clone() last_parent.clone()
@ -1485,11 +1480,19 @@ impl ChainSync {
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let hashes = self.propagate_new_hashes(&chain_info, io); let mut peers = self.get_lagging_peers(&chain_info, io);
let blocks = self.propagate_blocks(&chain_info, io, sealed); if sealed.is_empty() {
if blocks != 0 || hashes != 0 { let hashes = self.propagate_new_hashes(&chain_info, io, &peers);
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); peers = self.select_random_lagging_peers(&peers);
} let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
} else {
self.propagate_blocks(&chain_info, io, sealed, &peers);
self.propagate_new_hashes(&chain_info, io, &peers);
trace!(target: "sync", "Sent sealed block to all peers");
};
} }
self.propagate_new_transactions(io); self.propagate_new_transactions(io);
self.last_sent_block_number = chain_info.best_block_number; self.last_sent_block_number = chain_info.best_block_number;
@ -1757,7 +1760,8 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io); let peers = sync.get_lagging_peers(&chain_info, &mut io);
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
@ -1775,7 +1779,8 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]); let peers = sync.get_lagging_peers(&chain_info, &mut io);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
@ -1794,7 +1799,8 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]); let peers = sync.get_lagging_peers(&chain_info, &mut io);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
// 1 message should be send // 1 message should be send
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
@ -1900,7 +1906,8 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagate_new_hashes(&chain_info, &mut io); let peers = sync.get_lagging_peers(&chain_info, &mut io);
sync.propagate_new_hashes(&chain_info, &mut io, &peers);
let data = &io.queue[0].data.clone(); let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(data)); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(data));
@ -1918,7 +1925,8 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagate_blocks(&chain_info, &mut io, &[]); let peers = sync.get_lagging_peers(&chain_info, &mut io);
sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
let data = &io.queue[0].data.clone(); let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data)); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data));

View File

@ -161,11 +161,11 @@ fn propagate_hashes() {
net.trigger_chain_new_blocks(0); //first event just sets the marker net.trigger_chain_new_blocks(0); //first event just sets the marker
net.trigger_chain_new_blocks(0); net.trigger_chain_new_blocks(0);
// 5 peers to sync // 5 peers with NewHahses, 4 with blocks
assert_eq!(5, net.peer(0).queue.len()); assert_eq!(9, net.peer(0).queue.len());
let mut hashes = 0; let mut hashes = 0;
let mut blocks = 0; let mut blocks = 0;
for i in 0..5 { for i in 0..net.peer(0).queue.len() {
if net.peer(0).queue[i].packet_id == 0x1 { if net.peer(0).queue[i].packet_id == 0x1 {
hashes += 1; hashes += 1;
} }
@ -173,7 +173,8 @@ fn propagate_hashes() {
blocks += 1; blocks += 1;
} }
} }
assert!(blocks + hashes == 5); assert_eq!(blocks, 4);
assert_eq!(hashes, 5);
} }
#[test] #[test]