From 1b93d79a90dd9f13499dccbeeab6e98604f99159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 6 Jan 2017 10:38:49 +0100 Subject: [PATCH] Re-broadcast transactions to few random peers on each new block. (#4054) * Introduce predictable randomness in tests * Re-broadcasting to few peers --- sync/src/chain.rs | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 1a4dc4197..df69dbb08 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -100,8 +100,8 @@ use sync_io::SyncIo; use time; use super::SyncConfig; use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction}; +use rand::Rng; use snapshot::{Snapshot, ChunkType}; -use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; @@ -329,6 +329,17 @@ impl PeerInfo { } } +#[cfg(not(test))] +mod random { + use rand; + pub fn new() -> rand::ThreadRng { rand::thread_rng() } +} +#[cfg(test)] +mod random { + use rand::{self, SeedableRng}; + pub fn new() -> rand::XorShiftRng { rand::XorShiftRng::from_seed([0, 1, 2, 3]) } +} + /// Blockchain sync handler. /// See module documentation for more details. pub struct ChainSync { @@ -1120,7 +1131,7 @@ impl ChainSync { fn continue_sync(&mut self, io: &mut SyncIo) { let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)| if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect(); - thread_rng().shuffle(&mut peers); //TODO: sort by rating + random::new().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2)); trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len()); @@ -1881,7 +1892,7 @@ impl ChainSync { let mut count = (peers.len() as f64).powf(0.5).round() as usize; count = min(count, MAX_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION); - ::rand::thread_rng().shuffle(&mut peers); + random::new().shuffle(&mut peers); peers.truncate(count); peers } @@ -1961,10 +1972,11 @@ impl ChainSync { let small = self.peers.len() < MIN_PEERS_PROPAGATION; let block_number = io.chain().chain_info().best_block_number; + let mut random = random::new(); let lucky_peers = { let stats = &mut self.transactions_stats; self.peers.iter_mut() - .filter(|_| small || ::rand::random::() < fraction) + .filter(|_| small || random.next_u32() < fraction) .take(MAX_PEERS_PROPAGATION) .filter_map(|(peer_id, mut peer_info)| { // Send all transactions @@ -2056,7 +2068,7 @@ impl ChainSync { } /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers - pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) { + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) { let queue_info = io.chain().queue_info(); if !self.status().is_syncing(queue_info) || !sealed.is_empty() { trace!(target: "sync", "Propagating blocks, state={:?}", self.state); @@ -2067,6 +2079,21 @@ impl ChainSync { trace!(target: "sync", "Bad blocks in the queue, restarting"); self.restart(io); } + + if !enacted.is_empty() { + // Select random peers to re-broadcast transactions to. + let mut random = random::new(); + let len = self.peers.len(); + let peers = random.gen_range(0, min(len, 3)); + trace!(target: "sync", "Re-broadcasting transactions to {} random peers.", peers); + + for _ in 0..peers { + let peer = random.gen_range(0, len); + self.peers.values_mut().nth(peer).map(|mut peer_info| { + peer_info.last_sent_transactions.clear() + }); + } + } } /// Called when peer sends us new consensus packet