diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 06e659bc1..f6951fed5 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -46,6 +46,9 @@ struct Guard(bool, PathBuf); impl Guard { fn new(path: PathBuf) -> Self { Guard(true, path) } + #[cfg(test)] + fn benign() -> Self { Guard(false, PathBuf::default()) } + fn disarm(mut self) { self.0 = false } } @@ -120,7 +123,7 @@ impl Restoration { // feeds a state chunk, aborts early if `flag` becomes false. fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> { - if self.state_chunks_left.remove(&hash) { + if self.state_chunks_left.contains(&hash) { let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?; self.state.feed(&self.snappy_buffer[..len], flag)?; @@ -128,6 +131,8 @@ impl Restoration { if let Some(ref mut writer) = self.writer.as_mut() { writer.write_state_chunk(hash, chunk)?; } + + self.state_chunks_left.remove(&hash); } Ok(()) @@ -135,13 +140,15 @@ impl Restoration { // feeds a block chunk fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine, flag: &AtomicBool) -> Result<(), Error> { - if self.block_chunks_left.remove(&hash) { + if self.block_chunks_left.contains(&hash) { let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?; self.blocks.feed(&self.snappy_buffer[..len], engine, flag)?; if let Some(ref mut writer) = self.writer.as_mut() { writer.write_block_chunk(hash, chunk)?; } + + self.block_chunks_left.remove(&hash); } Ok(()) @@ -669,4 +676,49 @@ mod tests { service.restore_state_chunk(Default::default(), vec![]); service.restore_block_chunk(Default::default(), vec![]); } + + #[test] + fn cannot_finish_with_invalid_chunks() { + use util::{H256, FixedHash}; + use util::kvdb::DatabaseConfig; + + let spec = get_test_spec(); + let dir = RandomTempPath::new(); + + let state_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect(); + let block_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let gb = spec.genesis_block(); + let flag = ::std::sync::atomic::AtomicBool::new(true); + + let params = RestorationParams { + manifest: ManifestData { + version: 2, + state_hashes: state_hashes.clone(), + block_hashes: block_hashes.clone(), + state_root: H256::default(), + block_number: 100000, + block_hash: H256::default(), + }, + pruning: Algorithm::Archive, + db_path: dir.as_path().to_owned(), + db_config: &db_config, + writer: None, + genesis: &gb, + guard: Guard::benign(), + }; + + let mut restoration = Restoration::new(params).unwrap(); + let definitely_bad_chunk = [1, 2, 3, 4, 5]; + + for hash in state_hashes { + assert!(restoration.feed_state(hash, &definitely_bad_chunk, &flag).is_err()); + assert!(!restoration.is_done()); + } + + for hash in block_hashes { + assert!(restoration.feed_blocks(hash, &definitely_bad_chunk, &*spec.engine, &flag).is_err()); + assert!(!restoration.is_done()); + } + } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6aca99fc5..0e44e2c6c 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -159,6 +159,7 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16; const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; const MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 1; +const MAX_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 2; const WAIT_PEERS_TIMEOUT_SEC: u64 = 5; const STATUS_TIMEOUT_SEC: u64 = 5; @@ -523,7 +524,8 @@ impl ChainSync { sn > fork_block && self.highest_block.map_or(true, |highest| highest >= sn && (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD) )) - .filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone()))); + .filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone()))) + .filter(|&(_, ref hash)| !self.snapshot.is_known_bad(hash)); let mut snapshot_peers = HashMap::new(); let mut max_peers: usize = 0; @@ -1020,6 +1022,7 @@ impl ChainSync { trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id); return Ok(()); } + self.clear_peer_download(peer_id); if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest { trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id); @@ -1037,13 +1040,32 @@ impl ChainSync { } Ok(manifest) => manifest, }; - if manifest.version < MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION { - trace!(target: "sync", "{}: Snapshot manifest version too low: {}", peer_id, manifest.version); + + let manifest_hash = manifest_rlp.as_raw().sha3(); + let is_usable_version = manifest.version >= MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION + && manifest.version <= MAX_SUPPORTED_SNAPSHOT_MANIFEST_VERSION; + + if !self.peers.get(&peer_id).map_or(false, |peer| peer.snapshot_hash == Some(manifest_hash)) { + trace!(target: "sync", "{}: Snapshot manifest hash {} mismatched with advertised", peer_id, manifest_hash); io.disable_peer(peer_id); self.continue_sync(io); + + return Ok(()); + } + + if !is_usable_version { + trace!(target: "sync", "{}: Snapshot manifest version incompatible: {}", peer_id, manifest.version); + self.snapshot.note_bad(manifest_hash); + + // temporarily disable the peer while we tune our peer set to those + // with usable snapshots. we don't try and download any rejected manifest + // again, so when we reconnect we can still full sync. + io.disable_peer(peer_id);; + self.continue_sync(io); return Ok(()); } - self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3()); + + self.snapshot.reset_to(&manifest, &manifest_hash); io.snapshot_service().begin_restore(manifest); self.state = SyncState::SnapshotData; @@ -1068,10 +1090,18 @@ impl ChainSync { } // check service status - match io.snapshot_service().status() { + let status = io.snapshot_service().status(); + match status { RestorationStatus::Inactive | RestorationStatus::Failed => { trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id); self.state = SyncState::WaitingPeers; + + // only note bad if restoration failed. + if let (Some(hash), RestorationStatus::Failed) = (self.snapshot.snapshot_hash(), status) { + trace!(target: "sync", "Noting snapshot hash {} as bad", hash); + self.snapshot.note_bad(hash); + } + self.snapshot.clear(); self.continue_sync(io); return Ok(()); diff --git a/sync/src/snapshot.rs b/sync/src/snapshot.rs index a585520d5..fee5d7b4a 100644 --- a/sync/src/snapshot.rs +++ b/sync/src/snapshot.rs @@ -31,6 +31,7 @@ pub struct Snapshot { downloading_chunks: HashSet, completed_chunks: HashSet, snapshot_hash: Option, + bad_hashes: HashSet, } impl Snapshot { @@ -42,6 +43,7 @@ impl Snapshot { downloading_chunks: HashSet::new(), completed_chunks: HashSet::new(), snapshot_hash: None, + bad_hashes: HashSet::new(), } } @@ -104,6 +106,16 @@ impl Snapshot { self.downloading_chunks.remove(hash); } + // note snapshot hash as bad. + pub fn note_bad(&mut self, hash: H256) { + self.bad_hashes.insert(hash); + } + + // whether snapshot hash is known to be bad. + pub fn is_known_bad(&self, hash: &H256) -> bool { + self.bad_hashes.contains(hash) + } + pub fn snapshot_hash(&self) -> Option { self.snapshot_hash } @@ -200,5 +212,15 @@ mod test { assert_eq!(snapshot.done_chunks(), snapshot.total_chunks()); assert_eq!(snapshot.snapshot_hash(), Some(manifest.into_rlp().sha3())); } + + #[test] + fn tracks_known_bad() { + let mut snapshot = Snapshot::new(); + let hash = H256::random(); + + assert_eq!(snapshot.is_known_bad(&hash), false); + snapshot.note_bad(hash); + assert_eq!(snapshot.is_known_bad(&hash), true); + } }