Sync bandwidth optimization

This commit is contained in:
arkpar 2016-11-16 19:34:12 +01:00
parent df1fbf50d8
commit 5bddcd8003
3 changed files with 64 additions and 27 deletions

View File

@ -33,6 +33,7 @@ const MAX_BODIES_TO_REQUEST: usize = 64;
const MAX_RECEPITS_TO_REQUEST: usize = 128;
const SUBCHAIN_SIZE: u64 = 256;
const MAX_ROUND_PARENTS: usize = 32;
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Downloader state
@ -62,6 +63,14 @@ pub enum BlockRequest {
},
}
/// Indicates sync action
pub enum DownloadAction {
/// Do nothing
None,
/// Reset downloads for all peers
Reset
}
#[derive(Eq, PartialEq, Debug)]
pub enum BlockDownloaderImportError {
/// Imported data is rejected as invalid.
@ -175,11 +184,11 @@ impl BlockDownloader {
}
/// Add new block headers.
pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<(), BlockDownloaderImportError> {
pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<DownloadAction, BlockDownloaderImportError> {
let item_count = r.item_count();
if self.state == State::Idle {
trace!(target: "sync", "Ignored unexpected block headers");
return Ok(())
return Ok(DownloadAction::None)
}
if item_count == 0 && (self.state == State::Blocks) {
return Err(BlockDownloaderImportError::Invalid);
@ -188,6 +197,7 @@ impl BlockDownloader {
let mut headers = Vec::new();
let mut hashes = Vec::new();
let mut valid_response = item_count == 0; //empty response is valid
let mut any_known = false;
for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i).map_err(|e| {
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
@ -200,6 +210,7 @@ impl BlockDownloader {
valid_response = expected == info.hash()
}
}
any_known = any_known || self.blocks.contains_head(&info.hash());
if self.blocks.contains(&info.hash()) {
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash());
continue;
@ -245,17 +256,22 @@ impl BlockDownloader {
trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len());
self.blocks.reset_to(hashes);
self.state = State::Blocks;
return Ok(DownloadAction::Reset);
}
},
State::Blocks => {
let count = headers.len();
// At least one of the heades must advance the subchain. Otherwise they are all useless.
if !any_known {
return Err(BlockDownloaderImportError::Useless);
}
self.blocks.insert_headers(headers);
trace!(target: "sync", "Inserted {} headers", count);
},
_ => trace!(target: "sync", "Unexpected headers({})", headers.len()),
}
Ok(())
Ok(DownloadAction::None)
}
/// Called by peer once it has new block bodies
@ -342,13 +358,14 @@ impl BlockDownloader {
}
/// Find some headers or blocks to download for a peer.
pub fn request_blocks(&mut self, io: &mut SyncIo) -> Option<BlockRequest> {
pub fn request_blocks(&mut self, io: &mut SyncIo, num_active_peers: usize) -> Option<BlockRequest> {
match self.state {
State::Idle => {
self.start_sync_round(io);
return self.request_blocks(io);
return self.request_blocks(io, num_active_peers);
},
State::ChainHead => {
if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD {
// Request subchain headers
trace!(target: "sync", "Starting sync with better chain");
// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
@ -358,6 +375,7 @@ impl BlockDownloader {
count: SUBCHAIN_SIZE,
skip: (MAX_HEADERS_TO_REQUEST - 2) as u64,
});
}
},
State::Blocks => {
// check to see if we need to download any block bodies first

View File

@ -301,11 +301,16 @@ impl BlockCollection {
self.heads.len() == 0 || (self.heads.len() == 1 && self.head.map_or(false, |h| h == self.heads[0]))
}
/// Chech is collection contains a block header.
/// Check if collection contains a block header.
pub fn contains(&self, hash: &H256) -> bool {
self.blocks.contains_key(hash)
}
/// Check if collection contains a block header.
pub fn contains_head(&self, hash: &H256) -> bool {
self.heads.contains(hash)
}
/// Return used heap size.
pub fn heap_size(&self) -> usize {
self.heads.heap_size_of_children()

View File

@ -98,7 +98,7 @@ use ethcore::snapshot::{ManifestData, RestorationStatus};
use sync_io::SyncIo;
use time;
use super::SyncConfig;
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError};
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use snapshot::{Snapshot, ChunkType};
use rand::{thread_rng, Rng};
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
@ -306,6 +306,15 @@ impl PeerInfo {
fn is_allowed(&self) -> bool {
self.confirmation != ForkConfirmation::Unconfirmed && !self.expired
}
fn reset_asking(&mut self) {
self.asking_blocks.clear();
self.asking_hash = None;
// mark any pending requests as expired
if self.asking != PeerAsking::Nothing && self.is_allowed() {
self.expired = true;
}
}
}
/// Blockchain sync handler.
@ -425,12 +434,7 @@ impl ChainSync {
}
for (_, ref mut p) in &mut self.peers {
if p.block_set != Some(BlockSet::OldBlocks) {
p.asking_blocks.clear();
p.asking_hash = None;
// mark any pending requests as expired
if p.asking != PeerAsking::Nothing && p.is_allowed() {
p.expired = true;
}
p.reset_asking();
}
}
self.state = SyncState::Idle;
@ -641,8 +645,9 @@ impl ChainSync {
self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let allowed = self.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
let block_set = self.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
if !self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() {
if !self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed {
trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash);
self.continue_sync(io);
return Ok(());
@ -687,7 +692,15 @@ impl ChainSync {
self.continue_sync(io);
return Ok(());
},
Ok(()) => (),
Ok(DownloadAction::Reset) => {
// mark all outstanding requests as expired
trace!("Resetting downloads for {:?}", block_set);
for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
p.reset_asking();
}
}
Ok(DownloadAction::None) => {},
}
self.collect_blocks(io, block_set);
@ -979,7 +992,7 @@ impl ChainSync {
return Ok(());
}
self.clear_peer_download(peer_id);
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || self.state != SyncState::SnapshotData {
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (self.state != SyncState::SnapshotData && self.state != SyncState::SnapshotWaiting) {
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
self.continue_sync(io);
return Ok(());
@ -1111,6 +1124,7 @@ impl ChainSync {
};
let chain_info = io.chain().chain_info();
let syncing_difficulty = chain_info.pending_total_difficulty;
let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count();
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
@ -1128,7 +1142,7 @@ impl ChainSync {
let have_latest = io.chain().block_status(BlockID::Hash(peer_latest)) != BlockStatus::Unknown;
if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) {
// check if got new blocks to download
if let Some(request) = self.new_blocks.request_blocks(io) {
if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) {
self.request_blocks(io, peer_id, request, BlockSet::NewBlocks);
if self.state == SyncState::Idle {
self.state = SyncState::Blocks;
@ -1137,7 +1151,7 @@ impl ChainSync {
}
}
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io)) {
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
self.request_blocks(io, peer_id, request, BlockSet::OldBlocks);
return;
}