abort snapshot restoration faster (#3356)
* abort snapshot restoration faster * flag-checking tests
This commit is contained in:
parent
0c302376b6
commit
37f49aac1b
@ -45,6 +45,8 @@ pub enum Error {
|
||||
MissingCode(Vec<H256>),
|
||||
/// Unrecognized code encoding.
|
||||
UnrecognizedCodeState(u8),
|
||||
/// Restoration aborted.
|
||||
RestorationAborted,
|
||||
/// Trie error.
|
||||
Trie(TrieError),
|
||||
/// Decoder error.
|
||||
@ -67,6 +69,7 @@ impl fmt::Display for Error {
|
||||
a pruned database. Please re-run with the --pruning archive flag."),
|
||||
Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()),
|
||||
Error::UnrecognizedCodeState(state) => write!(f, "Unrecognized code encoding ({})", state),
|
||||
Error::RestorationAborted => write!(f, "Snapshot restoration aborted."),
|
||||
Error::Io(ref err) => err.fmt(f),
|
||||
Error::Decoder(ref err) => err.fmt(f),
|
||||
Error::Trie(ref err) => err.fmt(f),
|
||||
|
@ -407,30 +407,28 @@ impl StateRebuilder {
|
||||
}
|
||||
|
||||
/// Feed an uncompressed state chunk into the rebuilder.
|
||||
pub fn feed(&mut self, chunk: &[u8]) -> Result<(), ::error::Error> {
|
||||
pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), ::error::Error> {
|
||||
let rlp = UntrustedRlp::new(chunk);
|
||||
let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp();
|
||||
let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect();
|
||||
let mut pairs = Vec::with_capacity(rlp.item_count());
|
||||
|
||||
// initialize the pairs vector with empty values so we have slots to write into.
|
||||
pairs.resize(rlp.item_count(), (H256::new(), Vec::new()));
|
||||
|
||||
let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1;
|
||||
let status = try!(rebuild_accounts(
|
||||
self.db.as_hashdb_mut(),
|
||||
rlp,
|
||||
&mut pairs,
|
||||
&self.code_map,
|
||||
flag
|
||||
));
|
||||
|
||||
// new code contained within this chunk.
|
||||
let mut chunk_code = HashMap::new();
|
||||
|
||||
for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
|
||||
let code_map = &self.code_map;
|
||||
let status = try!(rebuild_accounts(self.db.as_hashdb_mut(), account_chunk, out_pairs_chunk, code_map));
|
||||
chunk_code.extend(status.new_code);
|
||||
for (addr_hash, code_hash) in status.missing_code {
|
||||
self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash);
|
||||
}
|
||||
for (addr_hash, code_hash) in status.missing_code {
|
||||
self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash);
|
||||
}
|
||||
|
||||
// patch up all missing code. must be done after collecting all new missing code entries.
|
||||
for (code_hash, code) in chunk_code {
|
||||
for (code_hash, code) in status.new_code {
|
||||
for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) {
|
||||
let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash);
|
||||
db.emplace(code_hash, DBValue::from_slice(&code));
|
||||
@ -450,6 +448,8 @@ impl StateRebuilder {
|
||||
};
|
||||
|
||||
for (hash, thin_rlp) in pairs {
|
||||
if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
|
||||
|
||||
if &thin_rlp[..] != &empty_rlp[..] {
|
||||
self.bloom.set(&*hash);
|
||||
}
|
||||
@ -487,17 +487,18 @@ struct RebuiltStatus {
|
||||
}
|
||||
|
||||
// rebuild a set of accounts and their storage.
|
||||
// returns
|
||||
// returns a status detailing newly-loaded code and accounts missing code.
|
||||
fn rebuild_accounts(
|
||||
db: &mut HashDB,
|
||||
account_chunk: &[&[u8]],
|
||||
account_fat_rlps: UntrustedRlp,
|
||||
out_chunk: &mut [(H256, Bytes)],
|
||||
code_map: &HashMap<H256, Bytes>
|
||||
code_map: &HashMap<H256, Bytes>,
|
||||
abort_flag: &AtomicBool
|
||||
) -> Result<RebuiltStatus, ::error::Error>
|
||||
{
|
||||
let mut status = RebuiltStatus::default();
|
||||
for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) {
|
||||
let account_rlp = UntrustedRlp::new(account_pair);
|
||||
for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk) {
|
||||
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
|
||||
|
||||
let hash: H256 = try!(account_rlp.val_at(0));
|
||||
let fat_rlp = try!(account_rlp.at(1));
|
||||
@ -580,7 +581,7 @@ impl BlockRebuilder {
|
||||
|
||||
/// Feed the rebuilder an uncompressed block chunk.
|
||||
/// Returns the number of blocks fed or any errors.
|
||||
pub fn feed(&mut self, chunk: &[u8], engine: &Engine) -> Result<u64, ::error::Error> {
|
||||
pub fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result<u64, ::error::Error> {
|
||||
use basic_types::Seal::With;
|
||||
use util::U256;
|
||||
use util::triehash::ordered_trie_root;
|
||||
@ -601,6 +602,8 @@ impl BlockRebuilder {
|
||||
let parent_total_difficulty = try!(rlp.val_at::<U256>(2));
|
||||
|
||||
for idx in 3..item_count {
|
||||
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
|
||||
|
||||
let pair = try!(rlp.at(idx));
|
||||
let abridged_rlp = try!(pair.at(0)).as_raw().to_owned();
|
||||
let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
|
||||
|
@ -118,12 +118,12 @@ impl Restoration {
|
||||
})
|
||||
}
|
||||
|
||||
// feeds a state chunk
|
||||
fn feed_state(&mut self, hash: H256, chunk: &[u8]) -> Result<(), Error> {
|
||||
// feeds a state chunk, aborts early if `flag` becomes false.
|
||||
fn feed_state(&mut self, hash: H256, chunk: &[u8], flag: &AtomicBool) -> Result<(), Error> {
|
||||
if self.state_chunks_left.remove(&hash) {
|
||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||
|
||||
try!(self.state.feed(&self.snappy_buffer[..len]));
|
||||
try!(self.state.feed(&self.snappy_buffer[..len], flag));
|
||||
|
||||
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||
try!(writer.write_state_chunk(hash, chunk));
|
||||
@ -134,11 +134,11 @@ impl Restoration {
|
||||
}
|
||||
|
||||
// feeds a block chunk
|
||||
fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine) -> Result<(), Error> {
|
||||
fn feed_blocks(&mut self, hash: H256, chunk: &[u8], engine: &Engine, flag: &AtomicBool) -> Result<(), Error> {
|
||||
if self.block_chunks_left.remove(&hash) {
|
||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||
|
||||
try!(self.blocks.feed(&self.snappy_buffer[..len], engine));
|
||||
try!(self.blocks.feed(&self.snappy_buffer[..len], engine, flag));
|
||||
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||
try!(writer.write_block_chunk(hash, chunk));
|
||||
}
|
||||
@ -224,6 +224,7 @@ pub struct Service {
|
||||
db_restore: Arc<DatabaseRestore>,
|
||||
progress: super::Progress,
|
||||
taking_snapshot: AtomicBool,
|
||||
restoring_snapshot: AtomicBool,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
@ -244,6 +245,7 @@ impl Service {
|
||||
db_restore: params.db_restore,
|
||||
progress: Default::default(),
|
||||
taking_snapshot: AtomicBool::new(false),
|
||||
restoring_snapshot: AtomicBool::new(false),
|
||||
};
|
||||
|
||||
// create the root snapshot dir if it doesn't exist.
|
||||
@ -436,6 +438,8 @@ impl Service {
|
||||
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
|
||||
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
|
||||
};
|
||||
|
||||
self.restoring_snapshot.store(true, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -490,8 +494,8 @@ impl Service {
|
||||
};
|
||||
|
||||
(match is_state {
|
||||
true => rest.feed_state(hash, chunk),
|
||||
false => rest.feed_blocks(hash, chunk, &*self.engine),
|
||||
true => rest.feed_state(hash, chunk, &self.restoring_snapshot),
|
||||
false => rest.feed_blocks(hash, chunk, &*self.engine, &self.restoring_snapshot),
|
||||
}.map(|_| rest.is_done()), rest.db.clone())
|
||||
};
|
||||
|
||||
@ -573,6 +577,7 @@ impl SnapshotService for Service {
|
||||
}
|
||||
|
||||
fn abort_restore(&self) {
|
||||
self.restoring_snapshot.store(false, Ordering::SeqCst);
|
||||
*self.restoration.lock() = None;
|
||||
*self.status.lock() = RestorationStatus::Inactive;
|
||||
}
|
||||
|
@ -17,10 +17,11 @@
|
||||
//! Block chunker and rebuilder tests.
|
||||
|
||||
use devtools::RandomTempPath;
|
||||
use error::Error;
|
||||
|
||||
use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer};
|
||||
use blockchain::BlockChain;
|
||||
use snapshot::{chunk_blocks, BlockRebuilder, Progress};
|
||||
use snapshot::{chunk_blocks, BlockRebuilder, Error as SnapshotError, Progress};
|
||||
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
|
||||
|
||||
use util::{Mutex, snappy};
|
||||
@ -28,6 +29,7 @@ use util::kvdb::{Database, DatabaseConfig};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
fn chunk_and_restore(amount: u64) {
|
||||
let mut canon_chain = ChainGenerator::default();
|
||||
@ -75,10 +77,11 @@ fn chunk_and_restore(amount: u64) {
|
||||
let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap();
|
||||
let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
|
||||
let engine = ::engines::NullEngine::new(Default::default(), Default::default());
|
||||
let flag = AtomicBool::new(true);
|
||||
for chunk_hash in &reader.manifest().block_hashes {
|
||||
let compressed = reader.chunk(*chunk_hash).unwrap();
|
||||
let chunk = snappy::decompress(&compressed).unwrap();
|
||||
rebuilder.feed(&chunk, &engine).unwrap();
|
||||
rebuilder.feed(&chunk, &engine, &flag).unwrap();
|
||||
}
|
||||
|
||||
rebuilder.finalize(HashMap::new()).unwrap();
|
||||
@ -93,3 +96,46 @@ fn chunk_and_restore_500() { chunk_and_restore(500) }
|
||||
|
||||
#[test]
|
||||
fn chunk_and_restore_40k() { chunk_and_restore(40000) }
|
||||
|
||||
#[test]
|
||||
fn checks_flag() {
|
||||
use ::rlp::{RlpStream, Stream};
|
||||
use util::H256;
|
||||
|
||||
let mut stream = RlpStream::new_list(5);
|
||||
|
||||
stream.append(&100u64)
|
||||
.append(&H256::default())
|
||||
.append(&(!0u64));
|
||||
|
||||
stream.append_empty_data().append_empty_data();
|
||||
|
||||
let genesis = {
|
||||
let mut canon_chain = ChainGenerator::default();
|
||||
let mut finalizer = BlockFinalizer::default();
|
||||
canon_chain.generate(&mut finalizer).unwrap()
|
||||
};
|
||||
|
||||
let chunk = stream.out();
|
||||
let path = RandomTempPath::create_dir();
|
||||
|
||||
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||
let db = Arc::new(Database::open(&db_cfg, path.as_str()).unwrap());
|
||||
let chain = BlockChain::new(Default::default(), &genesis, db.clone());
|
||||
let engine = ::engines::NullEngine::new(Default::default(), Default::default());
|
||||
|
||||
let manifest = ::snapshot::ManifestData {
|
||||
state_hashes: Vec::new(),
|
||||
block_hashes: Vec::new(),
|
||||
state_root: ::util::sha3::SHA3_NULL_RLP,
|
||||
block_number: 102,
|
||||
block_hash: H256::default(),
|
||||
};
|
||||
|
||||
let mut rebuilder = BlockRebuilder::new(chain, db.clone(), &manifest).unwrap();
|
||||
|
||||
match rebuilder.feed(&chunk, &engine, &AtomicBool::new(false)) {
|
||||
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {}
|
||||
_ => panic!("Wrong result on abort flag set")
|
||||
}
|
||||
}
|
@ -16,10 +16,12 @@
|
||||
|
||||
//! State snapshotting tests.
|
||||
|
||||
use snapshot::{chunk_state, Progress, StateRebuilder};
|
||||
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder};
|
||||
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
|
||||
use super::helpers::{compare_dbs, StateProducer};
|
||||
|
||||
use error::Error;
|
||||
|
||||
use rand::{XorShiftRng, SeedableRng};
|
||||
use util::hash::H256;
|
||||
use util::journaldb::{self, Algorithm};
|
||||
@ -29,6 +31,7 @@ use util::Mutex;
|
||||
use devtools::RandomTempPath;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
#[test]
|
||||
fn snap_and_restore() {
|
||||
@ -65,11 +68,13 @@ fn snap_and_restore() {
|
||||
let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::Archive);
|
||||
let reader = PackedReader::new(&snap_file).unwrap().unwrap();
|
||||
|
||||
let flag = AtomicBool::new(true);
|
||||
|
||||
for chunk_hash in &reader.manifest().state_hashes {
|
||||
let raw = reader.chunk(*chunk_hash).unwrap();
|
||||
let chunk = ::util::snappy::decompress(&raw).unwrap();
|
||||
|
||||
rebuilder.feed(&chunk).unwrap();
|
||||
rebuilder.feed(&chunk, &flag).unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(rebuilder.state_root(), state_root);
|
||||
@ -82,3 +87,52 @@ fn snap_and_restore() {
|
||||
|
||||
compare_dbs(&old_db, new_db.as_hashdb());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn checks_flag() {
|
||||
let mut producer = StateProducer::new();
|
||||
let mut rng = XorShiftRng::from_seed([5, 6, 7, 8]);
|
||||
let mut old_db = MemoryDB::new();
|
||||
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||
|
||||
for _ in 0..10 {
|
||||
producer.tick(&mut rng, &mut old_db);
|
||||
}
|
||||
|
||||
let snap_dir = RandomTempPath::create_dir();
|
||||
let mut snap_file = snap_dir.as_path().to_owned();
|
||||
snap_file.push("SNAP");
|
||||
|
||||
let state_root = producer.state_root();
|
||||
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
|
||||
|
||||
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
|
||||
|
||||
writer.into_inner().finish(::snapshot::ManifestData {
|
||||
state_hashes: state_hashes,
|
||||
block_hashes: Vec::new(),
|
||||
state_root: state_root,
|
||||
block_number: 0,
|
||||
block_hash: H256::default(),
|
||||
}).unwrap();
|
||||
|
||||
let mut db_path = snap_dir.as_path().to_owned();
|
||||
db_path.push("db");
|
||||
{
|
||||
let new_db = Arc::new(Database::open(&db_cfg, &db_path.to_string_lossy()).unwrap());
|
||||
let mut rebuilder = StateRebuilder::new(new_db.clone(), Algorithm::Archive);
|
||||
let reader = PackedReader::new(&snap_file).unwrap().unwrap();
|
||||
|
||||
let flag = AtomicBool::new(false);
|
||||
|
||||
for chunk_hash in &reader.manifest().state_hashes {
|
||||
let raw = reader.chunk(*chunk_hash).unwrap();
|
||||
let chunk = ::util::snappy::decompress(&raw).unwrap();
|
||||
|
||||
match rebuilder.feed(&chunk, &flag) {
|
||||
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {},
|
||||
_ => panic!("unexpected result when feeding with flag off"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -231,6 +231,7 @@ impl ChainNotify for EthSync {
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
self.handler.snapshot_service.abort_restore();
|
||||
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user