Better logging when backfilling ancient blocks fails (#10796)
* Better logging when backfilling ancient blocks fails. Print total blocks imported, closes #10792 * `finalize()` doesn't need Engine. Pull out call to migrate_blocks() from replace_client_db() * More logs * Clarify that the percentage may be misleading * Remove replace_client_db() and replace with a straight call to restore_db() * Include the parent_hash in UnlinkedAncientBlockChain errors * Add a new RestorationStatus variant: Finalizing (as it can take a loooong while). Call abort_restore() when restoration fails * Add missing cases for new variant * typos * Typo and derive Debug * Do not attempt to salvage existing blocks unless they form a complete chain back to genesis * Fix test * Revert "Fix test" This reverts commit f027d4b4cb7b6c23fceec528c1711886ba9cfe4e. * Fix test again * Update comment * Be careful about locks * fix test failure * Do not defer returning an error when the chain is broken * Review feedback * no hex formatting for Option
This commit is contained in:
parent
306c1764eb
commit
5dc5be1e58
@ -24,7 +24,8 @@ use common_types::header::Header;
|
|||||||
/// For GHOST fork-choice rule it would typically describe the block with highest
|
/// For GHOST fork-choice rule it would typically describe the block with highest
|
||||||
/// combined difficulty (usually the block with the highest block number).
|
/// combined difficulty (usually the block with the highest block number).
|
||||||
///
|
///
|
||||||
/// Sometimes refered as 'latest block'.
|
/// Sometimes referred as 'latest block'.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct BestBlock {
|
pub struct BestBlock {
|
||||||
/// Best block decoded header.
|
/// Best block decoded header.
|
||||||
pub header: Header,
|
pub header: Header,
|
||||||
@ -35,7 +36,7 @@ pub struct BestBlock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Best ancient block info. If the blockchain has a gap this keeps track of where it starts.
|
/// Best ancient block info. If the blockchain has a gap this keeps track of where it starts.
|
||||||
#[derive(Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct BestAncientBlock {
|
pub struct BestAncientBlock {
|
||||||
/// Best block hash.
|
/// Best block hash.
|
||||||
pub hash: H256,
|
pub hash: H256,
|
||||||
|
@ -652,10 +652,7 @@ impl BlockChain {
|
|||||||
// and write them
|
// and write them
|
||||||
if let (Some(hash), Some(number)) = (best_ancient, best_ancient_number) {
|
if let (Some(hash), Some(number)) = (best_ancient, best_ancient_number) {
|
||||||
let mut best_ancient_block = bc.best_ancient_block.write();
|
let mut best_ancient_block = bc.best_ancient_block.write();
|
||||||
*best_ancient_block = Some(BestAncientBlock {
|
*best_ancient_block = Some(BestAncientBlock { hash, number });
|
||||||
hash: hash,
|
|
||||||
number: number,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ pub struct TransactionAddress {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Contains all block receipts.
|
/// Contains all block receipts.
|
||||||
#[derive(Clone, RlpEncodableWrapper, RlpDecodableWrapper, MallocSizeOf)]
|
#[derive(Debug, Clone, RlpEncodableWrapper, RlpDecodableWrapper, MallocSizeOf)]
|
||||||
pub struct BlockReceipts {
|
pub struct BlockReceipts {
|
||||||
/// Block receipts
|
/// Block receipts
|
||||||
pub receipts: Vec<Receipt>,
|
pub receipts: Vec<Receipt>,
|
||||||
@ -214,9 +214,7 @@ pub struct BlockReceipts {
|
|||||||
impl BlockReceipts {
|
impl BlockReceipts {
|
||||||
/// Create new block receipts wrapper.
|
/// Create new block receipts wrapper.
|
||||||
pub fn new(receipts: Vec<Receipt>) -> Self {
|
pub fn new(receipts: Vec<Receipt>) -> Self {
|
||||||
BlockReceipts {
|
BlockReceipts { receipts }
|
||||||
receipts: receipts
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,7 +348,7 @@ impl Rebuilder for ChunkRebuilder {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn finalize(&mut self, _engine: &dyn Engine) -> Result<(), ::error::Error> {
|
fn finalize(&mut self) -> Result<(), ::error::Error> {
|
||||||
if !self.had_genesis {
|
if !self.had_genesis {
|
||||||
return Err(Error::WrongChunkFormat("No genesis transition included.".into()).into());
|
return Err(Error::WrongChunkFormat("No genesis transition included.".into()).into());
|
||||||
}
|
}
|
||||||
@ -358,6 +358,7 @@ impl Rebuilder for ChunkRebuilder {
|
|||||||
None => return Err(Error::WrongChunkFormat("Warp target block not included.".into()).into()),
|
None => return Err(Error::WrongChunkFormat("Warp target block not included.".into()).into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
trace!(target: "snapshot", "rebuilder, finalize: verifying {} unverified first blocks", self.unverified_firsts.len());
|
||||||
// verify the first entries of chunks we couldn't before.
|
// verify the first entries of chunks we couldn't before.
|
||||||
// we store all last verifiers, but not all firsts.
|
// we store all last verifiers, but not all firsts.
|
||||||
// match each unverified first epoch with a last epoch verifier.
|
// match each unverified first epoch with a last epoch verifier.
|
||||||
|
@ -92,5 +92,5 @@ pub trait Rebuilder: Send {
|
|||||||
///
|
///
|
||||||
/// This should apply the necessary "glue" between chunks,
|
/// This should apply the necessary "glue" between chunks,
|
||||||
/// and verify against the restored state.
|
/// and verify against the restored state.
|
||||||
fn finalize(&mut self, engine: &dyn Engine) -> Result<(), ::error::Error>;
|
fn finalize(&mut self) -> Result<(), ::error::Error>;
|
||||||
}
|
}
|
||||||
|
@ -208,15 +208,15 @@ impl PowRebuilder {
|
|||||||
/// Create a new PowRebuilder.
|
/// Create a new PowRebuilder.
|
||||||
fn new(chain: BlockChain, db: Arc<dyn KeyValueDB>, manifest: &ManifestData, snapshot_blocks: u64) -> Result<Self, ::error::Error> {
|
fn new(chain: BlockChain, db: Arc<dyn KeyValueDB>, manifest: &ManifestData, snapshot_blocks: u64) -> Result<Self, ::error::Error> {
|
||||||
Ok(PowRebuilder {
|
Ok(PowRebuilder {
|
||||||
chain: chain,
|
chain,
|
||||||
db: db,
|
db,
|
||||||
rng: OsRng::new().map_err(|e| format!("{}", e))?,
|
rng: OsRng::new().map_err(|e| format!("{}", e))?,
|
||||||
disconnected: Vec::new(),
|
disconnected: Vec::new(),
|
||||||
best_number: manifest.block_number,
|
best_number: manifest.block_number,
|
||||||
best_hash: manifest.block_hash,
|
best_hash: manifest.block_hash,
|
||||||
best_root: manifest.state_root,
|
best_root: manifest.state_root,
|
||||||
fed_blocks: 0,
|
fed_blocks: 0,
|
||||||
snapshot_blocks: snapshot_blocks,
|
snapshot_blocks,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -298,9 +298,9 @@ impl Rebuilder for PowRebuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Glue together any disconnected chunks and check that the chain is complete.
|
/// Glue together any disconnected chunks and check that the chain is complete.
|
||||||
fn finalize(&mut self, _: &dyn Engine) -> Result<(), ::error::Error> {
|
fn finalize(&mut self) -> Result<(), ::error::Error> {
|
||||||
let mut batch = self.db.transaction();
|
let mut batch = self.db.transaction();
|
||||||
|
trace!(target: "snapshot", "rebuilder, finalize: inserting {} disconnected chunks", self.disconnected.len());
|
||||||
for (first_num, first_hash) in self.disconnected.drain(..) {
|
for (first_num, first_hash) in self.disconnected.drain(..) {
|
||||||
let parent_num = first_num - 1;
|
let parent_num = first_num - 1;
|
||||||
|
|
||||||
|
@ -68,8 +68,8 @@ pub enum Error {
|
|||||||
BadEpochProof(u64),
|
BadEpochProof(u64),
|
||||||
/// Wrong chunk format.
|
/// Wrong chunk format.
|
||||||
WrongChunkFormat(String),
|
WrongChunkFormat(String),
|
||||||
/// Unlinked ancient block chain
|
/// Unlinked ancient block chain; includes the parent hash where linkage failed
|
||||||
UnlinkedAncientBlockChain,
|
UnlinkedAncientBlockChain(H256),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl error::Error for Error {
|
impl error::Error for Error {
|
||||||
@ -108,7 +108,7 @@ impl fmt::Display for Error {
|
|||||||
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
|
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
|
||||||
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
|
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
|
||||||
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
|
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
|
||||||
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
|
Error::UnlinkedAncientBlockChain(parent_hash) => write!(f, "Unlinked ancient blocks chain at parent_hash={:#x}", parent_hash),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,7 @@ use bytes::Bytes;
|
|||||||
use journaldb::Algorithm;
|
use journaldb::Algorithm;
|
||||||
use kvdb::DBTransaction;
|
use kvdb::DBTransaction;
|
||||||
use snappy;
|
use snappy;
|
||||||
|
use snapshot::error::Error::UnlinkedAncientBlockChain;
|
||||||
|
|
||||||
/// Helper for removing directories in case of error.
|
/// Helper for removing directories in case of error.
|
||||||
struct Guard(bool, PathBuf);
|
struct Guard(bool, PathBuf);
|
||||||
@ -110,17 +111,17 @@ impl Restoration {
|
|||||||
|
|
||||||
let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?;
|
let secondary = components.rebuilder(chain, raw_db.clone(), &manifest)?;
|
||||||
|
|
||||||
let root = manifest.state_root.clone();
|
let final_state_root = manifest.state_root.clone();
|
||||||
|
|
||||||
Ok(Restoration {
|
Ok(Restoration {
|
||||||
manifest: manifest,
|
manifest,
|
||||||
state_chunks_left: state_chunks,
|
state_chunks_left: state_chunks,
|
||||||
block_chunks_left: block_chunks,
|
block_chunks_left: block_chunks,
|
||||||
state: StateRebuilder::new(raw_db.key_value().clone(), params.pruning),
|
state: StateRebuilder::new(raw_db.key_value().clone(), params.pruning),
|
||||||
secondary: secondary,
|
secondary,
|
||||||
writer: params.writer,
|
writer: params.writer,
|
||||||
snappy_buffer: Vec::new(),
|
snappy_buffer: Vec::new(),
|
||||||
final_state_root: root,
|
final_state_root,
|
||||||
guard: params.guard,
|
guard: params.guard,
|
||||||
db: raw_db,
|
db: raw_db,
|
||||||
})
|
})
|
||||||
@ -170,7 +171,7 @@ impl Restoration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// finish up restoration.
|
// finish up restoration.
|
||||||
fn finalize(mut self, engine: &dyn Engine) -> Result<(), Error> {
|
fn finalize(mut self) -> Result<(), Error> {
|
||||||
use trie::TrieError;
|
use trie::TrieError;
|
||||||
|
|
||||||
if !self.is_done() { return Ok(()) }
|
if !self.is_done() { return Ok(()) }
|
||||||
@ -186,13 +187,14 @@ impl Restoration {
|
|||||||
self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?;
|
self.state.finalize(self.manifest.block_number, self.manifest.block_hash)?;
|
||||||
|
|
||||||
// connect out-of-order chunks and verify chain integrity.
|
// connect out-of-order chunks and verify chain integrity.
|
||||||
self.secondary.finalize(engine)?;
|
self.secondary.finalize()?;
|
||||||
|
|
||||||
if let Some(writer) = self.writer {
|
if let Some(writer) = self.writer {
|
||||||
writer.finish(self.manifest)?;
|
writer.finish(self.manifest)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.guard.disarm();
|
self.guard.disarm();
|
||||||
|
trace!(target: "snapshot", "restoration finalised correctly");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,16 +339,6 @@ impl Service {
|
|||||||
dir
|
dir
|
||||||
}
|
}
|
||||||
|
|
||||||
// replace one the client's database with our own.
|
|
||||||
fn replace_client_db(&self) -> Result<(), Error> {
|
|
||||||
let migrated_blocks = self.migrate_blocks()?;
|
|
||||||
info!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);
|
|
||||||
|
|
||||||
let rest_db = self.restoration_db();
|
|
||||||
self.client.restore_db(&*rest_db.to_string_lossy())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Migrate the blocks in the current DB into the new chain
|
// Migrate the blocks in the current DB into the new chain
|
||||||
fn migrate_blocks(&self) -> Result<usize, Error> {
|
fn migrate_blocks(&self) -> Result<usize, Error> {
|
||||||
// Count the number of migrated blocks
|
// Count the number of migrated blocks
|
||||||
@ -361,11 +353,27 @@ impl Service {
|
|||||||
|
|
||||||
// The old database looks like this:
|
// The old database looks like this:
|
||||||
// [genesis, best_ancient_block] ... [first_block, best_block]
|
// [genesis, best_ancient_block] ... [first_block, best_block]
|
||||||
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can assume that the whole range from [genesis, best_block] is imported.
|
// If we are fully synced neither `best_ancient_block` nor `first_block` is set, and we can
|
||||||
// The new database only contains the tip of the chain ([first_block, best_block]),
|
// assume that the whole range from [genesis, best_block] is imported.
|
||||||
|
// The new database only contains the tip of the chain ([new_first_block, new_best_block]),
|
||||||
// so the useful set of blocks is defined as:
|
// so the useful set of blocks is defined as:
|
||||||
// [0 ... min(new.first_block, best_ancient_block or best_block)]
|
// [0 ... min(new.first_block, best_ancient_block or best_block)]
|
||||||
|
//
|
||||||
|
// If, for whatever reason, the old db does not have ancient blocks (i.e.
|
||||||
|
// `best_ancient_block` is `None` AND a non-zero `first_block`), such that the old db looks
|
||||||
|
// like [old_first_block..old_best_block] (which may or may not partially overlap with
|
||||||
|
// [new_first_block..new_best_block]) we do the conservative thing and do not migrate the
|
||||||
|
// old blocks.
|
||||||
let find_range = || -> Option<(H256, H256)> {
|
let find_range = || -> Option<(H256, H256)> {
|
||||||
|
// In theory, if the current best_block is > new first_block (i.e. ranges overlap)
|
||||||
|
// we could salvage them but what if there's been a re-org at the boundary and the two
|
||||||
|
// chains do not match anymore? We'd have to check the existing blocks carefully.
|
||||||
|
if cur_chain_info.ancient_block_number.is_none() && cur_chain_info.first_block_number.unwrap_or(0) > 0 {
|
||||||
|
info!(target: "blockchain", "blocks in the current DB do not stretch back to genesis; can't salvage them into the new DB. In current DB, first block: #{:?}/{:?}, best block: #{:?}/{:?}",
|
||||||
|
cur_chain_info.first_block_number, cur_chain_info.first_block_hash,
|
||||||
|
cur_chain_info.best_block_number, cur_chain_info.best_block_hash);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
let next_available_from = next_chain_info.first_block_number?;
|
let next_available_from = next_chain_info.first_block_number?;
|
||||||
let cur_available_to = cur_chain_info.ancient_block_number.unwrap_or(cur_chain_info.best_block_number);
|
let cur_available_to = cur_chain_info.ancient_block_number.unwrap_or(cur_chain_info.best_block_number);
|
||||||
|
|
||||||
@ -375,10 +383,11 @@ impl Service {
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "snapshot", "Trying to import ancient blocks until {}", highest_block_num);
|
trace!(target: "snapshot", "Trying to import ancient blocks until {}. First block in new chain=#{}, first block in old chain=#{:?}, best block in old chain=#{}",
|
||||||
|
highest_block_num, next_available_from, cur_chain_info.first_block_number, cur_chain_info.best_block_number);
|
||||||
|
|
||||||
// Here we start from the highest block number and go backward to 0,
|
// Here we start from the highest block number and go backward to 0,
|
||||||
// thus starting at `highest_block_num` and targetting `0`.
|
// thus starting at `highest_block_num` and targeting `0`.
|
||||||
let target_hash = self.client.block_hash(BlockId::Number(0))?;
|
let target_hash = self.client.block_hash(BlockId::Number(0))?;
|
||||||
let start_hash = self.client.block_hash(BlockId::Number(highest_block_num))?;
|
let start_hash = self.client.block_hash(BlockId::Number(highest_block_num))?;
|
||||||
|
|
||||||
@ -398,7 +407,10 @@ impl Service {
|
|||||||
return Ok(count);
|
return Ok(count);
|
||||||
}
|
}
|
||||||
|
|
||||||
let block = self.client.block(BlockId::Hash(parent_hash)).ok_or(::snapshot::error::Error::UnlinkedAncientBlockChain)?;
|
let block = self.client.block(BlockId::Hash(parent_hash)).ok_or_else(|| {
|
||||||
|
error!(target: "snapshot", "migrate_blocks: did not find block from parent_hash={:#x} (start_hash={:#x})", parent_hash, start_hash);
|
||||||
|
UnlinkedAncientBlockChain(parent_hash)
|
||||||
|
})?;
|
||||||
parent_hash = block.parent_hash();
|
parent_hash = block.parent_hash();
|
||||||
|
|
||||||
let block_number = block.number();
|
let block_number = block.number();
|
||||||
@ -412,7 +424,14 @@ impl Service {
|
|||||||
next_chain.insert_unordered_block(&mut batch, block, block_receipts, Some(parent_total_difficulty), false, true);
|
next_chain.insert_unordered_block(&mut batch, block, block_receipts, Some(parent_total_difficulty), false, true);
|
||||||
count += 1;
|
count += 1;
|
||||||
},
|
},
|
||||||
_ => break,
|
_ => {
|
||||||
|
// We couldn't reach the targeted hash
|
||||||
|
error!(target: "snapshot", "migrate_blocks: failed to find receipts and parent total difficulty; cannot reach the target_hash ({:#x}). Block #{}, parent_hash={:#x}, parent_total_difficulty={:?}, start_hash={:#x}, ancient_block_number={:?}, best_block_number={:?}",
|
||||||
|
target_hash, block_number, parent_hash, parent_total_difficulty,
|
||||||
|
start_hash, cur_chain_info.ancient_block_number, cur_chain_info.best_block_number,
|
||||||
|
);
|
||||||
|
return Err(UnlinkedAncientBlockChain(parent_hash).into());
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writing changes to DB and logging every now and then
|
// Writing changes to DB and logging every now and then
|
||||||
@ -433,11 +452,6 @@ impl Service {
|
|||||||
next_chain.commit();
|
next_chain.commit();
|
||||||
next_db.key_value().flush().expect("DB flush failed.");
|
next_db.key_value().flush().expect("DB flush failed.");
|
||||||
|
|
||||||
// We couldn't reach the targeted hash
|
|
||||||
if parent_hash != target_hash {
|
|
||||||
return Err(::snapshot::error::Error::UnlinkedAncientBlockChain.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update best ancient block in the Next Chain
|
// Update best ancient block in the Next Chain
|
||||||
next_chain.update_best_ancient_block(&start_hash);
|
next_chain.update_best_ancient_block(&start_hash);
|
||||||
Ok(count)
|
Ok(count)
|
||||||
@ -549,6 +563,8 @@ impl Service {
|
|||||||
|
|
||||||
*self.status.lock() = RestorationStatus::Initializing {
|
*self.status.lock() = RestorationStatus::Initializing {
|
||||||
chunks_done: 0,
|
chunks_done: 0,
|
||||||
|
state_chunks: manifest.state_hashes.len() as u32,
|
||||||
|
block_chunks: manifest.block_hashes.len() as u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
fs::create_dir_all(&rest_dir)?;
|
fs::create_dir_all(&rest_dir)?;
|
||||||
@ -563,7 +579,7 @@ impl Service {
|
|||||||
manifest: manifest.clone(),
|
manifest: manifest.clone(),
|
||||||
pruning: self.pruning,
|
pruning: self.pruning,
|
||||||
db: self.restoration_db_handler.open(&rest_db)?,
|
db: self.restoration_db_handler.open(&rest_db)?,
|
||||||
writer: writer,
|
writer,
|
||||||
genesis: &self.genesis_block,
|
genesis: &self.genesis_block,
|
||||||
guard: Guard::new(rest_db),
|
guard: Guard::new(rest_db),
|
||||||
engine: &*self.engine,
|
engine: &*self.engine,
|
||||||
@ -654,15 +670,20 @@ impl Service {
|
|||||||
// lead to deadlock.
|
// lead to deadlock.
|
||||||
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
|
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
|
||||||
trace!(target: "snapshot", "finalizing restoration");
|
trace!(target: "snapshot", "finalizing restoration");
|
||||||
|
*self.status.lock() = RestorationStatus::Finalizing;
|
||||||
|
|
||||||
let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
|
let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
|
||||||
|
|
||||||
// destroy the restoration before replacing databases and snapshot.
|
// destroy the restoration before replacing databases and snapshot.
|
||||||
rest.take()
|
rest.take()
|
||||||
.map(|r| r.finalize(&*self.engine))
|
.map(|r| r.finalize())
|
||||||
.unwrap_or(Ok(()))?;
|
.unwrap_or(Ok(()))?;
|
||||||
|
|
||||||
self.replace_client_db()?;
|
let migrated_blocks = self.migrate_blocks()?;
|
||||||
|
info!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);
|
||||||
|
|
||||||
|
// replace the Client's database with the new one (restart the Client).
|
||||||
|
self.client.restore_db(&*self.restoration_db().to_string_lossy())?;
|
||||||
|
|
||||||
if recover {
|
if recover {
|
||||||
let mut reader = self.reader.write();
|
let mut reader = self.reader.write();
|
||||||
@ -690,14 +711,20 @@ impl Service {
|
|||||||
/// Feed a chunk of either kind (block or state). no-op if no restoration or status is wrong.
|
/// Feed a chunk of either kind (block or state). no-op if no restoration or status is wrong.
|
||||||
fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) {
|
fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) {
|
||||||
// TODO: be able to process block chunks and state chunks at same time?
|
// TODO: be able to process block chunks and state chunks at same time?
|
||||||
|
let r = {
|
||||||
let mut restoration = self.restoration.lock();
|
let mut restoration = self.restoration.lock();
|
||||||
match self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state) {
|
self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state)
|
||||||
|
};
|
||||||
|
match r {
|
||||||
Ok(()) |
|
Ok(()) |
|
||||||
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => (),
|
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => (),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
// TODO: after this we're sometimes deadlocked
|
||||||
warn!("Encountered error during snapshot restoration: {}", e);
|
warn!("Encountered error during snapshot restoration: {}", e);
|
||||||
*self.restoration.lock() = None;
|
self.abort_restore();
|
||||||
*self.status.lock() = RestorationStatus::Failed;
|
if let Some(mut status) = self.status.try_lock_for(std::time::Duration::from_millis(10)) {
|
||||||
|
*status = RestorationStatus::Failed;
|
||||||
|
}
|
||||||
let _ = fs::remove_dir_all(self.restoration_dir());
|
let _ = fs::remove_dir_all(self.restoration_dir());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -707,8 +734,8 @@ impl Service {
|
|||||||
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
|
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
|
||||||
let (result, db) = {
|
let (result, db) = {
|
||||||
match self.status() {
|
match self.status() {
|
||||||
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
RestorationStatus::Inactive | RestorationStatus::Failed | RestorationStatus::Finalizing => {
|
||||||
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
|
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive, failed or finalizing", hash);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
},
|
},
|
||||||
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
|
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
|
||||||
@ -803,7 +830,7 @@ impl SnapshotService for Service {
|
|||||||
let mut cur_status = self.status.lock();
|
let mut cur_status = self.status.lock();
|
||||||
|
|
||||||
match *cur_status {
|
match *cur_status {
|
||||||
RestorationStatus::Initializing { ref mut chunks_done } => {
|
RestorationStatus::Initializing { ref mut chunks_done, .. } => {
|
||||||
*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
|
*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
|
||||||
self.block_chunks.load(Ordering::SeqCst) as u32;
|
self.block_chunks.load(Ordering::SeqCst) as u32;
|
||||||
}
|
}
|
||||||
|
@ -187,5 +187,5 @@ pub fn restore(
|
|||||||
|
|
||||||
trace!(target: "snapshot", "finalizing");
|
trace!(target: "snapshot", "finalizing");
|
||||||
state.finalize(manifest.block_number, manifest.block_hash)?;
|
state.finalize(manifest.block_number, manifest.block_hash)?;
|
||||||
secondary.finalize(engine)
|
secondary.finalize()
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ fn chunk_and_restore(amount: u64) {
|
|||||||
rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
|
rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
rebuilder.finalize(engine.as_ref()).unwrap();
|
rebuilder.finalize().unwrap();
|
||||||
drop(rebuilder);
|
drop(rebuilder);
|
||||||
|
|
||||||
// and test it.
|
// and test it.
|
||||||
|
@ -533,6 +533,10 @@ impl SyncHandler {
|
|||||||
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
|
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
RestorationStatus::Finalizing => {
|
||||||
|
trace!(target: "warp", "{}: Snapshot finalizing restoration", peer_id);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
RestorationStatus::Ongoing { .. } => {
|
RestorationStatus::Ongoing { .. } => {
|
||||||
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
|
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
|
||||||
},
|
},
|
||||||
|
@ -1210,7 +1210,7 @@ impl ChainSync {
|
|||||||
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
||||||
self.set_state(SyncState::SnapshotWaiting);
|
self.set_state(SyncState::SnapshotWaiting);
|
||||||
},
|
},
|
||||||
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (),
|
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } | RestorationStatus::Finalizing => (),
|
||||||
},
|
},
|
||||||
SyncState::SnapshotWaiting => {
|
SyncState::SnapshotWaiting => {
|
||||||
match io.snapshot_service().status() {
|
match io.snapshot_service().status() {
|
||||||
@ -1221,6 +1221,9 @@ impl ChainSync {
|
|||||||
RestorationStatus::Initializing { .. } => {
|
RestorationStatus::Initializing { .. } => {
|
||||||
trace!(target:"sync", "Snapshot restoration is initializing");
|
trace!(target:"sync", "Snapshot restoration is initializing");
|
||||||
},
|
},
|
||||||
|
RestorationStatus::Finalizing { .. } => {
|
||||||
|
trace!(target:"sync", "Snapshot finalizing restoration");
|
||||||
|
},
|
||||||
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
|
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
|
||||||
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
|
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
|
||||||
trace!(target:"sync", "Resuming snapshot sync");
|
trace!(target:"sync", "Resuming snapshot sync");
|
||||||
|
@ -23,6 +23,10 @@ pub enum RestorationStatus {
|
|||||||
Inactive,
|
Inactive,
|
||||||
/// Restoration is initializing
|
/// Restoration is initializing
|
||||||
Initializing {
|
Initializing {
|
||||||
|
/// Total number of state chunks.
|
||||||
|
state_chunks: u32,
|
||||||
|
/// Total number of block chunks.
|
||||||
|
block_chunks: u32,
|
||||||
/// Number of chunks done/imported
|
/// Number of chunks done/imported
|
||||||
chunks_done: u32,
|
chunks_done: u32,
|
||||||
},
|
},
|
||||||
@ -37,6 +41,7 @@ pub enum RestorationStatus {
|
|||||||
/// Number of block chunks completed.
|
/// Number of block chunks completed.
|
||||||
block_chunks_done: u32,
|
block_chunks_done: u32,
|
||||||
},
|
},
|
||||||
|
Finalizing,
|
||||||
/// Failed restoration.
|
/// Failed restoration.
|
||||||
Failed,
|
Failed,
|
||||||
}
|
}
|
||||||
|
@ -319,9 +319,16 @@ impl<T: InformantData> Informant<T> {
|
|||||||
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => {
|
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => {
|
||||||
format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks)
|
format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks)
|
||||||
},
|
},
|
||||||
RestorationStatus::Initializing { chunks_done } => {
|
RestorationStatus::Initializing { chunks_done, state_chunks, block_chunks } => {
|
||||||
format!("Snapshot initializing ({} chunks restored)", chunks_done)
|
let total_chunks = state_chunks + block_chunks;
|
||||||
|
// Note that the percentage here can be slightly misleading when
|
||||||
|
// they have chunks already on disk: we'll import the local
|
||||||
|
// chunks first and then download the rest.
|
||||||
|
format!("Snapshot initializing ({}/{} chunks restored, {:.0}%)", chunks_done, total_chunks, (chunks_done as f32 / total_chunks as f32) * 100.0)
|
||||||
},
|
},
|
||||||
|
RestorationStatus::Finalizing => {
|
||||||
|
format!("Snapshot finalization under way")
|
||||||
|
}
|
||||||
_ => String::new(),
|
_ => String::new(),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -123,6 +123,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService>, reader: &R,
|
|||||||
match snapshot.status() {
|
match snapshot.status() {
|
||||||
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
|
||||||
RestorationStatus::Initializing { .. } => Err("Snapshot restoration is still initializing.".into()),
|
RestorationStatus::Initializing { .. } => Err("Snapshot restoration is still initializing.".into()),
|
||||||
|
RestorationStatus::Finalizing => Err("Snapshot restoration is still finalizing.".into()),
|
||||||
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
RestorationStatus::Failed => Err("Snapshot restoration failed.".into()),
|
||||||
RestorationStatus::Inactive => {
|
RestorationStatus::Inactive => {
|
||||||
info!("Restoration complete.");
|
info!("Restoration complete.");
|
||||||
|
Loading…
Reference in New Issue
Block a user