From 5bddcd8003fde9b7829653b7c6ac5d969f868b4b Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 16 Nov 2016 19:34:12 +0100 Subject: [PATCH] Sync bandwidth optimization --- sync/src/block_sync.rs | 46 +++++++++++++++++++++++++++++------------- sync/src/blocks.rs | 7 ++++++- sync/src/chain.rs | 38 +++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/sync/src/block_sync.rs b/sync/src/block_sync.rs index 277067537..7c3cbf2d7 100644 --- a/sync/src/block_sync.rs +++ b/sync/src/block_sync.rs @@ -33,6 +33,7 @@ const MAX_BODIES_TO_REQUEST: usize = 64; const MAX_RECEPITS_TO_REQUEST: usize = 128; const SUBCHAIN_SIZE: u64 = 256; const MAX_ROUND_PARENTS: usize = 32; +const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state @@ -62,6 +63,14 @@ pub enum BlockRequest { }, } +/// Indicates sync action +pub enum DownloadAction { + /// Do nothing + None, + /// Reset downloads for all peers + Reset +} + #[derive(Eq, PartialEq, Debug)] pub enum BlockDownloaderImportError { /// Imported data is rejected as invalid. @@ -175,11 +184,11 @@ impl BlockDownloader { } /// Add new block headers. 
- pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<(), BlockDownloaderImportError> { + pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<DownloadAction, BlockDownloaderImportError> { let item_count = r.item_count(); if self.state == State::Idle { trace!(target: "sync", "Ignored unexpected block headers"); - return Ok(()) + return Ok(DownloadAction::None) } if item_count == 0 && (self.state == State::Blocks) { return Err(BlockDownloaderImportError::Invalid); @@ -188,6 +197,7 @@ impl BlockDownloader { let mut headers = Vec::new(); let mut hashes = Vec::new(); let mut valid_response = item_count == 0; //empty response is valid + let mut any_known = false; for i in 0..item_count { let info: BlockHeader = try!(r.val_at(i).map_err(|e| { trace!(target: "sync", "Error decoding block header RLP: {:?}", e); @@ -200,6 +210,7 @@ impl BlockDownloader { valid_response = expected == info.hash() } } + any_known = any_known || self.blocks.contains_head(&info.hash()); if self.blocks.contains(&info.hash()) { trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); continue; @@ -245,17 +256,22 @@ impl BlockDownloader { trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); self.blocks.reset_to(hashes); self.state = State::Blocks; + return Ok(DownloadAction::Reset); } }, State::Blocks => { let count = headers.len(); + // At least one of the headers must advance the subchain. Otherwise they are all useless. + if !any_known { + return Err(BlockDownloaderImportError::Useless); + } self.blocks.insert_headers(headers); trace!(target: "sync", "Inserted {} headers", count); }, _ => trace!(target: "sync", "Unexpected headers({})", headers.len()), } - Ok(()) + Ok(DownloadAction::None) } /// Called by peer once it has new block bodies 
- pub fn request_blocks(&mut self, io: &mut SyncIo) -> Option<BlockRequest> { + pub fn request_blocks(&mut self, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> { match self.state { State::Idle => { self.start_sync_round(io); - return self.request_blocks(io); + return self.request_blocks(io, num_active_peers); }, State::ChainHead => { - // Request subchain headers - trace!(target: "sync", "Starting sync with better chain"); - // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that - // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains - return Some(BlockRequest::Headers { - start: self.last_imported_hash.clone(), - count: SUBCHAIN_SIZE, - skip: (MAX_HEADERS_TO_REQUEST - 2) as u64, - }); + if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD { + // Request subchain headers + trace!(target: "sync", "Starting sync with better chain"); + // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that + // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains + return Some(BlockRequest::Headers { + start: self.last_imported_hash.clone(), + count: SUBCHAIN_SIZE, + skip: (MAX_HEADERS_TO_REQUEST - 2) as u64, + }); + } }, State::Blocks => { // check to see if we need to download any block bodies first diff --git a/sync/src/blocks.rs b/sync/src/blocks.rs index ed608d9c1..bf0c4b244 100644 --- a/sync/src/blocks.rs +++ b/sync/src/blocks.rs @@ -301,11 +301,16 @@ impl BlockCollection { self.heads.len() == 0 || (self.heads.len() == 1 && self.head.map_or(false, |h| h == self.heads[0])) } - /// Chech is collection contains a block header. + /// Check if collection contains a block header. pub fn contains(&self, hash: &H256) -> bool { self.blocks.contains_key(hash) } + /// Check if collection contains a given hash as a subchain head. + pub fn contains_head(&self, hash: &H256) -> bool { + self.heads.contains(hash) + } + /// Return used heap size. 
pub fn heap_size(&self) -> usize { self.heads.heap_size_of_children() diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 2f810e754..d780eb446 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -98,7 +98,7 @@ use ethcore::snapshot::{ManifestData, RestorationStatus}; use sync_io::SyncIo; use time; use super::SyncConfig; -use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError}; +use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction}; use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; @@ -306,6 +306,15 @@ impl PeerInfo { fn is_allowed(&self) -> bool { self.confirmation != ForkConfirmation::Unconfirmed && !self.expired } + + fn reset_asking(&mut self) { + self.asking_blocks.clear(); + self.asking_hash = None; + // mark any pending requests as expired + if self.asking != PeerAsking::Nothing && self.is_allowed() { + self.expired = true; + } + } } /// Blockchain sync handler. 
@@ -425,12 +434,7 @@ impl ChainSync { } for (_, ref mut p) in &mut self.peers { if p.block_set != Some(BlockSet::OldBlocks) { - p.asking_blocks.clear(); - p.asking_hash = None; - // mark any pending requests as expired - if p.asking != PeerAsking::Nothing && p.is_allowed() { - p.expired = true; - } + p.reset_asking(); } } self.state = SyncState::Idle; @@ -641,8 +645,9 @@ impl ChainSync { self.clear_peer_download(peer_id); let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); + let allowed = self.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false); let block_set = self.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); - if !self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() { + if !self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed { trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash); self.continue_sync(io); return Ok(()); @@ -687,7 +692,15 @@ impl ChainSync { self.continue_sync(io); return Ok(()); }, - Ok(()) => (), + Ok(DownloadAction::Reset) => { + // mark all outstanding requests as expired + trace!(target: "sync", "Resetting downloads for {:?}", block_set); + for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { + p.reset_asking(); + } + + } + Ok(DownloadAction::None) => {}, } self.collect_blocks(io, block_set); @@ -979,7 +992,7 @@ impl ChainSync { return Ok(()); } self.clear_peer_download(peer_id); - if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || self.state != SyncState::SnapshotData { + if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (self.state != SyncState::SnapshotData && self.state != SyncState::SnapshotWaiting) { trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id); self.continue_sync(io); return Ok(()); @@ -1111,6 +1124,7 @@ impl ChainSync { }; let chain_info = io.chain().chain_info(); 
let syncing_difficulty = chain_info.pending_total_difficulty; + let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(); let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty); if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() { @@ -1128,7 +1142,7 @@ impl ChainSync { let have_latest = io.chain().block_status(BlockID::Hash(peer_latest)) != BlockStatus::Unknown; if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) { // check if got new blocks to download - if let Some(request) = self.new_blocks.request_blocks(io) { + if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) { self.request_blocks(io, peer_id, request, BlockSet::NewBlocks); if self.state == SyncState::Idle { self.state = SyncState::Blocks; @@ -1137,7 +1151,7 @@ impl ChainSync { } } - if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io)) { + if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) { self.request_blocks(io, peer_id, request, BlockSet::OldBlocks); return; }