Backports for stable (#6116)
* remove chunk to restore from pending set only upon successful import
* blacklist bad manifest hashes upon failure
* more checks before snapshot syncing
* Reverted tests
* revert submodule change
parent f48058725c
commit 1a5b17626c
@@ -46,6 +46,9 @@ struct Guard(bool, PathBuf);
 impl Guard {
 	fn new(path: PathBuf) -> Self { Guard(true, path) }
 
+	#[cfg(test)]
+	fn benign() -> Self { Guard(false, PathBuf::default()) }
+
 	fn disarm(mut self) { self.0 = false }
 }
 
@@ -120,7 +123,7 @@ impl Restoration {
 
 	// feeds a state chunk, aborts early if `flag` becomes false.
 	fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> {
-		if self.state_chunks_left.remove(&hash) {
+		if self.state_chunks_left.contains(&hash) {
 			let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?;
 
 			self.state.feed(&self.snappy_buffer[..len], flag)?;
@@ -128,6 +131,8 @@ impl Restoration {
 			if let Some(ref mut writer) = self.writer.as_mut() {
 				writer.write_state_chunk(hash, chunk)?;
 			}
+
+			self.state_chunks_left.remove(&hash);
 		}
 
 		Ok(())
@@ -135,13 +140,15 @@ impl Restoration {
 
 	// feeds a block chunk
	fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine, flag: &AtomicBool) -> Result<(), Error> {
-		if self.block_chunks_left.remove(&hash) {
+		if self.block_chunks_left.contains(&hash) {
 			let len = snappy::decompress_into(chunk, &mut self.snappy_buffer)?;
 
 			self.blocks.feed(&self.snappy_buffer[..len], engine, flag)?;
 			if let Some(ref mut writer) = self.writer.as_mut() {
 				writer.write_block_chunk(hash, chunk)?;
 			}
+
+			self.block_chunks_left.remove(&hash);
 		}
 
 		Ok(())
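The two hunks above are the core of the first bullet: `feed_state` and `feed_blocks` now check the pending set with `contains` and only `remove` the hash after the chunk has been decompressed, fed, and written out, so a chunk that errors part-way stays pending and can be fed again. A minimal sketch of that pattern, with a hypothetical `import` step and `u64` keys standing in for the real chunk import and `H256` hashes:

```rust
use std::collections::HashSet;

// Sketch only: keep a hash in the pending set until the fallible import step
// succeeds, so a failed chunk can be fed again later.
fn feed_chunk(pending: &mut HashSet<u64>, hash: u64, chunk: &[u8]) -> Result<(), String> {
    if pending.contains(&hash) {
        // hypothetical import step; an error here leaves `hash` in `pending`
        import(chunk)?;
        // only a successful import removes the hash from the pending set
        pending.remove(&hash);
    }
    Ok(())
}

fn import(chunk: &[u8]) -> Result<(), String> {
    if chunk.is_empty() { Err("empty chunk".to_owned()) } else { Ok(()) }
}

fn main() {
    let mut pending = HashSet::new();
    pending.insert(1u64);

    assert!(feed_chunk(&mut pending, 1, &[]).is_err());
    assert!(pending.contains(&1)); // still pending after a failed import

    assert!(feed_chunk(&mut pending, 1, &[0xde, 0xad]).is_ok());
    assert!(!pending.contains(&1)); // removed only after success
}
```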
@@ -669,4 +676,49 @@ mod tests {
 		service.restore_state_chunk(Default::default(), vec![]);
 		service.restore_block_chunk(Default::default(), vec![]);
 	}
+
+	#[test]
+	fn cannot_finish_with_invalid_chunks() {
+		use util::{H256, FixedHash};
+		use util::kvdb::DatabaseConfig;
+
+		let spec = get_test_spec();
+		let dir = RandomTempPath::new();
+
+		let state_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect();
+		let block_hashes: Vec<_> = (0..5).map(|_| H256::random()).collect();
+		let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
+		let gb = spec.genesis_block();
+		let flag = ::std::sync::atomic::AtomicBool::new(true);
+
+		let params = RestorationParams {
+			manifest: ManifestData {
+				version: 2,
+				state_hashes: state_hashes.clone(),
+				block_hashes: block_hashes.clone(),
+				state_root: H256::default(),
+				block_number: 100000,
+				block_hash: H256::default(),
+			},
+			pruning: Algorithm::Archive,
+			db_path: dir.as_path().to_owned(),
+			db_config: &db_config,
+			writer: None,
+			genesis: &gb,
+			guard: Guard::benign(),
+		};
+
+		let mut restoration = Restoration::new(params).unwrap();
+		let definitely_bad_chunk = [1, 2, 3, 4, 5];
+
+		for hash in state_hashes {
+			assert!(restoration.feed_state(hash, &definitely_bad_chunk, &flag).is_err());
+			assert!(!restoration.is_done());
+		}
+
+		for hash in block_hashes {
+			assert!(restoration.feed_blocks(hash, &definitely_bad_chunk, &*spec.engine, &flag).is_err());
+			assert!(!restoration.is_done());
+		}
+	}
 }

@@ -159,6 +159,7 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16;
 const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
 
 const MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 1;
+const MAX_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 2;
 
 const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
 const STATUS_TIMEOUT_SEC: u64 = 5;
@@ -523,7 +524,8 @@ impl ChainSync {
 				sn > fork_block &&
 				self.highest_block.map_or(true, |highest| highest >= sn && (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD)
 			))
-			.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())));
+			.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())))
+			.filter(|&(_, ref hash)| !self.snapshot.is_known_bad(hash));
 
 		let mut snapshot_peers = HashMap::new();
 		let mut max_peers: usize = 0;
@@ -1020,6 +1022,7 @@ impl ChainSync {
 			trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
 			return Ok(());
 		}
+
 		self.clear_peer_download(peer_id);
 		if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
 			trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
@@ -1037,13 +1040,32 @@ impl ChainSync {
 			}
 			Ok(manifest) => manifest,
 		};
-		if manifest.version < MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION {
-			trace!(target: "sync", "{}: Snapshot manifest version too low: {}", peer_id, manifest.version);
+
+		let manifest_hash = manifest_rlp.as_raw().sha3();
+		let is_usable_version = manifest.version >= MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION
+			&& manifest.version <= MAX_SUPPORTED_SNAPSHOT_MANIFEST_VERSION;
+
+		if !self.peers.get(&peer_id).map_or(false, |peer| peer.snapshot_hash == Some(manifest_hash)) {
+			trace!(target: "sync", "{}: Snapshot manifest hash {} mismatched with advertised", peer_id, manifest_hash);
 			io.disable_peer(peer_id);
 			self.continue_sync(io);
+
+			return Ok(());
+		}
+
+		if !is_usable_version {
+			trace!(target: "sync", "{}: Snapshot manifest version incompatible: {}", peer_id, manifest.version);
+			self.snapshot.note_bad(manifest_hash);
+
+			// temporarily disable the peer while we tune our peer set to those
+			// with usable snapshots. we don't try and download any rejected manifest
+			// again, so when we reconnect we can still full sync.
+			io.disable_peer(peer_id);
+			self.continue_sync(io);
 			return Ok(());
 		}
-		self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3());
+
+		self.snapshot.reset_to(&manifest, &manifest_hash);
 		io.snapshot_service().begin_restore(manifest);
 		self.state = SyncState::SnapshotData;
 
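The hunk above replaces the plain version lower-bound check with three checks: the manifest hash must match what the peer advertised, the version must fall between MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION and MAX_SUPPORTED_SNAPSHOT_MANIFEST_VERSION, and a rejected manifest is recorded with `note_bad` so the peer-selection filter added earlier (`.filter(|&(_, ref hash)| !self.snapshot.is_known_bad(hash))`) skips it on the next attempt. A rough sketch of that bookkeeping, using hypothetical `u64` hashes and peer ids in place of the real `H256` and peer types:

```rust
use std::collections::HashSet;

// Sketch only: track manifest hashes that failed validation or restoration,
// and skip peers advertising a blacklisted snapshot.
struct SnapshotTracker {
    bad_hashes: HashSet<u64>,
}

impl SnapshotTracker {
    fn new() -> Self { SnapshotTracker { bad_hashes: HashSet::new() } }

    // remember a manifest hash that turned out to be unusable
    fn note_bad(&mut self, hash: u64) { self.bad_hashes.insert(hash); }

    // peer selection consults this before starting a snapshot download
    fn is_known_bad(&self, hash: &u64) -> bool { self.bad_hashes.contains(hash) }
}

fn main() {
    let mut tracker = SnapshotTracker::new();
    // (peer id, advertised snapshot manifest hash)
    let advertised = vec![(1u32, 0xaau64), (2u32, 0xbbu64)];

    // e.g. the manifest version was out of range, or restoration failed later
    tracker.note_bad(0xaa);

    // keep only peers whose advertised snapshot hash is not blacklisted
    let usable: Vec<_> = advertised
        .into_iter()
        .filter(|&(_, hash)| !tracker.is_known_bad(&hash))
        .collect();
    assert_eq!(usable, vec![(2, 0xbb)]);
}
```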
@@ -1068,10 +1090,18 @@ impl ChainSync {
 		}
 
 		// check service status
-		match io.snapshot_service().status() {
+		let status = io.snapshot_service().status();
+		match status {
 			RestorationStatus::Inactive | RestorationStatus::Failed => {
 				trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
 				self.state = SyncState::WaitingPeers;
+
+				// only note bad if restoration failed.
+				if let (Some(hash), RestorationStatus::Failed) = (self.snapshot.snapshot_hash(), status) {
+					trace!(target: "sync", "Noting snapshot hash {} as bad", hash);
+					self.snapshot.note_bad(hash);
+				}
+
 				self.snapshot.clear();
 				self.continue_sync(io);
 				return Ok(());
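This hunk binds the restoration status to a local so it can be inspected twice: both `Inactive` and `Failed` put the sync back into `WaitingPeers`, but only a `Failed` restoration blacklists the snapshot hash. A small sketch of that distinction, with hypothetical enum and struct names rather than the real ChainSync types:

```rust
// Sketch only: blacklist the snapshot hash on a failed restoration, but not
// when the restoration service is merely inactive.
#[derive(Clone, Copy)]
enum RestorationStatus { Inactive, Ongoing, Failed }

struct SnapshotSync {
    current_hash: Option<u64>,
    bad_hashes: Vec<u64>,
}

impl SnapshotSync {
    fn on_status(&mut self, status: RestorationStatus) {
        match status {
            RestorationStatus::Inactive | RestorationStatus::Failed => {
                // only a failed restoration marks the hash as bad;
                // an inactive service just resets the sync state
                if let (Some(hash), RestorationStatus::Failed) = (self.current_hash, status) {
                    self.bad_hashes.push(hash);
                }
                self.current_hash = None;
            }
            RestorationStatus::Ongoing => {}
        }
    }
}

fn main() {
    let mut sync = SnapshotSync { current_hash: Some(0xaa), bad_hashes: Vec::new() };

    sync.on_status(RestorationStatus::Ongoing);  // no change while running
    sync.on_status(RestorationStatus::Inactive); // resets, but does not blacklist
    assert!(sync.bad_hashes.is_empty());

    sync.current_hash = Some(0xbb);
    sync.on_status(RestorationStatus::Failed);   // resets and blacklists
    assert_eq!(sync.bad_hashes, vec![0xbb]);
}
```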
@@ -31,6 +31,7 @@ pub struct Snapshot {
 	downloading_chunks: HashSet<H256>,
 	completed_chunks: HashSet<H256>,
 	snapshot_hash: Option<H256>,
+	bad_hashes: HashSet<H256>,
 }
 
 impl Snapshot {
@@ -42,6 +43,7 @@ impl Snapshot {
 			downloading_chunks: HashSet::new(),
 			completed_chunks: HashSet::new(),
 			snapshot_hash: None,
+			bad_hashes: HashSet::new(),
 		}
 	}
 
@@ -104,6 +106,16 @@ impl Snapshot {
 		self.downloading_chunks.remove(hash);
 	}
 
+	// note snapshot hash as bad.
+	pub fn note_bad(&mut self, hash: H256) {
+		self.bad_hashes.insert(hash);
+	}
+
+	// whether snapshot hash is known to be bad.
+	pub fn is_known_bad(&self, hash: &H256) -> bool {
+		self.bad_hashes.contains(hash)
+	}
+
 	pub fn snapshot_hash(&self) -> Option<H256> {
 		self.snapshot_hash
 	}
@@ -200,5 +212,15 @@ mod test {
 		assert_eq!(snapshot.done_chunks(), snapshot.total_chunks());
 		assert_eq!(snapshot.snapshot_hash(), Some(manifest.into_rlp().sha3()));
 	}
+
+	#[test]
+	fn tracks_known_bad() {
+		let mut snapshot = Snapshot::new();
+		let hash = H256::random();
+
+		assert_eq!(snapshot.is_known_bad(&hash), false);
+		snapshot.note_bad(hash);
+		assert_eq!(snapshot.is_known_bad(&hash), true);
+	}
 }