Resilient warp sync (#5018)

parent 9efab789aa
commit 3b54b49b0b
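Summary of the change, as read from the hunks below: snapshot-related request timeouts are raised (manifest 3s to 5s, data 60s to 120s); aborting a snapshot restore and clearing downloaded snapshot state move out of reset() and into reset_and_continue(), so an ordinary reset no longer discards restore progress; maybe_start_snapshot_sync() may now fire from the Blocks and Waiting states as well as WaitingPeers; start_snapshot_sync() consults a new Snapshot::have_manifest() so an interrupted warp sync resumes chunk download instead of re-fetching the manifest; and the network ping interval/timeout are doubled so busy peers are not disconnected mid-transfer.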
@@ -164,8 +164,8 @@ const HEADERS_TIMEOUT_SEC: u64 = 15;
 const BODIES_TIMEOUT_SEC: u64 = 10;
 const RECEIPTS_TIMEOUT_SEC: u64 = 10;
 const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
-const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
-const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;
+const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 5;
+const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 120;

 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
 /// Sync state
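As a rough, self-contained illustration of how such a constant gates a stale request (the helper and its arguments are hypothetical, not code from this patch):

	// Hypothetical sketch: a snapshot-data request counts as timed out once
	// the elapsed wall-clock seconds exceed SNAPSHOT_DATA_TIMEOUT_SEC.
	fn snapshot_data_timed_out(asked_at_ns: u64, now_ns: u64) -> bool {
		(now_ns - asked_at_ns) / 1_000_000_000 > SNAPSHOT_DATA_TIMEOUT_SEC
	}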
@@ -463,12 +463,7 @@ impl ChainSync {
 	/// Reset sync. Clear all downloaded data but keep the queue
 	fn reset(&mut self, io: &mut SyncIo) {
 		self.new_blocks.reset();
-		self.snapshot.clear();
 		let chain_info = io.chain().chain_info();
-		if self.state == SyncState::SnapshotData {
-			debug!(target:"sync", "Aborting snapshot restore");
-			io.snapshot_service().abort_restore();
-		}
 		for (_, ref mut p) in &mut self.peers {
 			if p.block_set != Some(BlockSet::OldBlocks) {
 				p.reset_asking();
@@ -487,6 +482,11 @@ impl ChainSync {
 	/// Restart sync
 	pub fn reset_and_continue(&mut self, io: &mut SyncIo) {
 		trace!(target: "sync", "Restarting");
+		if self.state == SyncState::SnapshotData {
+			debug!(target:"sync", "Aborting snapshot restore");
+			io.snapshot_service().abort_restore();
+		}
+		self.snapshot.clear();
 		self.reset(io);
 		self.continue_sync(io);
 	}
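Read together with the previous hunk, the abort/clear logic is not deleted but relocated: reset(), which also runs on retries and timeouts, now leaves a partially restored snapshot alone, and only the explicit reset_and_continue() path aborts the restore and clears snapshot state. Note the ordering: the restore is aborted while the state still reads SnapshotData, then the snapshot is cleared, and only then does reset() run.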
@@ -499,7 +499,7 @@ impl ChainSync {
 	}

 	fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
-		if self.state != SyncState::WaitingPeers {
+		if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting {
 			return;
 		}
 		// Make sure the snapshot block is not too far away from best block and network best block and
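With the loosened guard, a node that is already block-syncing (Blocks) or waiting on the queue (Waiting) can still be switched over to snapshot sync, rather than only a node sitting idle waiting for peers.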
@@ -531,7 +531,7 @@ impl ChainSync {
 			(best_hash, max_peers, snapshot_peers)
 		};

-		let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
+		let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);

 		if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
 			if max_peers >= SNAPSHOT_MIN_PEERS {
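The new `(self.state == SyncState::WaitingPeers)` gate confines the wait-peers timeout to the state it names, so it can no longer trip while a sync is already in progress. The nanosecond arithmetic is the usual pattern for the old time crate; a minimal standalone sketch (the helper name is ours, not the patch's):

	extern crate time;

	// Whole seconds elapsed since a time::precise_time_ns() timestamp.
	fn elapsed_secs(start_ns: u64) -> u64 {
		(time::precise_time_ns() - start_ns) / 1_000_000_000
	}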
@@ -549,13 +549,18 @@ impl ChainSync {
 	}

 	fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
-		self.snapshot.clear();
-		for p in peers {
-			if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
-				self.request_snapshot_manifest(io, *p);
-			}
-		}
-		self.state = SyncState::SnapshotManifest;
+		if !self.snapshot.have_manifest() {
+			for p in peers {
+				if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
+					self.request_snapshot_manifest(io, *p);
+				}
+			}
+			self.state = SyncState::SnapshotManifest;
+			trace!(target: "sync", "New snapshot sync with {:?}", peers);
+		} else {
+			self.state = SyncState::SnapshotData;
+			trace!(target: "sync", "Resumed snapshot sync with {:?}", peers);
+		}
 	}

 	/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
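The resume decision can be read in isolation; a condensed sketch of the same logic under simplified types (the names and types here are ours, not the patch's):

	#[derive(Debug, PartialEq)]
	enum SyncState { SnapshotManifest, SnapshotData }

	struct Snapshot { snapshot_hash: Option<u64> } // hash type simplified

	impl Snapshot {
		fn have_manifest(&self) -> bool { self.snapshot_hash.is_some() }
	}

	// No manifest yet: request one and enter SnapshotManifest.
	// Manifest already held: skip straight to chunk download (SnapshotData).
	fn next_state(snapshot: &Snapshot) -> SyncState {
		if snapshot.have_manifest() { SyncState::SnapshotData } else { SyncState::SnapshotManifest }
	}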
@@ -54,6 +54,11 @@ impl Snapshot {
 		self.snapshot_hash = None;
 	}

+	/// Check if currently downloading a snapshot.
+	pub fn have_manifest(&self) -> bool {
+		self.snapshot_hash.is_some()
+	}
+
 	/// Reset collection for a manifest RLP
 	pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) {
 		self.clear();
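have_manifest() is simply a view over snapshot_hash, which reset_to() sets and clear() unsets; a usage sketch against this API (Snapshot::new() and the manifest/hash values in scope are assumptions):

	let mut snapshot = Snapshot::new();
	assert!(!snapshot.have_manifest());   // fresh: must fetch a manifest first
	snapshot.reset_to(&manifest, &hash);  // records the manifest hash
	assert!(snapshot.have_manifest());    // an interrupted sync can now resume
	snapshot.clear();                     // drops all progress
	assert!(!snapshot.have_manifest());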
@@ -35,8 +35,8 @@ use stats::NetworkStats;
 use time;

 // Timeout must be less than (interval - 1).
-const PING_TIMEOUT_SEC: u64 = 15;
-const PING_INTERVAL_SEC: u64 = 30;
+const PING_TIMEOUT_SEC: u64 = 60;
+const PING_INTERVAL_SEC: u64 = 120;

 #[derive(Debug, Clone)]
 enum ProtocolState {
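Doubling both values keeps the stated invariant (the timeout must stay below interval - 1: 60 < 119) while giving peers that are busy serving snapshot chunks, now allowed up to 120 seconds each, enough slack that the keep-alive check does not drop them mid-transfer.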