propagate proposal to all peers

This commit is contained in:
keorn 2016-12-12 13:00:51 +01:00
parent 3c5d5856d2
commit 8ed89bb74f

View File

@ -249,15 +249,6 @@ enum PeerAsking {
SnapshotData, SnapshotData,
} }
/// Peer type semantic boolean.
#[derive(Clone)]
enum PeerStatus {
/// Have the same latest_hash as we.
Current,
/// Is lagging in blocks.
Lagging
}
#[derive(PartialEq, Eq, Debug, Clone, Copy)] #[derive(PartialEq, Eq, Debug, Clone, Copy)]
/// Block downloader channel. /// Block downloader channel.
enum BlockSet { enum BlockSet {
@ -1830,18 +1821,16 @@ impl ChainSync {
) )
} }
/// returns peer ids that have different blocks than our chain
/// Returns peer ids that either have less blocks than our (Lagging) chain or are Current. fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<PeerId> {
fn get_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo, peer_status: PeerStatus) -> Vec<PeerId> {
let latest_hash = chain_info.best_block_hash; let latest_hash = chain_info.best_block_hash;
self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)|
match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) {
BlockStatus::InChain => { BlockStatus::InChain => {
match (peer_info.latest_hash == latest_hash, peer_status.clone()) { if peer_info.latest_hash != latest_hash {
(false, PeerStatus::Lagging) => Some(id), Some(id)
(true, PeerStatus::Lagging) => None, } else {
(false, PeerStatus::Current) => None, None
(true, PeerStatus::Current) => Some(id),
} }
}, },
_ => None _ => None
@ -1849,11 +1838,10 @@ impl ChainSync {
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
fn select_random_peers(&mut self, peers: &[PeerId]) -> Vec<PeerId> { fn select_random_peers(peers: &[PeerId]) -> Vec<PeerId> {
use rand::Rng;
// take sqrt(x) peers // take sqrt(x) peers
let mut peers = peers.to_vec(); let mut peers = peers.to_vec();
let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; let mut count = (peers.len() as f64).powf(0.5).round() as usize;
count = min(count, MAX_PEERS_PROPAGATION); count = min(count, MAX_PEERS_PROPAGATION);
count = max(count, MIN_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION);
::rand::thread_rng().shuffle(&mut peers); ::rand::thread_rng().shuffle(&mut peers);
@ -1861,6 +1849,10 @@ impl ChainSync {
peers peers
} }
fn get_consensus_peers(&self) -> Vec<PeerId> {
self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
}
/// propagates latest block to a set of peers /// propagates latest block to a set of peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize { fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewBlocks to {:?}", peers); trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
@ -1988,10 +1980,10 @@ impl ChainSync {
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let mut peers = self.get_peers(&chain_info, io, PeerStatus::Lagging); let mut peers = self.get_lagging_peers(&chain_info, io);
if sealed.is_empty() { if sealed.is_empty() {
let hashes = self.propagate_new_hashes(&chain_info, io, &peers); let hashes = self.propagate_new_hashes(&chain_info, io, &peers);
peers = self.select_random_peers(&peers); peers = ChainSync::select_random_peers(&peers);
let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers); let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers);
if blocks != 0 || hashes != 0 { if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
@ -2008,13 +2000,12 @@ impl ChainSync {
/// Distribute valid proposed blocks to subset of current peers. /// Distribute valid proposed blocks to subset of current peers.
fn propagate_proposed_blocks(&mut self, io: &mut SyncIo, proposed: &[Bytes]) { fn propagate_proposed_blocks(&mut self, io: &mut SyncIo, proposed: &[Bytes]) {
let chain_info = io.chain().chain_info(); let peers = self.get_consensus_peers();
let mut peers = self.get_peers(&chain_info, io, PeerStatus::Current); trace!(target: "sync", "Sending proposed blocks to {:?}", peers);
peers = self.select_random_peers(&peers);
for block in proposed { for block in proposed {
let rlp = ChainSync::create_block_rlp( let rlp = ChainSync::create_block_rlp(
block, block,
chain_info.total_difficulty io.chain().chain_info().total_difficulty
); );
for peer_id in &peers { for peer_id in &peers {
self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
@ -2049,7 +2040,7 @@ impl ChainSync {
/// Broadcast consensus message to peers. /// Broadcast consensus message to peers.
pub fn propagate_consensus_packet(&mut self, io: &mut SyncIo, packet: Bytes) { pub fn propagate_consensus_packet(&mut self, io: &mut SyncIo, packet: Bytes) {
let lucky_peers: Vec<_> = self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect(); let lucky_peers = ChainSync::select_random_peers(&self.get_consensus_peers());
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
for peer_id in lucky_peers { for peer_id in lucky_peers {
self.send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); self.send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
@ -2069,7 +2060,7 @@ mod tests {
use rlp::{Rlp, RlpStream, UntrustedRlp, View, Stream}; use rlp::{Rlp, RlpStream, UntrustedRlp, View, Stream};
use super::*; use super::*;
use ::SyncConfig; use ::SyncConfig;
use super::{PeerInfo, PeerAsking, PeerStatus}; use super::{PeerInfo, PeerAsking};
use ethcore::views::BlockView; use ethcore::views::BlockView;
use ethcore::header::*; use ethcore::header::*;
use ethcore::client::*; use ethcore::client::*;
@ -2287,28 +2278,9 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None); let io = TestIo::new(&mut client, &ss, &queue, None);
let lagging_peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let lagging_peers = sync.get_lagging_peers(&chain_info, &io);
let current_peers = sync.get_peers(&chain_info, &io, PeerStatus::Current);
assert_eq!(1, lagging_peers.len()); assert_eq!(1, lagging_peers.len());
assert!(current_peers.is_empty());
}
#[test]
fn finds_current_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new());
let mut sync = dummy_sync_with_peer(client.block_hash(BlockId::Latest).unwrap(), &client);
let chain_info = client.chain_info();
let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None);
let current_peers = sync.get_peers(&chain_info, &io, PeerStatus::Current);
let lagging_peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
assert_eq!(1, current_peers.len());
assert!(lagging_peers.is_empty());
} }
#[test] #[test]
@ -2338,7 +2310,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let peers = sync.get_lagging_peers(&chain_info, &io);
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers); let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
// 1 message should be send // 1 message should be send
@ -2358,7 +2330,7 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let peers = sync.get_lagging_peers(&chain_info, &io);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
// 1 message should be send // 1 message should be send
@ -2379,7 +2351,7 @@ mod tests {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let peers = sync.get_lagging_peers(&chain_info, &io);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
// 1 message should be send // 1 message should be send
@ -2610,7 +2582,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let peers = sync.get_lagging_peers(&chain_info, &io);
sync.propagate_new_hashes(&chain_info, &mut io, &peers); sync.propagate_new_hashes(&chain_info, &mut io, &peers);
let data = &io.packets[0].data.clone(); let data = &io.packets[0].data.clone();
@ -2630,7 +2602,7 @@ mod tests {
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); let peers = sync.get_lagging_peers(&chain_info, &io);
sync.propagate_blocks(&chain_info, &mut io, &[], &peers); sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
let data = &io.packets[0].data.clone(); let data = &io.packets[0].data.clone();