diff --git a/sync/src/chain.rs b/sync/src/chain.rs index b1fa0d2a9..e30a60595 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -249,15 +249,6 @@ enum PeerAsking { SnapshotData, } -/// Peer type semantic boolean. -#[derive(Clone)] -enum PeerStatus { - /// Have the same latest_hash as we. - Current, - /// Is lagging in blocks. - Lagging -} - #[derive(PartialEq, Eq, Debug, Clone, Copy)] /// Block downloader channel. enum BlockSet { @@ -1830,18 +1821,16 @@ impl ChainSync { ) } - - /// Returns peer ids that either have less blocks than our (Lagging) chain or are Current. - fn get_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo, peer_status: PeerStatus) -> Vec { + /// returns peer ids that have different blocks than our chain + fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec { let latest_hash = chain_info.best_block_hash; self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| match io.chain().block_status(BlockId::Hash(peer_info.latest_hash.clone())) { BlockStatus::InChain => { - match (peer_info.latest_hash == latest_hash, peer_status.clone()) { - (false, PeerStatus::Lagging) => Some(id), - (true, PeerStatus::Lagging) => None, - (false, PeerStatus::Current) => None, - (true, PeerStatus::Current) => Some(id), + if peer_info.latest_hash != latest_hash { + Some(id) + } else { + None } }, _ => None @@ -1849,11 +1838,10 @@ impl ChainSync { .collect::>() } - fn select_random_peers(&mut self, peers: &[PeerId]) -> Vec { - use rand::Rng; + fn select_random_peers(peers: &[PeerId]) -> Vec { // take sqrt(x) peers let mut peers = peers.to_vec(); - let mut count = (self.peers.len() as f64).powf(0.5).round() as usize; + let mut count = (peers.len() as f64).powf(0.5).round() as usize; count = min(count, MAX_PEERS_PROPAGATION); count = max(count, MIN_PEERS_PROPAGATION); ::rand::thread_rng().shuffle(&mut peers); @@ -1861,6 +1849,10 @@ impl ChainSync { peers } + fn get_consensus_peers(&self) -> Vec { + self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect() + } + /// propagates latest block to a set of peers fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewBlocks to {:?}", peers); @@ -1988,10 +1980,10 @@ impl ChainSync { fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let mut peers = self.get_peers(&chain_info, io, PeerStatus::Lagging); + let mut peers = self.get_lagging_peers(&chain_info, io); if sealed.is_empty() { let hashes = self.propagate_new_hashes(&chain_info, io, &peers); - peers = self.select_random_peers(&peers); + peers = ChainSync::select_random_peers(&peers); let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); @@ -2008,13 +2000,12 @@ impl ChainSync { /// Distribute valid proposed blocks to subset of current peers. fn propagate_proposed_blocks(&mut self, io: &mut SyncIo, proposed: &[Bytes]) { - let chain_info = io.chain().chain_info(); - let mut peers = self.get_peers(&chain_info, io, PeerStatus::Current); - peers = self.select_random_peers(&peers); + let peers = self.get_consensus_peers(); + trace!(target: "sync", "Sending proposed blocks to {:?}", peers); for block in proposed { let rlp = ChainSync::create_block_rlp( block, - chain_info.total_difficulty + io.chain().chain_info().total_difficulty ); for peer_id in &peers { self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); @@ -2049,7 +2040,7 @@ impl ChainSync { /// Broadcast consensus message to peers. pub fn propagate_consensus_packet(&mut self, io: &mut SyncIo, packet: Bytes) { - let lucky_peers: Vec<_> = self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect(); + let lucky_peers = ChainSync::select_random_peers(&self.get_consensus_peers()); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); for peer_id in lucky_peers { self.send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); @@ -2069,7 +2060,7 @@ mod tests { use rlp::{Rlp, RlpStream, UntrustedRlp, View, Stream}; use super::*; use ::SyncConfig; - use super::{PeerInfo, PeerAsking, PeerStatus}; + use super::{PeerInfo, PeerAsking}; use ethcore::views::BlockView; use ethcore::header::*; use ethcore::client::*; @@ -2287,28 +2278,9 @@ mod tests { let ss = TestSnapshotService::new(); let io = TestIo::new(&mut client, &ss, &queue, None); - let lagging_peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); - let current_peers = sync.get_peers(&chain_info, &io, PeerStatus::Current); + let lagging_peers = sync.get_lagging_peers(&chain_info, &io); assert_eq!(1, lagging_peers.len()); - assert!(current_peers.is_empty()); - } - - #[test] - fn finds_current_peers() { - let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); - let queue = RwLock::new(VecDeque::new()); - let mut sync = dummy_sync_with_peer(client.block_hash(BlockId::Latest).unwrap(), &client); - let chain_info = client.chain_info(); - let ss = TestSnapshotService::new(); - let io = TestIo::new(&mut client, &ss, &queue, None); - - let current_peers = sync.get_peers(&chain_info, &io, PeerStatus::Current); - let lagging_peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); - - assert_eq!(1, current_peers.len()); - assert!(lagging_peers.is_empty()); } #[test] @@ -2338,7 +2310,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); + let peers = sync.get_lagging_peers(&chain_info, &io); let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers); // 1 message should be send @@ -2358,7 +2330,7 @@ mod tests { let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); + let peers = sync.get_lagging_peers(&chain_info, &io); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers); // 1 message should be send @@ -2379,7 +2351,7 @@ mod tests { let chain_info = client.chain_info(); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); + let peers = sync.get_lagging_peers(&chain_info, &io); let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers); // 1 message should be send @@ -2610,7 +2582,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); + let peers = sync.get_lagging_peers(&chain_info, &io); sync.propagate_new_hashes(&chain_info, &mut io, &peers); let data = &io.packets[0].data.clone(); @@ -2630,7 +2602,7 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging); + let peers = sync.get_lagging_peers(&chain_info, &io); sync.propagate_blocks(&chain_info, &mut io, &[], &peers); let data = &io.packets[0].data.clone();