diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e853cf4e3..b9d0ccddf 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -64,6 +64,7 @@ const MAX_HEADERS_TO_REQUEST: usize = 512; const MAX_BODIES_TO_REQUEST: usize = 256; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; +const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -136,7 +137,7 @@ pub struct SyncStatus { pub num_active_peers: usize, } -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] /// Peer data type requested enum PeerAsking { Nothing, @@ -144,6 +145,7 @@ enum PeerAsking { BlockBodies, } +#[derive(Clone)] /// Syncing peer information struct PeerInfo { /// eth protocol version @@ -162,6 +164,8 @@ struct PeerInfo { asking_blocks: Vec, /// Request timestamp ask_time: f64, + /// Latest block number + latest_number: BlockNumber } /// Blockchain sync handler. @@ -267,7 +271,7 @@ impl ChainSync { /// Called by peer to report status fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let peer = PeerInfo { + let mut peer = PeerInfo { protocol_version: try!(r.val_at(0)), network_id: try!(r.val_at(1)), difficulty: try!(r.val_at(2)), @@ -276,8 +280,13 @@ impl ChainSync { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), ask_time: 0f64, + latest_number: 0, }; + if io.chain().block_status(&peer.latest) == BlockStatus::InChain { + peer.latest_number = HeaderView::new(&io.chain().block_header(&peer.latest).unwrap()).number(); + } + trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); let chain_info = io.chain().chain_info(); @@ -441,6 +450,8 @@ impl ChainSync { match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "New block already in chain {:?}", h); + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + peer.latest_number = max(peer.latest_number, header_view.number()); }, Err(ImportError::AlreadyQueued) => { trace!(target: "sync", "New block already queued {:?}", h); @@ -471,6 +482,7 @@ impl ChainSync { { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); peer.latest = header_view.sha3(); + peer.latest_number = header_view.number(); } self.sync_peer(io, peer_id, true); } @@ -638,6 +650,7 @@ impl ChainSync { if start == 0 { self.have_common_block = true; //reached genesis self.last_imported_hash = Some(chain_info.genesis_hash); + self.last_imported_block = Some(0); } } if self.have_common_block { @@ -1032,10 +1045,6 @@ impl ChainSync { }) } - /// Maintain other peers. Send out any new blocks and transactions - pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { - } - pub fn maintain_peers(&self, io: &mut SyncIo) { let tick = time::precise_time_s(); for (peer_id, peer) in &self.peers { @@ -1070,41 +1079,39 @@ impl ChainSync { } } - fn query_peer_latest_blocks(&self) -> Vec<(usize, H256)> { - self.peers.iter().map(|peer| (peer.0.clone(), peer.1.latest.clone())).collect() - } - - fn get_lagging_peers(&self, io: &SyncIo) -> Vec<(usize, H256)> { + fn get_lagging_peers(&self, io: &SyncIo) -> Vec { let chain = io.chain(); let chain_info = chain.chain_info(); let latest_hash = chain_info.best_block_hash; - self.query_peer_latest_blocks().iter().filter(|peer| - match io.chain().block_status(&peer.1) + let latest_number = chain_info.best_block_number; + self.peers.iter().filter(|&(peer_id, peer_info)| + match io.chain().block_status(&peer_info.latest) { - BlockStatus::InChain => peer.1 != latest_hash, + BlockStatus::InChain => peer_info.latest != latest_hash && latest_number - peer_info.latest_number < MAX_PEER_LAG_PROPAGATION, _ => false - }).cloned().collect::>() + }) + .map(|(peer_id, peer_info)| peer_id) + .cloned().collect::>() } fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize { let updated_peers = { - let lagging_peers = self.get_lagging_peers(io); - let lucky_peers = match lagging_peers.len() { + // sqrt(x)/x scaled to max u32 + let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; + let mut lucky_peers = match lagging_peers.len() { 0 ... MIN_PEERS_PROPAGATION => lagging_peers, - _ => lagging_peers.iter().filter(|_| ::rand::random::() < 64u8).cloned().collect::>() + _ => lagging_peers.iter().filter(|_| ::rand::random::() < fraction).cloned().collect::>() }; - match lucky_peers.len() { - 0 ... MAX_PEERS_PROPAGATION => lucky_peers, - _ => lucky_peers.iter().take(MAX_PEERS_PROPAGATION).cloned().collect::>() - } + // taking at max of MAX_PEERS_PROPAGATION + lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::>() }; let mut sent = 0; - for (peer_id, peer_hash) in updated_peers { - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_hash, &io.chain().chain_info().best_block_hash) { + for peer_id in updated_peers { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers[&peer_id].latest, &io.chain().chain_info().best_block_hash) { Some(rlp) => { self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp); 1 @@ -1124,6 +1131,16 @@ impl ChainSync { trace!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded); } } + + #[cfg(test)] + pub fn get_peer_latet(&self, peer_id: usize) -> H256 { + self.peers[&peer_id].latest.clone() + } + + #[cfg(test)] + pub fn get_peer_latest_number(&self, peer_id: usize) -> BlockNumber { + self.peers[&peer_id].latest_number + } } #[cfg(test)] @@ -1205,16 +1222,17 @@ mod tests { fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { let mut sync = ChainSync::new(); sync.peers.insert(0, - PeerInfo { - protocol_version: 0, - genesis: H256::zero(), - network_id: U256::zero(), - latest: peer_latest_hash, - difficulty: U256::zero(), - asking: PeerAsking::Nothing, - asking_blocks: Vec::::new(), - ask_time: 0f64, - }); + PeerInfo { + protocol_version: 0, + genesis: H256::zero(), + network_id: U256::zero(), + latest: peer_latest_hash, + latest_number: 90, + difficulty: U256::zero(), + asking: PeerAsking::Nothing, + asking_blocks: Vec::::new(), + ask_time: 0f64, + }); sync } @@ -1251,17 +1269,17 @@ mod tests { #[test] fn sends_packet_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(20, false); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); - let block_count = sync.propagade_blocks(&mut io); + let peer_count = sync.propagade_blocks(&mut io); // 1 message should be send assert_eq!(1, io.queue.len()); // 1 peer should be updated - assert_eq!(1, block_count); + assert_eq!(1, peer_count); // NEW_BLOCK_HASHES_PACKET assert_eq!(0x01, io.queue[0].packet_id); } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index a9aeb2e34..22c677aa0 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -126,10 +126,18 @@ fn propagade() { net.peer_mut(1).chain.add_blocks(1000, false); net.peer_mut(2).chain.add_blocks(1000, false); net.sync(); + + let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); net.peer_mut(0).chain.add_blocks(10, false); + assert_eq!(1010, net.peer(0).chain.chain_info().best_block_number); + assert_eq!(1000, net.peer(1).chain.chain_info().best_block_number); + assert_eq!(1000, net.peer(2).chain.chain_info().best_block_number); + + assert_eq!(net.peer(0).sync.get_peer_latest_number(1), 1000); + net.sync_step_peer(0); // 2 peers to sync