From 37f49aac1bfc46e38c8e2b83bd9044bc110d0a97 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 13 Nov 2016 13:52:53 +0100 Subject: [PATCH] abort snapshot restoration faster (#3356) * abort snapshot restoration faster * flag-checking tests --- ethcore/src/snapshot/error.rs | 3 ++ ethcore/src/snapshot/mod.rs | 43 +++++++++++---------- ethcore/src/snapshot/service.rs | 19 +++++---- ethcore/src/snapshot/tests/blocks.rs | 50 +++++++++++++++++++++++- ethcore/src/snapshot/tests/state.rs | 58 +++++++++++++++++++++++++++- sync/src/api.rs | 3 +- 6 files changed, 144 insertions(+), 32 deletions(-) diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index d634057dc..d417695f0 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -45,6 +45,8 @@ pub enum Error { MissingCode(Vec), /// Unrecognized code encoding. UnrecognizedCodeState(u8), + /// Restoration aborted. + RestorationAborted, /// Trie error. Trie(TrieError), /// Decoder error. @@ -67,6 +69,7 @@ impl fmt::Display for Error { a pruned database. Please re-run with the --pruning archive flag."), Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()), Error::UnrecognizedCodeState(state) => write!(f, "Unrecognized code encoding ({})", state), + Error::RestorationAborted => write!(f, "Snapshot restoration aborted."), Error::Io(ref err) => err.fmt(f), Error::Decoder(ref err) => err.fmt(f), Error::Trie(ref err) => err.fmt(f), diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 22c44ba3b..f4d791593 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -407,30 +407,28 @@ impl StateRebuilder { } /// Feed an uncompressed state chunk into the rebuilder. - pub fn feed(&mut self, chunk: &[u8]) -> Result<(), ::error::Error> { + pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), ::error::Error> { let rlp = UntrustedRlp::new(chunk); let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp(); - let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); let mut pairs = Vec::with_capacity(rlp.item_count()); // initialize the pairs vector with empty values so we have slots to write into. pairs.resize(rlp.item_count(), (H256::new(), Vec::new())); - let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1; + let status = try!(rebuild_accounts( + self.db.as_hashdb_mut(), + rlp, + &mut pairs, + &self.code_map, + flag + )); - // new code contained within this chunk. - let mut chunk_code = HashMap::new(); - - for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) { - let code_map = &self.code_map; - let status = try!(rebuild_accounts(self.db.as_hashdb_mut(), account_chunk, out_pairs_chunk, code_map)); - chunk_code.extend(status.new_code); - for (addr_hash, code_hash) in status.missing_code { - self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash); - } + for (addr_hash, code_hash) in status.missing_code { + self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash); } + // patch up all missing code. must be done after collecting all new missing code entries. - for (code_hash, code) in chunk_code { + for (code_hash, code) in status.new_code { for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) { let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash); db.emplace(code_hash, DBValue::from_slice(&code)); @@ -450,6 +448,8 @@ impl StateRebuilder { }; for (hash, thin_rlp) in pairs { + if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } + if &thin_rlp[..] != &empty_rlp[..] { self.bloom.set(&*hash); } @@ -487,17 +487,18 @@ struct RebuiltStatus { } // rebuild a set of accounts and their storage. -// returns +// returns a status detailing newly-loaded code and accounts missing code. fn rebuild_accounts( db: &mut HashDB, - account_chunk: &[&[u8]], + account_fat_rlps: UntrustedRlp, out_chunk: &mut [(H256, Bytes)], - code_map: &HashMap + code_map: &HashMap, + abort_flag: &AtomicBool ) -> Result { let mut status = RebuiltStatus::default(); - for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) { - let account_rlp = UntrustedRlp::new(account_pair); + for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk) { + if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } let hash: H256 = try!(account_rlp.val_at(0)); let fat_rlp = try!(account_rlp.at(1)); @@ -580,7 +581,7 @@ impl BlockRebuilder { /// Feed the rebuilder an uncompressed block chunk. /// Returns the number of blocks fed or any errors. - pub fn feed(&mut self, chunk: &[u8], engine: &Engine) -> Result { + pub fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result { use basic_types::Seal::With; use util::U256; use util::triehash::ordered_trie_root; @@ -601,6 +602,8 @@ impl BlockRebuilder { let parent_total_difficulty = try!(rlp.val_at::(2)); for idx in 3..item_count { + if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) } + let pair = try!(rlp.at(idx)); let abridged_rlp = try!(pair.at(0)).as_raw().to_owned(); let abridged_block = AbridgedBlock::from_raw(abridged_rlp); diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 16f7c6ec6..c0d34a6a9 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -118,12 +118,12 @@ impl Restoration { }) } - // feeds a state chunk - fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> { + // feeds a state chunk, aborts early if `flag` becomes false. + fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> { if self.state_chunks_left.remove(&hash) { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); - try!(self.state.feed(&self.snappy_buffer[..len])); + try!(self.state.feed(&self.snappy_buffer[..len], flag)); if let Some(ref mut writer) = self.writer.as_mut() { try!(writer.write_state_chunk(hash, chunk)); @@ -134,11 +134,11 @@ impl Restoration { } // feeds a block chunk - fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> { + fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine, flag: &AtomicBool) -> Result<(), Error> { if self.block_chunks_left.remove(&hash) { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); - try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); + try!(self.blocks.feed(&self.snappy_buffer[..len], engine, flag)); if let Some(ref mut writer) = self.writer.as_mut() { try!(writer.write_block_chunk(hash, chunk)); } @@ -224,6 +224,7 @@ pub struct Service { db_restore: Arc, progress: super::Progress, taking_snapshot: AtomicBool, + restoring_snapshot: AtomicBool, } impl Service { @@ -244,6 +245,7 @@ impl Service { db_restore: params.db_restore, progress: Default::default(), taking_snapshot: AtomicBool::new(false), + restoring_snapshot: AtomicBool::new(false), }; // create the root snapshot dir if it doesn't exist. @@ -436,6 +438,8 @@ impl Service { state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, }; + + self.restoring_snapshot.store(true, Ordering::SeqCst); Ok(()) } @@ -490,8 +494,8 @@ impl Service { }; (match is_state { - true => rest.feed_state(hash, chunk), - false => rest.feed_blocks(hash, chunk, &*self.engine), + true => rest.feed_state(hash, chunk, &self.restoring_snapshot), + false => rest.feed_blocks(hash, chunk, &*self.engine, &self.restoring_snapshot), }.map(|_| rest.is_done()), rest.db.clone()) }; @@ -573,6 +577,7 @@ impl SnapshotService for Service { } fn abort_restore(&self) { + self.restoring_snapshot.store(false, Ordering::SeqCst); *self.restoration.lock() = None; *self.status.lock() = RestorationStatus::Inactive; } diff --git a/ethcore/src/snapshot/tests/blocks.rs b/ethcore/src/snapshot/tests/blocks.rs index 12efcda77..18637bad1 100644 --- a/ethcore/src/snapshot/tests/blocks.rs +++ b/ethcore/src/snapshot/tests/blocks.rs @@ -17,10 +17,11 @@ //! Block chunker and rebuilder tests. use devtools::RandomTempPath; +use error::Error; use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer}; use blockchain::BlockChain; -use snapshot::{chunk_blocks, BlockRebuilder, Progress}; +use snapshot::{chunk_blocks, BlockRebuilder, Error as SnapshotError, Progress}; use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use util::{Mutex, snappy}; @@ -28,6 +29,7 @@ use util::kvdb::{Database, DatabaseConfig}; use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::AtomicBool; fn chunk_and_restore(amount: u64) { let mut canon_chain = ChainGenerator::default(); @@ -75,10 +77,11 @@ fn chunk_and_restore(amount: u64) { let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap(); let reader = PackedReader::new(&snapshot_path).unwrap().unwrap(); let engine = ::engines::NullEngine::new(Default::default(), Default::default()); + let flag = AtomicBool::new(true); for chunk_hash in &reader.manifest().block_hashes { let compressed = reader.chunk(*chunk_hash).unwrap(); let chunk = snappy::decompress(&compressed).unwrap(); - rebuilder.feed(&chunk, &engine).unwrap(); + rebuilder.feed(&chunk, &engine, &flag).unwrap(); } rebuilder.finalize(HashMap::new()).unwrap(); @@ -93,3 +96,46 @@ fn chunk_and_restore_500() { chunk_and_restore(500) } #[test] fn chunk_and_restore_40k() { chunk_and_restore(40000) } + +#[test] +fn checks_flag() { + use ::rlp::{RlpStream, Stream}; + use util::H256; + + let mut stream = RlpStream::new_list(5); + + stream.append(&100u64) + .append(&H256::default()) + .append(&(!0u64)); + + stream.append_empty_data().append_empty_data(); + + let genesis = { + let mut canon_chain = ChainGenerator::default(); + let mut finalizer = BlockFinalizer::default(); + canon_chain.generate(&mut finalizer).unwrap() + }; + + let chunk = stream.out(); + let path = RandomTempPath::create_dir(); + + let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let db = Arc::new(Database::open(&db_cfg, path.as_str()).unwrap()); + let chain = BlockChain::new(Default::default(), &genesis, db.clone()); + let engine = ::engines::NullEngine::new(Default::default(), Default::default()); + + let manifest = ::snapshot::ManifestData { + state_hashes: Vec::new(), + block_hashes: Vec::new(), + state_root: ::util::sha3::SHA3_NULL_RLP, + block_number: 102, + block_hash: H256::default(), + }; + + let mut rebuilder = BlockRebuilder::new(chain, db.clone(), &manifest).unwrap(); + + match rebuilder.feed(&chunk, &engine, &AtomicBool::new(false)) { + Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {} + _ => panic!("Wrong result on abort flag set") + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index e1d4df5f9..05537fa96 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -16,10 +16,12 @@ //! State snapshotting tests. -use snapshot::{chunk_state, Progress, StateRebuilder}; +use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder}; use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use super::helpers::{compare_dbs, StateProducer}; +use error::Error; + use rand::{XorShiftRng, SeedableRng}; use util::hash::H256; use util::journaldb::{self, Algorithm}; @@ -29,6 +31,7 @@ use util::Mutex; use devtools::RandomTempPath; use std::sync::Arc; +use std::sync::atomic::AtomicBool; #[test] fn snap_and_restore() { @@ -65,11 +68,13 @@ fn snap_and_restore() { let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::Archive); let reader = PackedReader::new(&snap_file).unwrap().unwrap(); + let flag = AtomicBool::new(true); + for chunk_hash in &reader.manifest().state_hashes { let raw = reader.chunk(*chunk_hash).unwrap(); let chunk = ::util::snappy::decompress(&raw).unwrap(); - rebuilder.feed(&chunk).unwrap(); + rebuilder.feed(&chunk, &flag).unwrap(); } assert_eq!(rebuilder.state_root(), state_root); @@ -82,3 +87,52 @@ fn snap_and_restore() { compare_dbs(&old_db, new_db.as_hashdb()); } + +#[test] +fn checks_flag() { + let mut producer = StateProducer::new(); + let mut rng = XorShiftRng::from_seed([5, 6, 7, 8]); + let mut old_db = MemoryDB::new(); + let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + for _ in 0..10 { + producer.tick(&mut rng, &mut old_db); + } + + let snap_dir = RandomTempPath::create_dir(); + let mut snap_file = snap_dir.as_path().to_owned(); + snap_file.push("SNAP"); + + let state_root = producer.state_root(); + let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); + + let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap(); + + writer.into_inner().finish(::snapshot::ManifestData { + state_hashes: state_hashes, + block_hashes: Vec::new(), + state_root: state_root, + block_number: 0, + block_hash: H256::default(), + }).unwrap(); + + let mut db_path = snap_dir.as_path().to_owned(); + db_path.push("db"); + { + let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap()); + let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::Archive); + let reader = PackedReader::new(&snap_file).unwrap().unwrap(); + + let flag = AtomicBool::new(false); + + for chunk_hash in &reader.manifest().state_hashes { + let raw = reader.chunk(*chunk_hash).unwrap(); + let chunk = ::util::snappy::decompress(&raw).unwrap(); + + match rebuilder.feed(&chunk, &flag) { + Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {}, + _ => panic!("unexpected result when feeding with flag off"), + } + } + } +} \ No newline at end of file diff --git a/sync/src/api.rs b/sync/src/api.rs index 1a33bc727..d9dbbd263 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -73,7 +73,7 @@ pub trait SyncProvider: Send + Sync { /// Get peers information fn peers(&self) -> Vec; - + /// Get the enode if available. fn enode(&self) -> Option; } @@ -231,6 +231,7 @@ impl ChainNotify for EthSync { } fn stop(&self) { + self.handler.snapshot_service.abort_restore(); self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } }