Re-broadcast transactions to few random peers on each new block. (#4054)

* Introduce predictable randomness in tests

* Re-broadcasting to few peers
This commit is contained in:
Tomasz Drwięga 2017-01-06 10:38:49 +01:00 committed by Gav Wood
parent 20aaf17389
commit 1b93d79a90

View File

@ -100,8 +100,8 @@ use sync_io::SyncIo;
use time;
use super::SyncConfig;
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot, ChunkType};
use rand::{thread_rng, Rng};
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
@ -329,6 +329,17 @@ impl PeerInfo {
}
}
#[cfg(not(test))]
mod random {
use rand;
pub fn new() -> rand::ThreadRng { rand::thread_rng() }
}
#[cfg(test)]
mod random {
use rand::{self, SeedableRng};
pub fn new() -> rand::XorShiftRng { rand::XorShiftRng::from_seed([0, 1, 2, 3]) }
}
/// Blockchain sync handler.
/// See module documentation for more details.
pub struct ChainSync {
@ -1120,7 +1131,7 @@ impl ChainSync {
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)|
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect();
thread_rng().shuffle(&mut peers); //TODO: sort by rating
random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2));
trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len());
@ -1881,7 +1892,7 @@ impl ChainSync {
let mut count = (peers.len() as f64).powf(0.5).round() as usize;
count = min(count, MAX_PEERS_PROPAGATION);
count = max(count, MIN_PEERS_PROPAGATION);
::rand::thread_rng().shuffle(&mut peers);
random::new().shuffle(&mut peers);
peers.truncate(count);
peers
}
@ -1961,10 +1972,11 @@ impl ChainSync {
let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let block_number = io.chain().chain_info().best_block_number;
let mut random = random::new();
let lucky_peers = {
let stats = &mut self.transactions_stats;
self.peers.iter_mut()
.filter(|_| small || ::rand::random::<u32>() < fraction)
.filter(|_| small || random.next_u32() < fraction)
.take(MAX_PEERS_PROPAGATION)
.filter_map(|(peer_id, mut peer_info)| {
// Send all transactions
@ -2056,7 +2068,7 @@ impl ChainSync {
}
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
let queue_info = io.chain().queue_info();
if !self.status().is_syncing(queue_info) || !sealed.is_empty() {
trace!(target: "sync", "Propagating blocks, state={:?}", self.state);
@ -2067,6 +2079,21 @@ impl ChainSync {
trace!(target: "sync", "Bad blocks in the queue, restarting");
self.restart(io);
}
if !enacted.is_empty() {
// Select random peers to re-broadcast transactions to.
let mut random = random::new();
let len = self.peers.len();
let peers = random.gen_range(0, min(len, 3));
trace!(target: "sync", "Re-broadcasting transactions to {} random peers.", peers);
for _ in 0..peers {
let peer = random.gen_range(0, len);
self.peers.values_mut().nth(peer).map(|mut peer_info| {
peer_info.last_sent_transactions.clear()
});
}
}
}
/// Called when peer sends us new consensus packet