Snapshot sync improvements (#2960)

* Status packet timeout

* Snapshot collection state

* Pause snapshot download

* Updated tests
This commit is contained in:
Arkadiy Paronyan 2016-10-29 13:07:06 +02:00 committed by Gav Wood
parent 290ed3343f
commit 9e82eeccfe
6 changed files with 174 additions and 56 deletions

View File

@ -58,7 +58,7 @@ impl Default for SyncConfig {
network_id: U256::from(1),
subprotocol_name: *b"eth",
fork_block: None,
warp_sync: true,
warp_sync: false,
}
}
}

View File

@ -123,6 +123,7 @@ const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
const MAX_TRANSACTION_SIZE: usize = 300*1024;
// Min number of blocks to be behind for a snapshot sync
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 100000;
const SNAPSHOT_MIN_PEERS: usize = 3;
const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@ -147,21 +148,27 @@ const SNAPSHOT_DATA_PACKET: u8 = 0x14;
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 10f64;
const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 60f64;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
const STATUS_TIMEOUT_SEC: u64 = 5;
const HEADERS_TIMEOUT_SEC: u64 = 15;
const BODIES_TIMEOUT_SEC: u64 = 10;
const RECEIPTS_TIMEOUT_SEC: u64 = 10;
const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
pub enum SyncState {
/// Waiting for pv64 peers to start snapshot syncing
/// Collecting enough peers to start syncing.
WaitingPeers,
/// Waiting for snapshot manifest download
SnapshotManifest,
/// Downloading snapshot data
SnapshotData,
/// Waiting for snapshot restoration to complete
/// Waiting for snapshot restoration progress.
SnapshotWaiting,
/// Downloading new blocks
Blocks,
@ -276,7 +283,7 @@ struct PeerInfo {
/// Holds requested snapshot chunk hash if any.
asking_snapshot_data: Option<H256>,
/// Request timestamp
ask_time: f64,
ask_time: u64,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet<H256>,
/// Pending request is expired and result should be ignored
@ -324,10 +331,13 @@ pub struct ChainSync {
network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
/// Snapshot sync allowed.
snapshot_sync_enabled: bool,
/// Snapshot downloader.
snapshot: Snapshot,
/// Connected peers pending Status message.
/// Value is request timestamp.
handshaking_peers: HashMap<PeerId, u64>,
/// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<u64>,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -337,20 +347,21 @@ impl ChainSync {
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
let chain_info = chain.chain_info();
let mut sync = ChainSync {
state: SyncState::Idle,
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
starting_block: chain.chain_info().best_block_number,
highest_block: None,
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
active_peers: HashSet::new(),
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
old_blocks: None,
last_sent_block_number: 0,
network_id: config.network_id,
fork_block: config.fork_block,
snapshot_sync_enabled: config.warp_sync,
snapshot: Snapshot::new(),
sync_start_time: None,
};
sync.init_downloaders(chain);
sync.update_targets(chain);
sync
}
@ -442,20 +453,67 @@ impl ChainSync {
self.active_peers.remove(&peer_id);
}
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
if self.state != SyncState::WaitingPeers {
return;
}
let best_block = io.chain().chain_info().best_block_number;
let (best_hash, max_peers, snapshot_peers) = {
//collect snapshot infos from peers
let snapshots = self.peers.iter()
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn| best_block < sn && (sn - best_block) > SNAPSHOT_RESTORE_THRESHOLD))
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())));
let mut snapshot_peers = HashMap::new();
let mut max_peers: usize = 0;
let mut best_hash = None;
for (p, hash) in snapshots {
let peers = snapshot_peers.entry(hash).or_insert_with(Vec::new);
peers.push(*p);
if peers.len() > max_peers {
max_peers = peers.len();
best_hash = Some(hash);
}
}
(best_hash, max_peers, snapshot_peers)
};
let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
if max_peers >= SNAPSHOT_MIN_PEERS {
trace!(target: "sync", "Starting confirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
} else if timeout {
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
}
} else if timeout {
trace!(target: "sync", "No snapshots found, starting full sync");
self.state = SyncState::Idle;
self.continue_sync(io);
}
}
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
self.snapshot.clear();
self.request_snapshot_manifest(io, peer_id);
for p in peers {
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
self.request_snapshot_manifest(io, *p);
}
}
self.state = SyncState::SnapshotManifest;
}
/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
pub fn restart(&mut self, io: &mut SyncIo) {
self.init_downloaders(io.chain());
self.update_targets(io.chain());
self.reset_and_continue(io);
}
/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
fn init_downloaders(&mut self, chain: &BlockChainClient) {
/// Update sync after the blockchain has been changed externally.
pub fn update_targets(&mut self, chain: &BlockChainClient) {
// Do not assume that the block queue/chain still has our last_imported_block
let chain = chain.chain_info();
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
@ -475,6 +533,7 @@ impl ChainSync {
/// Called by peer to report status
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
self.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = try!(r.val_at(0));
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
let peer = PeerInfo {
@ -486,7 +545,7 @@ impl ChainSync {
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
ask_time: 0,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
@ -496,7 +555,12 @@ impl ChainSync {
block_set: None,
};
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if self.sync_start_time.is_none() {
self.sync_start_time = Some(time::precise_time_ns());
}
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
@ -578,7 +642,7 @@ impl ChainSync {
}
let item_count = r.item_count();
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, self.state, block_set);
if self.state == SyncState::Idle && self.old_blocks.is_none() {
if (self.state == SyncState::Idle || self.state == SyncState::WaitingPeers) && self.old_blocks.is_none() {
trace!(target: "sync", "Ignored unexpected block headers");
self.continue_sync(io);
return Ok(());
@ -875,7 +939,7 @@ impl ChainSync {
}
self.clear_peer_download(peer_id);
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
self.continue_sync(io);
return Ok(());
}
@ -918,7 +982,7 @@ impl ChainSync {
match io.snapshot_service().status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
self.state = SyncState::Idle;
self.state = SyncState::WaitingPeers;
self.snapshot.clear();
self.continue_sync(io);
return Ok(());
@ -960,6 +1024,7 @@ impl ChainSync {
/// Called by peer when it is disconnecting
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
self.handshaking_peers.remove(&peer);
if self.peers.contains_key(&peer) {
debug!(target: "sync", "Disconnected {}", peer);
self.clear_peer_download(peer);
@ -975,12 +1040,14 @@ impl ChainSync {
if let Err(e) = self.send_status(io, peer) {
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer);
} else {
self.handshaking_peers.insert(peer, time::precise_time_ns());
}
}
/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
if (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks || self.state == SyncState::Idle)
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {
self.complete_sync(io);
}
@ -1040,11 +1107,9 @@ impl ChainSync {
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
match self.state {
SyncState::Idle if self.snapshot_sync_enabled
&& chain_info.best_block_number < peer_snapshot_number
&& (peer_snapshot_number - chain_info.best_block_number) > SNAPSHOT_RESTORE_THRESHOLD => {
trace!(target: "sync", "Starting snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
self.start_snapshot_sync(io, peer_id);
SyncState::WaitingPeers => {
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
self.maybe_start_snapshot_sync(io);
},
SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().queue_info().is_full() {
@ -1070,6 +1135,13 @@ impl ChainSync {
}
},
SyncState::SnapshotData => {
if let RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } = io.snapshot_service().status() {
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
self.state = SyncState::SnapshotWaiting;
return;
}
}
if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
self.request_snapshot_data(io, peer_id);
}
@ -1253,7 +1325,7 @@ impl ChainSync {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
}
peer.asking = asking;
peer.ask_time = time::precise_time_s();
peer.ask_time = time::precise_time_ns();
let result = if packet_id >= ETH_PACKET_COUNT {
sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
} else {
@ -1590,17 +1662,18 @@ impl ChainSync {
#[cfg_attr(feature="dev", allow(match_same_arms))]
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
let tick = time::precise_time_s();
let tick = time::precise_time_ns();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
let elapsed = (tick - peer.ask_time) / 1_000_000_000;
let timeout = match peer.asking {
PeerAsking::BlockHeaders => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::BlockReceipts => (tick - peer.ask_time) > RECEIPTS_TIMEOUT_SEC,
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
@ -1611,16 +1684,42 @@ impl ChainSync {
for p in aborting {
self.on_peer_aborting(io, p);
}
// Check for handshake timeouts
for (peer, ask_time) in &self.handshaking_peers {
let elapsed = (tick - ask_time) / 1_000_000_000;
if elapsed > STATUS_TIMEOUT_SEC {
trace!(target:"sync", "Status timeout {}", peer);
io.disconnect_peer(*peer);
}
}
}
fn check_resume(&mut self, io: &mut SyncIo) {
if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
self.state = SyncState::Blocks;
self.continue_sync(io);
} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
trace!(target:"sync", "Snapshot restoration is complete");
self.restart(io);
self.continue_sync(io);
} else if self.state == SyncState::SnapshotWaiting {
match io.snapshot_service().status() {
RestorationStatus::Inactive => {
trace!(target:"sync", "Snapshot restoration is complete");
self.restart(io);
self.continue_sync(io);
},
RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
self.state = SyncState::SnapshotData;
self.continue_sync(io);
}
},
RestorationStatus::Failed => {
trace!(target: "sync", "Snapshot restoration aborted");
self.state = SyncState::WaitingPeers;
self.snapshot.clear();
self.continue_sync(io);
},
}
}
}
@ -1828,6 +1927,7 @@ impl ChainSync {
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.maybe_start_snapshot_sync(io);
self.check_resume(io);
}
@ -2050,7 +2150,7 @@ mod tests {
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
ask_time: 0,
last_sent_transactions: HashSet::new(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,

View File

@ -113,7 +113,7 @@ impl Snapshot {
}
pub fn done_chunks(&self) -> usize {
self.total_chunks() - self.completed_chunks.len()
self.completed_chunks.len()
}
pub fn is_complete(&self) -> bool {
@ -165,6 +165,7 @@ mod test {
let mut snapshot = Snapshot::new();
let (manifest, mhash, state_chunks, block_chunks) = test_manifest();
snapshot.reset_to(&manifest, &mhash);
assert_eq!(snapshot.done_chunks(), 0);
assert!(snapshot.validate_chunk(&H256::random().to_vec()).is_err());
let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
@ -194,6 +195,8 @@ mod test {
}
assert!(snapshot.is_complete());
assert_eq!(snapshot.done_chunks(), 40);
assert_eq!(snapshot.done_chunks(), snapshot.total_chunks());
assert_eq!(snapshot.snapshot_hash(), Some(manifest.into_rlp().sha3()));
}
}

View File

@ -18,6 +18,7 @@ use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
use chain::{SyncState};
use super::helpers::*;
use SyncConfig;
#[test]
fn two_peers() {
@ -156,6 +157,10 @@ fn restart() {
fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle);
let mut config = SyncConfig::default();
config.warp_sync = true;
let net = TestNet::new_with_config(2, config);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::WaitingPeers);
}
#[test]

View File

@ -127,20 +127,24 @@ pub struct TestNet {
impl TestNet {
pub fn new(n: usize) -> TestNet {
Self::new_with_fork(n, None)
Self::new_with_config(n, SyncConfig::default())
}
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
let mut config = SyncConfig::default();
config.fork_block = fork;
Self::new_with_config(n, config)
}
pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet {
let mut net = TestNet {
peers: Vec::new(),
started: false,
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let mut config = SyncConfig::default();
config.fork_block = fork;
let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config, &chain);
let sync = ChainSync::new(config.clone(), &chain);
net.peers.push(TestPeer {
sync: RwLock::new(sync),
snapshot_service: ss,
@ -164,7 +168,7 @@ impl TestNet {
for client in 0..self.peers.len() {
if peer != client {
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.write().restart(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)));
p.sync.write().update_targets(&mut p.chain);
p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)), client as PeerId);
}
}

View File

@ -19,6 +19,7 @@ use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
use ethcore::header::BlockNumber;
use ethcore::client::{EachBlockWith};
use super::helpers::*;
use SyncConfig;
pub struct TestSnapshotService {
manifest: Option<ManifestData>,
@ -122,11 +123,16 @@ impl SnapshotService for TestSnapshotService {
#[test]
fn snapshot_sync() {
::env_logger::init().ok();
let mut net = TestNet::new(2);
net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
net.peer_mut(0).chain.add_blocks(1, EachBlockWith::Nothing);
net.sync_steps(19); // status + manifest + chunks
assert_eq!(net.peer(1).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
assert_eq!(net.peer(1).snapshot_service.block_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().block_hashes.len());
let mut config = SyncConfig::default();
config.warp_sync = true;
let mut net = TestNet::new_with_config(5, config);
let snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
for i in 0..4 {
net.peer_mut(i).snapshot_service = snapshot_service.clone();
net.peer_mut(i).chain.add_blocks(1, EachBlockWith::Nothing);
}
net.sync_steps(50);
assert_eq!(net.peer(4).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
assert_eq!(net.peer(4).snapshot_service.block_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().block_hashes.len());
}