From a6204ab7bff67b64ada2e48741022094b6ce31c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 6 Jan 2017 12:16:13 +0100 Subject: [PATCH] Re-broadcast transactions to few random peers on each new block. (#4054) (#4061) * Introduce predictable randomness in tests * Re-broadcasting to few peers Conflicts: sync/src/chain.rs Former-commit-id: ac6a62768f9453af1ea25508b1efb58bf86d1a23 --- sync/src/chain.rs | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f87c3b104..71a097c88 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -101,8 +101,8 @@ use sync_io::SyncIo; use time; use super::SyncConfig; use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction}; +use rand::Rng; use snapshot::{Snapshot, ChunkType}; -use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; known_heap_size!(0, PeerInfo); @@ -326,6 +326,17 @@ impl PeerInfo { } } +#[cfg(not(test))] +mod random { + use rand; + pub fn new() -> rand::ThreadRng { rand::thread_rng() } +} +#[cfg(test)] +mod random { + use rand::{self, SeedableRng}; + pub fn new() -> rand::XorShiftRng { rand::XorShiftRng::from_seed([0, 1, 2, 3]) } +} + /// Blockchain sync handler. /// See module documentation for more details. pub struct ChainSync { @@ -1095,7 +1106,7 @@ impl ChainSync { fn continue_sync(&mut self, io: &mut SyncIo) { let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)| if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect(); - thread_rng().shuffle(&mut peers); //TODO: sort by rating + random::new().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2)); trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len()); @@ -1843,7 +1854,7 @@ impl ChainSync { let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; count = min(count, MAX_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION); - ::rand::thread_rng().shuffle(&mut peers); + random::new().shuffle(&mut peers); peers.truncate(count); peers } @@ -1916,8 +1927,9 @@ impl ChainSync { let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let small = self.peers.len() < MIN_PEERS_PROPAGATION; + let mut random = random::new(); let lucky_peers = self.peers.iter_mut() - .filter(|_| small || ::rand::random::() < fraction) + .filter(|_| small || random.next_u32() < fraction) .take(MAX_PEERS_PROPAGATION) .filter_map(|(peer_id, mut peer_info)| { // Send all transactions @@ -1985,7 +1997,7 @@ impl ChainSync { } /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers - pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) { + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256]) { let queue_info = io.chain().queue_info(); if !self.status().is_syncing(queue_info) || !sealed.is_empty() { trace!(target: "sync", "Propagating blocks, state={:?}", self.state); @@ -1995,6 +2007,21 @@ impl ChainSync { trace!(target: "sync", "Bad blocks in the queue, restarting"); self.restart(io); } + + if !enacted.is_empty() { + // Select random peers to re-broadcast transactions to. + let mut random = random::new(); + let len = self.peers.len(); + let peers = random.gen_range(0, min(len, 3)); + trace!(target: "sync", "Re-broadcasting transactions to {} random peers.", peers); + + for _ in 0..peers { + let peer = random.gen_range(0, len); + self.peers.values_mut().nth(peer).map(|mut peer_info| { + peer_info.last_sent_transactions.clear() + }); + } + } } }