Snapshot and blockchain stability improvements (#2843)

* allow taking snapshot from just-restored database without error

* make creation informant less spammy

* Ancestry iterator failure-resilient

* make uncle hash searching resilient to incomplete chain

* deduce pre-chunk info from last written block's details
This commit is contained in:
Robert Habermeier 2016-10-24 18:27:23 +02:00 committed by Gav Wood
parent 1a5bae8ef1
commit bc81ae0407
5 changed files with 68 additions and 59 deletions

View File

@ -394,6 +394,8 @@ impl BlockProvider for BlockChain {
}
}
/// An iterator which walks the blockchain towards the genesis.
#[derive(Clone)]
pub struct AncestryIter<'a> {
current: H256,
chain: &'a BlockChain,
@ -403,11 +405,10 @@ impl<'a> Iterator for AncestryIter<'a> {
type Item = H256;
fn next(&mut self) -> Option<H256> {
if self.current.is_zero() {
Option::None
None
} else {
let mut n = self.chain.block_details(&self.current).unwrap().parent;
mem::swap(&mut self.current, &mut n);
Some(n)
self.chain.block_details(&self.current)
.map(|details| mem::replace(&mut self.current, details.parent))
}
}
}
@ -999,17 +1000,29 @@ impl BlockChain {
if !self.is_known(parent) { return None; }
let mut excluded = HashSet::new();
for a in self.ancestry_iter(parent.clone()).unwrap().take(uncle_generations) {
excluded.extend(self.uncle_hashes(&a).unwrap().into_iter());
let ancestry = match self.ancestry_iter(parent.clone()) {
Some(iter) => iter,
None => return None,
};
for a in ancestry.clone().take(uncle_generations) {
if let Some(uncles) = self.uncle_hashes(&a) {
excluded.extend(uncles);
excluded.insert(a);
} else {
break
}
}
let mut ret = Vec::new();
for a in self.ancestry_iter(parent.clone()).unwrap().skip(1).take(uncle_generations) {
ret.extend(self.block_details(&a).unwrap().children.iter()
.filter(|h| !excluded.contains(h))
);
for a in ancestry.skip(1).take(uncle_generations) {
if let Some(details) = self.block_details(&a) {
ret.extend(details.children.iter().filter(|h| !excluded.contains(h)))
} else {
break
}
}
Some(ret)
}

View File

@ -136,7 +136,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = try!(scope(|scope| {
let block_guard = scope.spawn(|| chunk_blocks(chain, (number, block_at), &writer, p));
let block_guard = scope.spawn(|| chunk_blocks(chain, block_at, &writer, p));
let state_res = chunk_state(state_db, state_root, &writer, p);
state_res.and_then(|state_hashes| {
@ -176,10 +176,15 @@ struct BlockChunker<'a> {
impl<'a> BlockChunker<'a> {
// Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash.
// Loops until we reach the first desired block, and writes out the remainder.
fn chunk_all(&mut self, first_hash: H256) -> Result<(), Error> {
fn chunk_all(&mut self) -> Result<(), Error> {
let mut loaded_size = 0;
let mut last = self.current_hash;
let genesis_hash = self.chain.genesis_hash();
for _ in 0..SNAPSHOT_BLOCKS {
if self.current_hash == genesis_hash { break }
while self.current_hash != first_hash {
let (block, receipts) = try!(self.chain.block(&self.current_hash)
.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
.ok_or(Error::BlockNotFound(self.current_hash)));
@ -197,21 +202,21 @@ impl<'a> BlockChunker<'a> {
// cut off the chunk if too large.
if new_loaded_size > PREFERRED_CHUNK_SIZE {
try!(self.write_chunk());
if new_loaded_size > PREFERRED_CHUNK_SIZE && self.rlps.len() > 0 {
try!(self.write_chunk(last));
loaded_size = pair.len();
} else {
loaded_size = new_loaded_size;
}
self.rlps.push_front(pair);
last = self.current_hash;
self.current_hash = view.header_view().parent_hash();
}
if loaded_size != 0 {
// we don't store the first block, so once we get to this point,
// the "first" block will be first_number + 1.
try!(self.write_chunk());
try!(self.write_chunk(last));
}
Ok(())
@ -219,23 +224,24 @@ impl<'a> BlockChunker<'a> {
// write out the data in the buffers to a chunk on disk
//
// we preface each chunk with the parent of the first block's details.
fn write_chunk(&mut self) -> Result<(), Error> {
// since the block we're inspecting now doesn't go into the
// chunk if it's too large, the current hash is the parent hash
// for the first block in that chunk.
let parent_hash = self.current_hash;
// we preface each chunk with the parent of the first block's details,
// obtained from the details of the last block written.
fn write_chunk(&mut self, last: H256) -> Result<(), Error> {
trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
let (parent_number, parent_details) = try!(self.chain.block_number(&parent_hash)
.and_then(|n| self.chain.block_details(&parent_hash).map(|d| (n, d)))
.ok_or(Error::BlockNotFound(parent_hash)));
let parent_total_difficulty = parent_details.total_difficulty;
let (last_header, last_details) = try!(self.chain.block_header(&last)
.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
.ok_or(Error::BlockNotFound(last)));
let parent_number = last_header.number() - 1;
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty();
trace!(target: "snapshot", "parent last written block: {}", parent_hash);
let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty);
rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty);
for pair in self.rlps.drain(..) {
rlp_stream.append_raw(&pair, 1);
@ -264,17 +270,7 @@ impl<'a> BlockChunker<'a> {
/// The path parameter is the directory to store the block chunks in.
/// This function assumes the directory exists already.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
let (start_number, start_hash) = start_block_info;
let first_hash = if start_number < SNAPSHOT_BLOCKS {
// use the genesis hash.
chain.genesis_hash()
} else {
let first_num = start_number - SNAPSHOT_BLOCKS;
try!(chain.block_hash(first_num).ok_or(Error::IncompleteChain))
};
pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
let mut chunker = BlockChunker {
chain: chain,
rlps: VecDeque::new(),
@ -285,7 +281,7 @@ pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), wr
progress: progress,
};
try!(chunker.chunk_all(first_hash));
try!(chunker.chunk_all());
Ok(chunker.hashes)
}
@ -596,7 +592,7 @@ impl BlockRebuilder {
let rlp = UntrustedRlp::new(chunk);
let item_count = rlp.item_count();
trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 2);
trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3);
// todo: assert here that these values are consistent with chunks being in order.
let mut cur_number = try!(rlp.val_at::<u64>(0)) + 1;

View File

@ -57,7 +57,7 @@ fn chunk_and_restore(amount: u64) {
// snapshot it.
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer, &Progress::default()).unwrap();
let block_hashes = chunk_blocks(&bc, best_hash, &writer, &Progress::default()).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
state_hashes: Vec::new(),
block_hashes: block_hashes,

View File

@ -45,6 +45,14 @@ pub struct Informant {
skipped: AtomicUsize,
}
/// Format byte counts to standard denominations.
pub fn format_bytes(b: usize) -> String {
match binary_prefix(b as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
/// Something that can be converted to milliseconds.
pub trait MillisecondDuration {
/// Get the value in milliseconds.
@ -75,13 +83,6 @@ impl Informant {
}
}
fn format_bytes(b: usize) -> String {
match binary_prefix(b as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
#[cfg_attr(feature="dev", allow(match_bool))]
pub fn tick(&self) {
@ -156,11 +157,11 @@ impl Informant {
_ => String::new(),
},
format!("{} db {} chain {} queue{}",
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(report.state_db_mem))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(cache_info.total()))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(queue_info.mem_used))),
paint(Blue.bold(), format!("{:>8}", format_bytes(report.state_db_mem))),
paint(Blue.bold(), format!("{:>8}", format_bytes(cache_info.total()))),
paint(Blue.bold(), format!("{:>8}", format_bytes(queue_info.mem_used))),
match sync_status {
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))),
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))),
_ => String::new(),
}
)

View File

@ -232,9 +232,8 @@ impl SnapshotCommand {
let cur_size = p.size();
if cur_size != last_size {
last_size = cur_size;
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
} else {
info!("Snapshot: No progress since last update.");
let bytes = ::informant::format_bytes(p.size());
info!("Snapshot: {} accounts {} blocks {}", p.accounts(), p.blocks(), bytes);
}
::std::thread::sleep(Duration::from_secs(5));