// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
//! Snapshot creation, restoration, and network service.
//!
//! Documentation of the format can be found at
//! https://github.com/ethcore/parity/wiki/%22PV64%22-Snapshot-Format
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use account_db::{AccountDB, AccountDBMut};
use blockchain::{BlockChain, BlockProvider};
use engines::Engine;
use header::Header;
use ids::BlockID;
use views::BlockView;
use util::{Bytes, Hashable, HashDB, DBValue, snappy, U256, Uint};
use util::Mutex;
use util::hash::{FixedHash, H256};
use util::journaldb::{self, Algorithm, JournalDB};
use util::kvdb::Database;
use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut};
use util::sha3::SHA3_NULL_RLP;
use rlp::{RlpStream, Stream, UntrustedRlp, View};
use bloom_journal::Bloom;
use self::account::Account;
use self::block::AbridgedBlock;
use self::io::SnapshotWriter;
use super::state_db::StateDB;
use super::state::Account as StateAccount;
use crossbeam::scope;
use rand::{Rng, OsRng};
pub use self::error::Error;
pub use self::service::{Service, DatabaseRestore};
pub use self::traits::SnapshotService;
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
pub use types::restoration_status::RestorationStatus;
pub mod io;
pub mod service;
mod account;
mod block;
mod error;
mod watcher;
#[cfg(test)]
mod tests;
/// IPC interfaces
#[cfg(feature="ipc")]
pub mod remote {
pub use super::traits::RemoteSnapshotService;
}
mod traits {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/snapshot_service_trait.rs"));
}
// Try to have chunks be around 4MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
// How many blocks to include in a snapshot, starting from the head of the chain.
const SNAPSHOT_BLOCKS: u64 = 30000;
/// A progress indicator for snapshots.
#[derive(Debug, Default)]
pub struct Progress {
accounts: AtomicUsize,
blocks: AtomicUsize,
size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
done: AtomicBool,
}
impl Progress {
/// Reset the progress.
pub fn reset(&self) {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
self.size.store(0, Ordering::Release);
// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
self.done.store(false, Ordering::Release);
}
/// Get the number of accounts snapshotted thus far.
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Acquire) }
/// Get the number of blocks snapshotted thus far.
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
/// Get the written size of the snapshot in bytes.
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
/// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
}
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
pub fn take_snapshot(
chain: &BlockChain,
block_at: H256,
state_db: &HashDB,
writer: W,
p: &Progress
) -> Result<(), Error> {
let start_header = try!(chain.block_header(&block_at)
.ok_or(Error::InvalidStartingBlock(BlockID::Hash(block_at))));
let state_root = start_header.state_root();
let number = start_header.number();
info!("Taking snapshot starting at block {}", number);
let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = try!(scope(|scope| {
let block_guard = scope.spawn(|| chunk_blocks(chain, block_at, &writer, p));
let state_res = chunk_state(state_db, state_root, &writer, p);
state_res.and_then(|state_hashes| {
block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
})
}));
info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData {
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: *state_root,
block_number: number,
block_hash: block_at,
};
try!(writer.into_inner().finish(manifest_data));
p.done.store(true, Ordering::SeqCst);
Ok(())
}
/// Used to build block chunks.
struct BlockChunker<'a> {
chain: &'a BlockChain,
// block, receipt rlp pairs.
rlps: VecDeque,
current_hash: H256,
hashes: Vec,
snappy_buffer: Vec,
writer: &'a Mutex,
progress: &'a Progress,
}
impl<'a> BlockChunker<'a> {
// Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash.
// Loops until we reach the first desired block, and writes out the remainder.
fn chunk_all(&mut self) -> Result<(), Error> {
let mut loaded_size = 0;
let mut last = self.current_hash;
let genesis_hash = self.chain.genesis_hash();
for _ in 0..SNAPSHOT_BLOCKS {
if self.current_hash == genesis_hash { break }
let (block, receipts) = try!(self.chain.block(&self.current_hash)
.and_then(|b| self.chain.block_receipts(&self.current_hash).map(|r| (b, r)))
.ok_or(Error::BlockNotFound(self.current_hash)));
let view = BlockView::new(&block);
let abridged_rlp = AbridgedBlock::from_block_view(&view).into_inner();
let pair = {
let mut pair_stream = RlpStream::new_list(2);
pair_stream.append_raw(&abridged_rlp, 1).append(&receipts);
pair_stream.out()
};
let new_loaded_size = loaded_size + pair.len();
// cut off the chunk if too large.
if new_loaded_size > PREFERRED_CHUNK_SIZE && !self.rlps.is_empty() {
try!(self.write_chunk(last));
loaded_size = pair.len();
} else {
loaded_size = new_loaded_size;
}
self.rlps.push_front(pair);
last = self.current_hash;
self.current_hash = view.header_view().parent_hash();
}
if loaded_size != 0 {
try!(self.write_chunk(last));
}
Ok(())
}
// write out the data in the buffers to a chunk on disk
//
// we preface each chunk with the parent of the first block's details,
// obtained from the details of the last block written.
fn write_chunk(&mut self, last: H256) -> Result<(), Error> {
trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
let (last_header, last_details) = try!(self.chain.block_header(&last)
.and_then(|n| self.chain.block_details(&last).map(|d| (n, d)))
.ok_or(Error::BlockNotFound(last)));
let parent_number = last_header.number() - 1;
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - *last_header.difficulty();
trace!(target: "snapshot", "parent last written block: {}", parent_hash);
let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
rlp_stream.append(&parent_number).append(parent_hash).append(&parent_total_difficulty);
for pair in self.rlps.drain(..) {
rlp_stream.append_raw(&pair, 1);
}
let raw_data = rlp_stream.out();
let size = snappy::compress_into(&raw_data, &mut self.snappy_buffer);
let compressed = &self.snappy_buffer[..size];
let hash = compressed.sha3();
try!(self.writer.lock().write_block_chunk(hash, compressed));
trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len());
self.progress.size.fetch_add(size, Ordering::SeqCst);
self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst);
self.hashes.push(hash);
Ok(())
}
}
/// Create and write out all block chunks to disk, returning a vector of all
/// the hashes of block chunks created.
///
/// The path parameter is the directory to store the block chunks in.
/// This function assumes the directory exists already.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_hash: H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> {
let mut chunker = BlockChunker {
chain: chain,
rlps: VecDeque::new(),
current_hash: start_hash,
hashes: Vec::new(),
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer: writer,
progress: progress,
};
try!(chunker.chunk_all());
Ok(chunker.hashes)
}
/// State trie chunker.
struct StateChunker<'a> {
hashes: Vec,
rlps: Vec,
cur_size: usize,
snappy_buffer: Vec,
writer: &'a Mutex,
progress: &'a Progress,
}
impl<'a> StateChunker<'a> {
// Push a key, value pair to be encoded.
//
// If the buffer is greater than the desired chunk size,
// this will write out the data to disk.
fn push(&mut self, account_hash: Bytes, data: Bytes) -> Result<(), Error> {
let pair = {
let mut stream = RlpStream::new_list(2);
stream.append(&account_hash).append_raw(&data, 1);
stream.out()
};
if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE {
try!(self.write_chunk());
}
self.cur_size += pair.len();
self.rlps.push(pair);
Ok(())
}
// Write out the buffer to disk, pushing the created chunk's hash to
// the list.
fn write_chunk(&mut self) -> Result<(), Error> {
let num_entries = self.rlps.len();
let mut stream = RlpStream::new_list(num_entries);
for rlp in self.rlps.drain(..) {
stream.append_raw(&rlp, 1);
}
let raw_data = stream.out();
let compressed_size = snappy::compress_into(&raw_data, &mut self.snappy_buffer);
let compressed = &self.snappy_buffer[..compressed_size];
let hash = compressed.sha3();
try!(self.writer.lock().write_state_chunk(hash, compressed));
trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);
self.hashes.push(hash);
self.cur_size = 0;
Ok(())
}
}
/// Walk the given state database starting from the given root,
/// creating chunks and writing them out.
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> {
let account_trie = try!(TrieDB::new(db, &root));
let mut chunker = StateChunker {
hashes: Vec::new(),
rlps: Vec::new(),
cur_size: 0,
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer: writer,
progress: progress,
};
let mut used_code = HashSet::new();
// account_key here is the address' hash.
for item in try!(account_trie.iter()) {
let (account_key, account_data) = try!(item);
let account = Account::from_thin_rlp(&*account_data);
let account_key_hash = H256::from_slice(&account_key);
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlp = try!(account.to_fat_rlp(&account_db, &mut used_code));
try!(chunker.push(account_key, fat_rlp));
}
if chunker.cur_size != 0 {
try!(chunker.write_chunk());
}
Ok(chunker.hashes)
}
/// Used to rebuild the state trie piece by piece.
pub struct StateRebuilder {
db: Box,
state_root: H256,
known_code: HashMap, // code hashes mapped to first account with this code.
missing_code: HashMap>, // maps code hashes to lists of accounts missing that code.
bloom: Bloom,
}
impl StateRebuilder {
/// Create a new state rebuilder to write into the given backing DB.
pub fn new(db: Arc, pruning: Algorithm) -> Self {
StateRebuilder {
db: journaldb::new(db.clone(), pruning, ::db::COL_STATE),
state_root: SHA3_NULL_RLP,
known_code: HashMap::new(),
missing_code: HashMap::new(),
bloom: StateDB::load_bloom(&*db),
}
}
/// Feed an uncompressed state chunk into the rebuilder.
pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), ::error::Error> {
let rlp = UntrustedRlp::new(chunk);
let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp();
let mut pairs = Vec::with_capacity(rlp.item_count());
// initialize the pairs vector with empty values so we have slots to write into.
pairs.resize(rlp.item_count(), (H256::new(), Vec::new()));
let status = try!(rebuild_accounts(
self.db.as_hashdb_mut(),
rlp,
&mut pairs,
&self.known_code,
flag
));
for (addr_hash, code_hash) in status.missing_code {
self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash);
}
// patch up all missing code. must be done after collecting all new missing code entries.
for (code_hash, code, first_with) in status.new_code {
for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) {
let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash);
db.emplace(code_hash, DBValue::from_slice(&code));
}
self.known_code.insert(code_hash, first_with);
}
let backing = self.db.backing().clone();
// batch trie writes
{
let mut account_trie = if self.state_root != SHA3_NULL_RLP {
try!(TrieDBMut::from_existing(self.db.as_hashdb_mut(), &mut self.state_root))
} else {
TrieDBMut::new(self.db.as_hashdb_mut(), &mut self.state_root)
};
for (hash, thin_rlp) in pairs {
if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
if &thin_rlp[..] != &empty_rlp[..] {
self.bloom.set(&*hash);
}
try!(account_trie.insert(&hash, &thin_rlp));
}
}
let bloom_journal = self.bloom.drain_journal();
let mut batch = backing.transaction();
try!(StateDB::commit_bloom(&mut batch, bloom_journal));
try!(self.db.inject(&mut batch));
backing.write_buffered(batch);
trace!(target: "snapshot", "current state root: {:?}", self.state_root);
Ok(())
}
/// Check for accounts missing code. Once all chunks have been fed, there should
/// be none.
pub fn check_missing(self) -> Result<(), Error> {
let missing = self.missing_code.keys().cloned().collect::>();
match missing.is_empty() {
true => Ok(()),
false => Err(Error::MissingCode(missing)),
}
}
/// Get the state root of the rebuilder.
pub fn state_root(&self) -> H256 { self.state_root }
}
#[derive(Default)]
struct RebuiltStatus {
// new code that's become available. (code_hash, code, addr_hash)
new_code: Vec<(H256, Bytes, H256)>,
missing_code: Vec<(H256, H256)>, // accounts that are missing code.
}
// rebuild a set of accounts and their storage.
// returns a status detailing newly-loaded code and accounts missing code.
fn rebuild_accounts(
db: &mut HashDB,
account_fat_rlps: UntrustedRlp,
out_chunk: &mut [(H256, Bytes)],
known_code: &HashMap,
abort_flag: &AtomicBool,
) -> Result {
let mut status = RebuiltStatus::default();
for (account_rlp, out) in account_fat_rlps.into_iter().zip(out_chunk) {
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
let hash: H256 = try!(account_rlp.val_at(0));
let fat_rlp = try!(account_rlp.at(1));
let thin_rlp = {
// fill out the storage trie and code while decoding.
let (acc, maybe_code) = {
let mut acct_db = AccountDBMut::from_hash(db, hash);
try!(Account::from_fat_rlp(&mut acct_db, fat_rlp))
};
let code_hash = acc.code_hash().clone();
match maybe_code {
// new inline code
Some(code) => status.new_code.push((code_hash, code, hash)),
None => {
if code_hash != ::util::SHA3_EMPTY {
// see if this code has already been included inline
match known_code.get(&code_hash) {
Some(&first_with) => {
// if so, load it from the database.
let code = try!(AccountDB::from_hash(db, first_with)
.get(&code_hash)
.ok_or_else(|| Error::MissingCode(vec![first_with])));
// and write it again under a different mangled key
AccountDBMut::from_hash(db, hash).emplace(code_hash, code);
}
// if not, queue it up to be filled later
None => status.missing_code.push((hash, code_hash)),
}
}
}
}
acc.to_thin_rlp()
};
*out = (hash, thin_rlp);
}
Ok(status)
}
/// Proportion of blocks which we will verify `PoW` for.
const POW_VERIFY_RATE: f32 = 0.02;
/// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform
/// the fullest verification possible. If not, it will take a random sample to determine whether it will
/// do heavy or light verification.
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &Engine, chain: &BlockChain, body: Option<&[u8]>, always: bool) -> Result<(), ::error::Error> {
if always || rng.gen::() <= POW_VERIFY_RATE {
match chain.block_header(header.parent_hash()) {
Some(parent) => engine.verify_block_family(header, &parent, body),
None => engine.verify_block_seal(header),
}
} else {
engine.verify_block_basic(header, body)
}
}
/// Rebuilds the blockchain from chunks.
///
/// Does basic verification for all blocks, but `PoW` verification for some.
/// Blocks must be fed in-order.
///
/// The first block in every chunk is disconnected from the last block in the
/// chunk before it, as chunks may be submitted out-of-order.
///
/// After all chunks have been submitted, we "glue" the chunks together.
pub struct BlockRebuilder {
chain: BlockChain,
db: Arc,
rng: OsRng,
disconnected: Vec<(u64, H256)>,
best_number: u64,
best_hash: H256,
best_root: H256,
fed_blocks: u64,
}
impl BlockRebuilder {
/// Create a new BlockRebuilder.
pub fn new(chain: BlockChain, db: Arc, manifest: &ManifestData) -> Result {
Ok(BlockRebuilder {
chain: chain,
db: db,
rng: try!(OsRng::new()),
disconnected: Vec::new(),
best_number: manifest.block_number,
best_hash: manifest.block_hash,
best_root: manifest.state_root,
fed_blocks: 0,
})
}
/// Feed the rebuilder an uncompressed block chunk.
/// Returns the number of blocks fed or any errors.
pub fn feed(&mut self, chunk: &[u8], engine: &Engine, abort_flag: &AtomicBool) -> Result {
use basic_types::Seal::With;
use util::U256;
use util::triehash::ordered_trie_root;
let rlp = UntrustedRlp::new(chunk);
let item_count = rlp.item_count();
let num_blocks = (item_count - 3) as u64;
trace!(target: "snapshot", "restoring block chunk with {} blocks.", item_count - 3);
if self.fed_blocks + num_blocks > SNAPSHOT_BLOCKS {
return Err(Error::TooManyBlocks(SNAPSHOT_BLOCKS, self.fed_blocks).into())
}
// todo: assert here that these values are consistent with chunks being in order.
let mut cur_number = try!(rlp.val_at::(0)) + 1;
let mut parent_hash = try!(rlp.val_at::(1));
let parent_total_difficulty = try!(rlp.val_at::(2));
for idx in 3..item_count {
if !abort_flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
let pair = try!(rlp.at(idx));
let abridged_rlp = try!(pair.at(0)).as_raw().to_owned();
let abridged_block = AbridgedBlock::from_raw(abridged_rlp);
let receipts: Vec<::receipt::Receipt> = try!(pair.val_at(1));
let receipts_root = ordered_trie_root(
try!(pair.at(1)).iter().map(|r| r.as_raw().to_owned())
);
let block = try!(abridged_block.to_block(parent_hash, cur_number, receipts_root));
let block_bytes = block.rlp_bytes(With);
let is_best = cur_number == self.best_number;
if is_best {
if block.header.hash() != self.best_hash {
return Err(Error::WrongBlockHash(cur_number, self.best_hash, block.header.hash()).into())
}
if block.header.state_root() != &self.best_root {
return Err(Error::WrongStateRoot(self.best_root, *block.header.state_root()).into())
}
}
try!(verify_old_block(
&mut self.rng,
&block.header,
engine,
&self.chain,
Some(&block_bytes),
is_best
));
let mut batch = self.db.transaction();
// special-case the first block in each chunk.
if idx == 3 {
if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
self.disconnected.push((cur_number, block.header.hash()));
}
} else {
self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false);
}
self.db.write_buffered(batch);
self.chain.commit();
parent_hash = BlockView::new(&block_bytes).hash();
cur_number += 1;
}
self.fed_blocks += num_blocks;
Ok(num_blocks)
}
/// Glue together any disconnected chunks and check that the chain is complete.
pub fn finalize(self, canonical: HashMap) -> Result<(), Error> {
let mut batch = self.db.transaction();
for (first_num, first_hash) in self.disconnected {
let parent_num = first_num - 1;
// check if the parent is even in the chain.
// since we don't restore every single block in the chain,
// the first block of the first chunks has nothing to connect to.
if let Some(parent_hash) = self.chain.block_hash(parent_num) {
// if so, add the child to it.
self.chain.add_child(&mut batch, parent_hash, first_hash);
}
}
self.db.write_buffered(batch);
let best_number = self.best_number;
for num in (0..self.fed_blocks).map(|x| best_number - x) {
let hash = try!(self.chain.block_hash(num).ok_or(Error::IncompleteChain));
if let Some(canon_hash) = canonical.get(&num).cloned() {
if canon_hash != hash {
return Err(Error::WrongBlockHash(num, canon_hash, hash));
}
}
}
Ok(())
}
}