Fix ancient blocks sync (#9531)
* Log block set in block_sync for easier debugging * logging macros * Match no args in sync logging macros * Add QueueFull error * Only allow importing headers if the first matches requested * WIP * Test for chain head gaps and log * Calc distance even with 2 heads * Revert previous commits, preparing simple fix This reverts commit 5f38aa885b22ebb0e3a1d60120cea69f9f322628. * Reject headers with no gaps when ChainHead * Reset block sync download when queue full * Simplify check for subchain heads * Add comment to explain subchain heads filter * Fix is_subchain_heads check and comment * Prevent premature round completion after restart This is a problem on mainnet where multiple stale peer requests will force many rounds to complete quickly, forcing the retraction. * Reset stale old blocks request after queue full * Revert "Reject headers with no gaps when ChainHead" This reverts commit 0eb865539e5dee37ab34f168f5fb643300de5ace. * Add BlockSet to BlockDownloader logging Currently it is difficult to debug this because there are two instances, one for OldBlocks and one for NewBlocks. This adds the BlockSet to all log messages for easy log filtering. * Reset OldBlocks download from last enqueued Previously when the ancient block queue was full it would restart the download from the last imported block, so the ones still in the queue would be redownloaded. Keeping the existing downloader instance and just resetting it will start again from the last enqueued block.:wq * Ignore expired Body and Receipt requests * Log when ancient block download being restarted * Only request old blocks from peers with >= difficulty https://github.com/paritytech/parity-ethereum/pull/9226 might be too permissive and causing the behaviour of the retraction soon after the fork block. With this change the peer difficulty has to be greater than or euqal to our syncing difficulty, so should still fix https://github.com/paritytech/parity-ethereum/issues/9225 * Some logging and clear stalled blocks head * Revert "Some logging and clear stalled blocks head" This reverts commit 757641d9b817ae8b63fec684759b0815af9c4d0e. * Reset stalled header if useless more than once * Store useless headers in HashSet * Add sync target to logging macro * Don't disable useless peer and fix log macro * Clear useless headers on reset and comments * Use custom error for collecting blocks Previously we resued BlockImportError, however only the Invalid case and this made little sense with the QueueFull error. * Remove blank line * Test for reset sync after consecutive useless headers * Don't reset after consecutive headers when chain head * Delete commented out imports * Return DownloadAction from collect_blocks instead of error * Don't reset after round complete, was causing test hangs * Add comment explaining reset after useless * Replace HashSet with counter for useless headers * Refactor sync reset on bad block/queue full * Add missing target for log message * Fix compiler errors and test after merge * ethcore: revert ethereum tests submodule update
This commit is contained in:
parent
bc056c41bc
commit
4b6ebcbb61
@ -28,6 +28,7 @@ use ethcore::client::{BlockStatus, BlockId};
|
|||||||
use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError, Error as EthcoreError, ErrorKind as EthcoreErrorKind};
|
use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError, Error as EthcoreError, ErrorKind as EthcoreErrorKind};
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
use blocks::{BlockCollection, SyncBody, SyncHeader};
|
use blocks::{BlockCollection, SyncBody, SyncHeader};
|
||||||
|
use chain::BlockSet;
|
||||||
|
|
||||||
const MAX_HEADERS_TO_REQUEST: usize = 128;
|
const MAX_HEADERS_TO_REQUEST: usize = 128;
|
||||||
const MAX_BODIES_TO_REQUEST: usize = 32;
|
const MAX_BODIES_TO_REQUEST: usize = 32;
|
||||||
@ -35,6 +36,26 @@ const MAX_RECEPITS_TO_REQUEST: usize = 128;
|
|||||||
const SUBCHAIN_SIZE: u64 = 256;
|
const SUBCHAIN_SIZE: u64 = 256;
|
||||||
const MAX_ROUND_PARENTS: usize = 16;
|
const MAX_ROUND_PARENTS: usize = 16;
|
||||||
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
|
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
|
||||||
|
const MAX_USELESS_HEADERS_PER_ROUND: usize = 3;
|
||||||
|
|
||||||
|
// logging macros prepend BlockSet context for log filtering
|
||||||
|
macro_rules! trace_sync {
|
||||||
|
($self:ident, $fmt:expr, $($arg:tt)+) => {
|
||||||
|
trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+);
|
||||||
|
};
|
||||||
|
($self:ident, $fmt:expr) => {
|
||||||
|
trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! debug_sync {
|
||||||
|
($self:ident, $fmt:expr, $($arg:tt)+) => {
|
||||||
|
debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+);
|
||||||
|
};
|
||||||
|
($self:ident, $fmt:expr) => {
|
||||||
|
debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||||
/// Downloader state
|
/// Downloader state
|
||||||
@ -65,6 +86,7 @@ pub enum BlockRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Indicates sync action
|
/// Indicates sync action
|
||||||
|
#[derive(Eq, PartialEq, Debug)]
|
||||||
pub enum DownloadAction {
|
pub enum DownloadAction {
|
||||||
/// Do nothing
|
/// Do nothing
|
||||||
None,
|
None,
|
||||||
@ -89,15 +111,17 @@ impl From<rlp::DecoderError> for BlockDownloaderImportError {
|
|||||||
/// Block downloader strategy.
|
/// Block downloader strategy.
|
||||||
/// Manages state and block data for a block download process.
|
/// Manages state and block data for a block download process.
|
||||||
pub struct BlockDownloader {
|
pub struct BlockDownloader {
|
||||||
|
/// Which set of blocks to download
|
||||||
|
block_set: BlockSet,
|
||||||
/// Downloader state
|
/// Downloader state
|
||||||
state: State,
|
state: State,
|
||||||
/// Highest block number seen
|
/// Highest block number seen
|
||||||
highest_block: Option<BlockNumber>,
|
highest_block: Option<BlockNumber>,
|
||||||
/// Downloaded blocks, holds `H`, `B` and `S`
|
/// Downloaded blocks, holds `H`, `B` and `S`
|
||||||
blocks: BlockCollection,
|
blocks: BlockCollection,
|
||||||
/// Last impoted block number
|
/// Last imported block number
|
||||||
last_imported_block: BlockNumber,
|
last_imported_block: BlockNumber,
|
||||||
/// Last impoted block hash
|
/// Last imported block hash
|
||||||
last_imported_hash: H256,
|
last_imported_hash: H256,
|
||||||
/// Number of blocks imported this round
|
/// Number of blocks imported this round
|
||||||
imported_this_round: Option<usize>,
|
imported_this_round: Option<usize>,
|
||||||
@ -114,13 +138,20 @@ pub struct BlockDownloader {
|
|||||||
retract_step: u64,
|
retract_step: u64,
|
||||||
/// Whether reorg should be limited.
|
/// Whether reorg should be limited.
|
||||||
limit_reorg: bool,
|
limit_reorg: bool,
|
||||||
|
/// consecutive useless headers this round
|
||||||
|
useless_headers_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockDownloader {
|
impl BlockDownloader {
|
||||||
/// Create a new instance of syncing strategy. This won't reorganize to before the
|
/// Create a new instance of syncing strategy.
|
||||||
/// last kept state.
|
/// For BlockSet::NewBlocks this won't reorganize to before the last kept state.
|
||||||
pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
|
pub fn new(block_set: BlockSet, start_hash: &H256, start_number: BlockNumber) -> Self {
|
||||||
|
let (limit_reorg, sync_receipts) = match block_set {
|
||||||
|
BlockSet::NewBlocks => (true, false),
|
||||||
|
BlockSet::OldBlocks => (false, true)
|
||||||
|
};
|
||||||
BlockDownloader {
|
BlockDownloader {
|
||||||
|
block_set: block_set,
|
||||||
state: State::Idle,
|
state: State::Idle,
|
||||||
highest_block: None,
|
highest_block: None,
|
||||||
last_imported_block: start_number,
|
last_imported_block: start_number,
|
||||||
@ -133,32 +164,15 @@ impl BlockDownloader {
|
|||||||
download_receipts: sync_receipts,
|
download_receipts: sync_receipts,
|
||||||
target_hash: None,
|
target_hash: None,
|
||||||
retract_step: 1,
|
retract_step: 1,
|
||||||
limit_reorg: true,
|
limit_reorg: limit_reorg,
|
||||||
}
|
useless_headers_count: 0,
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a new instance of sync with unlimited reorg allowed.
|
|
||||||
pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
|
|
||||||
BlockDownloader {
|
|
||||||
state: State::Idle,
|
|
||||||
highest_block: None,
|
|
||||||
last_imported_block: start_number,
|
|
||||||
last_imported_hash: start_hash.clone(),
|
|
||||||
last_round_start: start_number,
|
|
||||||
last_round_start_hash: start_hash.clone(),
|
|
||||||
blocks: BlockCollection::new(sync_receipts),
|
|
||||||
imported_this_round: None,
|
|
||||||
round_parents: VecDeque::new(),
|
|
||||||
download_receipts: sync_receipts,
|
|
||||||
target_hash: None,
|
|
||||||
retract_step: 1,
|
|
||||||
limit_reorg: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reset sync. Clear all local downloaded data.
|
/// Reset sync. Clear all local downloaded data.
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
self.blocks.clear();
|
self.blocks.clear();
|
||||||
|
self.useless_headers_count = 0;
|
||||||
self.state = State::Idle;
|
self.state = State::Idle;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,7 +237,7 @@ impl BlockDownloader {
|
|||||||
pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: H256) -> Result<DownloadAction, BlockDownloaderImportError> {
|
pub fn import_headers(&mut self, io: &mut SyncIo, r: &Rlp, expected_hash: H256) -> Result<DownloadAction, BlockDownloaderImportError> {
|
||||||
let item_count = r.item_count().unwrap_or(0);
|
let item_count = r.item_count().unwrap_or(0);
|
||||||
if self.state == State::Idle {
|
if self.state == State::Idle {
|
||||||
trace!(target: "sync", "Ignored unexpected block headers");
|
trace_sync!(self, "Ignored unexpected block headers");
|
||||||
return Ok(DownloadAction::None)
|
return Ok(DownloadAction::None)
|
||||||
}
|
}
|
||||||
if item_count == 0 && (self.state == State::Blocks) {
|
if item_count == 0 && (self.state == State::Blocks) {
|
||||||
@ -270,15 +284,15 @@ impl BlockDownloader {
|
|||||||
|
|
||||||
last_header = Some((number, hash));
|
last_header = Some((number, hash));
|
||||||
if self.blocks.contains(&hash) {
|
if self.blocks.contains(&hash) {
|
||||||
trace!(target: "sync", "Skipping existing block header {} ({:?})", number, hash);
|
trace_sync!(self, "Skipping existing block header {} ({:?})", number, hash);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match io.chain().block_status(BlockId::Hash(hash.clone())) {
|
match io.chain().block_status(BlockId::Hash(hash.clone())) {
|
||||||
BlockStatus::InChain | BlockStatus::Queued => {
|
BlockStatus::InChain | BlockStatus::Queued => {
|
||||||
match self.state {
|
match self.state {
|
||||||
State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash),
|
State::Blocks => trace_sync!(self, "Header already in chain {} ({})", number, hash),
|
||||||
_ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state),
|
_ => trace_sync!(self, "Header already in chain {} ({}), state = {:?}", number, hash, self.state),
|
||||||
}
|
}
|
||||||
headers.push(info);
|
headers.push(info);
|
||||||
hashes.push(hash);
|
hashes.push(hash);
|
||||||
@ -302,7 +316,7 @@ impl BlockDownloader {
|
|||||||
match self.state {
|
match self.state {
|
||||||
State::ChainHead => {
|
State::ChainHead => {
|
||||||
if !headers.is_empty() {
|
if !headers.is_empty() {
|
||||||
trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len());
|
trace_sync!(self, "Received {} subchain heads, proceeding to download", headers.len());
|
||||||
self.blocks.reset_to(hashes);
|
self.blocks.reset_to(hashes);
|
||||||
self.state = State::Blocks;
|
self.state = State::Blocks;
|
||||||
return Ok(DownloadAction::Reset);
|
return Ok(DownloadAction::Reset);
|
||||||
@ -311,21 +325,29 @@ impl BlockDownloader {
|
|||||||
let oldest_reorg = io.chain().pruning_info().earliest_state;
|
let oldest_reorg = io.chain().pruning_info().earliest_state;
|
||||||
let last = self.last_imported_block;
|
let last = self.last_imported_block;
|
||||||
if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) {
|
if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) {
|
||||||
trace!(target: "sync", "No common block, disabling peer");
|
trace_sync!(self, "No common block, disabling peer");
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
return Err(BlockDownloaderImportError::Invalid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
State::Blocks => {
|
State::Blocks => {
|
||||||
let count = headers.len();
|
let count = headers.len();
|
||||||
|
// At least one of the headers must advance the subchain. Otherwise they are all useless.
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
trace!(target: "sync", "No useful headers");
|
self.useless_headers_count += 1;
|
||||||
|
trace_sync!(self, "No useful headers ({:?} this round), expected hash {:?}", self.useless_headers_count, expected_hash);
|
||||||
|
// only reset download if we have multiple subchain heads, to avoid unnecessary resets
|
||||||
|
// when we are at the head of the chain when we may legitimately receive no useful headers
|
||||||
|
if self.blocks.heads_len() > 1 && self.useless_headers_count >= MAX_USELESS_HEADERS_PER_ROUND {
|
||||||
|
trace_sync!(self, "Received {:?} useless responses this round. Resetting sync", MAX_USELESS_HEADERS_PER_ROUND);
|
||||||
|
self.reset();
|
||||||
|
}
|
||||||
return Err(BlockDownloaderImportError::Useless);
|
return Err(BlockDownloaderImportError::Useless);
|
||||||
}
|
}
|
||||||
self.blocks.insert_headers(headers);
|
self.blocks.insert_headers(headers);
|
||||||
trace!(target: "sync", "Inserted {} headers", count);
|
trace_sync!(self, "Inserted {} headers", count);
|
||||||
},
|
},
|
||||||
_ => trace!(target: "sync", "Unexpected headers({})", headers.len()),
|
_ => trace_sync!(self, "Unexpected headers({})", headers.len()),
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(DownloadAction::None)
|
Ok(DownloadAction::None)
|
||||||
@ -337,7 +359,7 @@ impl BlockDownloader {
|
|||||||
if item_count == 0 {
|
if item_count == 0 {
|
||||||
return Err(BlockDownloaderImportError::Useless);
|
return Err(BlockDownloaderImportError::Useless);
|
||||||
} else if self.state != State::Blocks {
|
} else if self.state != State::Blocks {
|
||||||
trace!(target: "sync", "Ignored unexpected block bodies");
|
trace_sync!(self, "Ignored unexpected block bodies");
|
||||||
} else {
|
} else {
|
||||||
let mut bodies = Vec::with_capacity(item_count);
|
let mut bodies = Vec::with_capacity(item_count);
|
||||||
for i in 0..item_count {
|
for i in 0..item_count {
|
||||||
@ -347,11 +369,11 @@ impl BlockDownloader {
|
|||||||
|
|
||||||
let hashes = self.blocks.insert_bodies(bodies);
|
let hashes = self.blocks.insert_bodies(bodies);
|
||||||
if hashes.len() != item_count {
|
if hashes.len() != item_count {
|
||||||
trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
|
trace_sync!(self, "Deactivating peer for giving invalid block bodies");
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
return Err(BlockDownloaderImportError::Invalid);
|
||||||
}
|
}
|
||||||
if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) {
|
if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) {
|
||||||
trace!(target: "sync", "Deactivating peer for giving unexpected block bodies");
|
trace_sync!(self, "Deactivating peer for giving unexpected block bodies");
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
return Err(BlockDownloaderImportError::Invalid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -365,24 +387,24 @@ impl BlockDownloader {
|
|||||||
return Err(BlockDownloaderImportError::Useless);
|
return Err(BlockDownloaderImportError::Useless);
|
||||||
}
|
}
|
||||||
else if self.state != State::Blocks {
|
else if self.state != State::Blocks {
|
||||||
trace!(target: "sync", "Ignored unexpected block receipts");
|
trace_sync!(self, "Ignored unexpected block receipts");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
let mut receipts = Vec::with_capacity(item_count);
|
let mut receipts = Vec::with_capacity(item_count);
|
||||||
for i in 0..item_count {
|
for i in 0..item_count {
|
||||||
let receipt = r.at(i).map_err(|e| {
|
let receipt = r.at(i).map_err(|e| {
|
||||||
trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e);
|
trace_sync!(self, "Error decoding block receipts RLP: {:?}", e);
|
||||||
BlockDownloaderImportError::Invalid
|
BlockDownloaderImportError::Invalid
|
||||||
})?;
|
})?;
|
||||||
receipts.push(receipt.as_raw().to_vec());
|
receipts.push(receipt.as_raw().to_vec());
|
||||||
}
|
}
|
||||||
let hashes = self.blocks.insert_receipts(receipts);
|
let hashes = self.blocks.insert_receipts(receipts);
|
||||||
if hashes.len() != item_count {
|
if hashes.len() != item_count {
|
||||||
trace!(target: "sync", "Deactivating peer for giving invalid block receipts");
|
trace_sync!(self, "Deactivating peer for giving invalid block receipts");
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
return Err(BlockDownloaderImportError::Invalid);
|
||||||
}
|
}
|
||||||
if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) {
|
if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) {
|
||||||
trace!(target: "sync", "Deactivating peer for giving unexpected block receipts");
|
trace_sync!(self, "Deactivating peer for giving unexpected block receipts");
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
return Err(BlockDownloaderImportError::Invalid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -391,7 +413,7 @@ impl BlockDownloader {
|
|||||||
|
|
||||||
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
||||||
self.state = State::ChainHead;
|
self.state = State::ChainHead;
|
||||||
trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
|
trace_sync!(self, "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
|
||||||
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
|
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
|
||||||
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
|
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
|
||||||
let start = self.last_round_start;
|
let start = self.last_round_start;
|
||||||
@ -403,12 +425,12 @@ impl BlockDownloader {
|
|||||||
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) {
|
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) {
|
||||||
self.last_imported_block = start - 1;
|
self.last_imported_block = start - 1;
|
||||||
self.last_imported_hash = p.clone();
|
self.last_imported_hash = p.clone();
|
||||||
trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
|
trace_sync!(self, "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||||
} else {
|
} else {
|
||||||
let best = io.chain().chain_info().best_block_number;
|
let best = io.chain().chain_info().best_block_number;
|
||||||
let oldest_reorg = io.chain().pruning_info().earliest_state;
|
let oldest_reorg = io.chain().pruning_info().earliest_state;
|
||||||
if self.limit_reorg && best > start && start < oldest_reorg {
|
if self.limit_reorg && best > start && start < oldest_reorg {
|
||||||
debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
|
debug_sync!(self, "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
|
||||||
self.reset();
|
self.reset();
|
||||||
} else {
|
} else {
|
||||||
let n = start - cmp::min(self.retract_step, start);
|
let n = start - cmp::min(self.retract_step, start);
|
||||||
@ -417,10 +439,10 @@ impl BlockDownloader {
|
|||||||
Some(h) => {
|
Some(h) => {
|
||||||
self.last_imported_block = n;
|
self.last_imported_block = n;
|
||||||
self.last_imported_hash = h;
|
self.last_imported_hash = h;
|
||||||
trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
|
trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
|
debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
|
||||||
self.reset();
|
self.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -448,7 +470,7 @@ impl BlockDownloader {
|
|||||||
State::ChainHead => {
|
State::ChainHead => {
|
||||||
if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD {
|
if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD {
|
||||||
// Request subchain headers
|
// Request subchain headers
|
||||||
trace!(target: "sync", "Starting sync with better chain");
|
trace_sync!(self, "Starting sync with better chain");
|
||||||
// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
|
// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
|
||||||
// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
|
// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
|
||||||
return Some(BlockRequest::Headers {
|
return Some(BlockRequest::Headers {
|
||||||
@ -491,8 +513,9 @@ impl BlockDownloader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
|
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
|
||||||
pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> {
|
/// Returns DownloadAction::Reset if it is imported all the the blocks it can and all downloading peers should be reset
|
||||||
let mut bad = false;
|
pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> DownloadAction {
|
||||||
|
let mut download_action = DownloadAction::None;
|
||||||
let mut imported = HashSet::new();
|
let mut imported = HashSet::new();
|
||||||
let blocks = self.blocks.drain();
|
let blocks = self.blocks.drain();
|
||||||
let count = blocks.len();
|
let count = blocks.len();
|
||||||
@ -506,8 +529,8 @@ impl BlockDownloader {
|
|||||||
|
|
||||||
if self.target_hash.as_ref().map_or(false, |t| t == &h) {
|
if self.target_hash.as_ref().map_or(false, |t| t == &h) {
|
||||||
self.state = State::Complete;
|
self.state = State::Complete;
|
||||||
trace!(target: "sync", "Sync target reached");
|
trace_sync!(self, "Sync target reached");
|
||||||
return Ok(());
|
return download_action;
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = if let Some(receipts) = receipts {
|
let result = if let Some(receipts) = receipts {
|
||||||
@ -518,15 +541,15 @@ impl BlockDownloader {
|
|||||||
|
|
||||||
match result {
|
match result {
|
||||||
Err(EthcoreError(EthcoreErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => {
|
Err(EthcoreError(EthcoreErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => {
|
||||||
trace!(target: "sync", "Block already in chain {:?}", h);
|
trace_sync!(self, "Block already in chain {:?}", h);
|
||||||
self.block_imported(&h, number, &parent);
|
self.block_imported(&h, number, &parent);
|
||||||
},
|
},
|
||||||
Err(EthcoreError(EthcoreErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => {
|
Err(EthcoreError(EthcoreErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => {
|
||||||
trace!(target: "sync", "Block already queued {:?}", h);
|
trace_sync!(self, "Block already queued {:?}", h);
|
||||||
self.block_imported(&h, number, &parent);
|
self.block_imported(&h, number, &parent);
|
||||||
},
|
},
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!(target: "sync", "Block queued {:?}", h);
|
trace_sync!(self, "Block queued {:?}", h);
|
||||||
imported.insert(h.clone());
|
imported.insert(h.clone());
|
||||||
self.block_imported(&h, number, &parent);
|
self.block_imported(&h, number, &parent);
|
||||||
},
|
},
|
||||||
@ -534,37 +557,34 @@ impl BlockDownloader {
|
|||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
Err(EthcoreError(EthcoreErrorKind::Block(BlockError::UnknownParent(_)), _)) => {
|
Err(EthcoreError(EthcoreErrorKind::Block(BlockError::UnknownParent(_)), _)) => {
|
||||||
trace!(target: "sync", "Unknown new block parent, restarting sync");
|
trace_sync!(self, "Unknown new block parent, restarting sync");
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
Err(EthcoreError(EthcoreErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => {
|
Err(EthcoreError(EthcoreErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => {
|
||||||
debug!(target: "sync", "Block temporarily invalid, restarting sync");
|
debug_sync!(self, "Block temporarily invalid: {:?}, restarting sync", h);
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
Err(EthcoreError(EthcoreErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
|
Err(EthcoreError(EthcoreErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
|
||||||
debug!(target: "sync", "Block import queue full ({}), restarting sync", limit);
|
debug_sync!(self, "Block import queue full ({}), restarting sync", limit);
|
||||||
|
download_action = DownloadAction::Reset;
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
|
debug_sync!(self, "Bad block {:?} : {:?}", h, e);
|
||||||
bad = true;
|
download_action = DownloadAction::Reset;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace!(target: "sync", "Imported {} of {}", imported.len(), count);
|
trace_sync!(self, "Imported {} of {}", imported.len(), count);
|
||||||
self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len());
|
self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len());
|
||||||
|
|
||||||
if bad {
|
|
||||||
return Err(BlockDownloaderImportError::Invalid);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.blocks.is_empty() {
|
if self.blocks.is_empty() {
|
||||||
// complete sync round
|
// complete sync round
|
||||||
trace!(target: "sync", "Sync round complete");
|
trace_sync!(self, "Sync round complete");
|
||||||
self.reset();
|
download_action = DownloadAction::Reset;
|
||||||
}
|
}
|
||||||
Ok(())
|
download_action
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
|
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
|
||||||
@ -623,6 +643,20 @@ mod tests {
|
|||||||
Transaction::default().sign(keypair.secret(), None)
|
Transaction::default().sign(keypair.secret(), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn import_headers(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) -> Result<DownloadAction, BlockDownloaderImportError> {
|
||||||
|
let mut stream = RlpStream::new();
|
||||||
|
stream.append_list(headers);
|
||||||
|
let bytes = stream.out();
|
||||||
|
let rlp = Rlp::new(&bytes);
|
||||||
|
let expected_hash = headers.first().unwrap().hash();
|
||||||
|
downloader.import_headers(io, &rlp, expected_hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_headers_ok(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) {
|
||||||
|
let res = import_headers(headers, downloader, io);
|
||||||
|
assert!(res.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn import_headers_in_chain_head_state() {
|
fn import_headers_in_chain_head_state() {
|
||||||
::env_logger::try_init().ok();
|
::env_logger::try_init().ok();
|
||||||
@ -630,7 +664,7 @@ mod tests {
|
|||||||
let spec = Spec::new_test();
|
let spec = Spec::new_test();
|
||||||
let genesis_hash = spec.genesis_header().hash();
|
let genesis_hash = spec.genesis_header().hash();
|
||||||
|
|
||||||
let mut downloader = BlockDownloader::new(false, &genesis_hash, 0);
|
let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
|
||||||
downloader.state = State::ChainHead;
|
downloader.state = State::ChainHead;
|
||||||
|
|
||||||
let mut chain = TestBlockChainClient::new();
|
let mut chain = TestBlockChainClient::new();
|
||||||
@ -712,7 +746,7 @@ mod tests {
|
|||||||
let parent_hash = headers[1].hash();
|
let parent_hash = headers[1].hash();
|
||||||
headers.push(dummy_header(129, parent_hash));
|
headers.push(dummy_header(129, parent_hash));
|
||||||
|
|
||||||
let mut downloader = BlockDownloader::new(false, &H256::random(), 0);
|
let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &H256::random(), 0);
|
||||||
downloader.state = State::Blocks;
|
downloader.state = State::Blocks;
|
||||||
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
||||||
|
|
||||||
@ -782,7 +816,7 @@ mod tests {
|
|||||||
headers.push(header);
|
headers.push(header);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut downloader = BlockDownloader::new(false, &headers[0].hash(), 0);
|
let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &headers[0].hash(), 0);
|
||||||
downloader.state = State::Blocks;
|
downloader.state = State::Blocks;
|
||||||
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
||||||
|
|
||||||
@ -846,7 +880,7 @@ mod tests {
|
|||||||
headers.push(header);
|
headers.push(header);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut downloader = BlockDownloader::new(true, &headers[0].hash(), 0);
|
let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &headers[0].hash(), 0);
|
||||||
downloader.state = State::Blocks;
|
downloader.state = State::Blocks;
|
||||||
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
downloader.blocks.reset_to(vec![headers[0].hash()]);
|
||||||
|
|
||||||
@ -871,4 +905,84 @@ mod tests {
|
|||||||
_ => panic!("expected BlockDownloaderImportError"),
|
_ => panic!("expected BlockDownloaderImportError"),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reset_after_multiple_sets_of_useless_headers() {
|
||||||
|
::env_logger::try_init().ok();
|
||||||
|
|
||||||
|
let spec = Spec::new_test();
|
||||||
|
let genesis_hash = spec.genesis_header().hash();
|
||||||
|
|
||||||
|
let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
|
||||||
|
downloader.state = State::ChainHead;
|
||||||
|
|
||||||
|
let mut chain = TestBlockChainClient::new();
|
||||||
|
let snapshot_service = TestSnapshotService::new();
|
||||||
|
let queue = RwLock::new(VecDeque::new());
|
||||||
|
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
|
||||||
|
|
||||||
|
let heads = [
|
||||||
|
spec.genesis_header(),
|
||||||
|
dummy_header(127, H256::random()),
|
||||||
|
dummy_header(254, H256::random()),
|
||||||
|
];
|
||||||
|
|
||||||
|
let short_subchain = [dummy_header(1, genesis_hash)];
|
||||||
|
|
||||||
|
import_headers_ok(&heads, &mut downloader, &mut io);
|
||||||
|
import_headers_ok(&short_subchain, &mut downloader, &mut io);
|
||||||
|
|
||||||
|
assert_eq!(downloader.state, State::Blocks);
|
||||||
|
assert!(!downloader.blocks.is_empty());
|
||||||
|
|
||||||
|
// simulate receiving useless headers
|
||||||
|
let head = vec![short_subchain.last().unwrap().clone()];
|
||||||
|
for _ in 0..MAX_USELESS_HEADERS_PER_ROUND {
|
||||||
|
let res = import_headers(&head, &mut downloader, &mut io);
|
||||||
|
assert!(res.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(downloader.state, State::Idle);
|
||||||
|
assert!(downloader.blocks.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dont_reset_after_multiple_sets_of_useless_headers_for_chain_head() {
|
||||||
|
::env_logger::try_init().ok();
|
||||||
|
|
||||||
|
let spec = Spec::new_test();
|
||||||
|
let genesis_hash = spec.genesis_header().hash();
|
||||||
|
|
||||||
|
let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
|
||||||
|
downloader.state = State::ChainHead;
|
||||||
|
|
||||||
|
let mut chain = TestBlockChainClient::new();
|
||||||
|
let snapshot_service = TestSnapshotService::new();
|
||||||
|
let queue = RwLock::new(VecDeque::new());
|
||||||
|
let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
|
||||||
|
|
||||||
|
let heads = [
|
||||||
|
spec.genesis_header()
|
||||||
|
];
|
||||||
|
|
||||||
|
let short_subchain = [dummy_header(1, genesis_hash)];
|
||||||
|
|
||||||
|
import_headers_ok(&heads, &mut downloader, &mut io);
|
||||||
|
import_headers_ok(&short_subchain, &mut downloader, &mut io);
|
||||||
|
|
||||||
|
assert_eq!(downloader.state, State::Blocks);
|
||||||
|
assert!(!downloader.blocks.is_empty());
|
||||||
|
|
||||||
|
// simulate receiving useless headers
|
||||||
|
let head = vec![short_subchain.last().unwrap().clone()];
|
||||||
|
for _ in 0..MAX_USELESS_HEADERS_PER_ROUND {
|
||||||
|
let res = import_headers(&head, &mut downloader, &mut io);
|
||||||
|
assert!(res.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
// download shouldn't be reset since this is the chain head for a single subchain.
|
||||||
|
// this state usually occurs for NewBlocks when it has reached the chain head.
|
||||||
|
assert_eq!(downloader.state, State::Blocks);
|
||||||
|
assert!(!downloader.blocks.is_empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -394,6 +394,11 @@ impl BlockCollection {
|
|||||||
self.blocks.contains_key(hash)
|
self.blocks.contains_key(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check the number of heads
|
||||||
|
pub fn heads_len(&self) -> usize {
|
||||||
|
self.heads.len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Return used heap size.
|
/// Return used heap size.
|
||||||
pub fn heap_size(&self) -> usize {
|
pub fn heap_size(&self) -> usize {
|
||||||
self.heads.heap_size_of_children()
|
self.heads.heap_size_of_children()
|
||||||
|
@ -293,7 +293,9 @@ impl SyncHandler {
|
|||||||
let block_set = sync.peers.get(&peer_id)
|
let block_set = sync.peers.get(&peer_id)
|
||||||
.and_then(|p| p.block_set)
|
.and_then(|p| p.block_set)
|
||||||
.unwrap_or(BlockSet::NewBlocks);
|
.unwrap_or(BlockSet::NewBlocks);
|
||||||
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) {
|
let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
|
||||||
|
|
||||||
|
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) || !allowed {
|
||||||
trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
|
trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -420,12 +422,8 @@ impl SyncHandler {
|
|||||||
downloader.import_headers(io, r, expected_hash)?
|
downloader.import_headers(io, r, expected_hash)?
|
||||||
};
|
};
|
||||||
|
|
||||||
if let DownloadAction::Reset = result {
|
if result == DownloadAction::Reset {
|
||||||
// mark all outstanding requests as expired
|
sync.reset_downloads(block_set);
|
||||||
trace!("Resetting downloads for {:?}", block_set);
|
|
||||||
for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
|
|
||||||
p.reset_asking();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sync.collect_blocks(io, block_set);
|
sync.collect_blocks(io, block_set);
|
||||||
@ -436,7 +434,8 @@ impl SyncHandler {
|
|||||||
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||||
sync.clear_peer_download(peer_id);
|
sync.clear_peer_download(peer_id);
|
||||||
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
||||||
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) {
|
let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
|
||||||
|
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) || !allowed {
|
||||||
trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
|
trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -109,7 +109,7 @@ use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, Bl
|
|||||||
use ethcore::snapshot::{RestorationStatus};
|
use ethcore::snapshot::{RestorationStatus};
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
use super::{WarpSync, SyncConfig};
|
use super::{WarpSync, SyncConfig};
|
||||||
use block_sync::{BlockDownloader, BlockDownloaderImportError as DownloaderImportError};
|
use block_sync::{BlockDownloader, DownloadAction};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use snapshot::{Snapshot};
|
use snapshot::{Snapshot};
|
||||||
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
|
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
|
||||||
@ -429,7 +429,7 @@ impl ChainSync {
|
|||||||
peers: HashMap::new(),
|
peers: HashMap::new(),
|
||||||
handshaking_peers: HashMap::new(),
|
handshaking_peers: HashMap::new(),
|
||||||
active_peers: HashSet::new(),
|
active_peers: HashSet::new(),
|
||||||
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
|
new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number),
|
||||||
old_blocks: None,
|
old_blocks: None,
|
||||||
last_sent_block_number: 0,
|
last_sent_block_number: 0,
|
||||||
network_id: config.network_id,
|
network_id: config.network_id,
|
||||||
@ -638,13 +638,13 @@ impl ChainSync {
|
|||||||
pub fn update_targets(&mut self, chain: &BlockChainClient) {
|
pub fn update_targets(&mut self, chain: &BlockChainClient) {
|
||||||
// Do not assume that the block queue/chain still has our last_imported_block
|
// Do not assume that the block queue/chain still has our last_imported_block
|
||||||
let chain = chain.chain_info();
|
let chain = chain.chain_info();
|
||||||
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
|
self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number);
|
||||||
self.old_blocks = None;
|
self.old_blocks = None;
|
||||||
if self.download_old_blocks {
|
if self.download_old_blocks {
|
||||||
if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
|
if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
|
||||||
|
|
||||||
trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
|
trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
|
||||||
let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number);
|
let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &ancient_block_hash, ancient_block_number);
|
||||||
if let Some(hash) = chain.first_block_hash {
|
if let Some(hash) = chain.first_block_hash {
|
||||||
trace!(target: "sync", "Downloader target set to {:?}", hash);
|
trace!(target: "sync", "Downloader target set to {:?}", hash);
|
||||||
downloader.set_target(&hash);
|
downloader.set_target(&hash);
|
||||||
@ -763,12 +763,10 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only ask for old blocks if the peer has a higher difficulty than the last imported old block
|
// Only ask for old blocks if the peer has an equal or higher difficulty
|
||||||
let last_imported_old_block_difficulty = self.old_blocks.as_mut().and_then(|d| {
|
let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty);
|
||||||
io.chain().block_total_difficulty(BlockId::Number(d.last_imported_block_number()))
|
|
||||||
});
|
|
||||||
|
|
||||||
if force || last_imported_old_block_difficulty.map_or(true, |ld| peer_difficulty.map_or(true, |pd| pd > ld)) {
|
if force || equal_or_higher_difficulty {
|
||||||
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
|
if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
|
||||||
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
|
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
|
||||||
return;
|
return;
|
||||||
@ -776,9 +774,9 @@ impl ChainSync {
|
|||||||
} else {
|
} else {
|
||||||
trace!(
|
trace!(
|
||||||
target: "sync",
|
target: "sync",
|
||||||
"peer {:?} is not suitable for requesting old blocks, last_imported_old_block_difficulty={:?}, peer_difficulty={:?}",
|
"peer {:?} is not suitable for requesting old blocks, syncing_difficulty={:?}, peer_difficulty={:?}",
|
||||||
peer_id,
|
peer_id,
|
||||||
last_imported_old_block_difficulty,
|
syncing_difficulty,
|
||||||
peer_difficulty
|
peer_difficulty
|
||||||
);
|
);
|
||||||
self.deactivate_peer(io, peer_id);
|
self.deactivate_peer(io, peer_id);
|
||||||
@ -856,18 +854,39 @@ impl ChainSync {
|
|||||||
fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) {
|
fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) {
|
||||||
match block_set {
|
match block_set {
|
||||||
BlockSet::NewBlocks => {
|
BlockSet::NewBlocks => {
|
||||||
if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == Err(DownloaderImportError::Invalid) {
|
if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == DownloadAction::Reset {
|
||||||
self.restart(io);
|
self.reset_downloads(block_set);
|
||||||
|
self.new_blocks.reset();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
BlockSet::OldBlocks => {
|
BlockSet::OldBlocks => {
|
||||||
if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) {
|
let mut is_complete = false;
|
||||||
self.restart(io);
|
let mut download_action = DownloadAction::None;
|
||||||
} else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) {
|
if let Some(downloader) = self.old_blocks.as_mut() {
|
||||||
|
download_action = downloader.collect_blocks(io, false);
|
||||||
|
is_complete = downloader.is_complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
if download_action == DownloadAction::Reset {
|
||||||
|
self.reset_downloads(block_set);
|
||||||
|
if let Some(downloader) = self.old_blocks.as_mut() {
|
||||||
|
downloader.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_complete {
|
||||||
trace!(target: "sync", "Background block download is complete");
|
trace!(target: "sync", "Background block download is complete");
|
||||||
self.old_blocks = None;
|
self.old_blocks = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark all outstanding requests as expired
|
||||||
|
fn reset_downloads(&mut self, block_set: BlockSet) {
|
||||||
|
trace!(target: "sync", "Resetting downloads for {:?}", block_set);
|
||||||
|
for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
|
||||||
|
p.reset_asking();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user