Merge pull request #1577 from ethcore/pv64

bring snapshotting work into master
Nikolay Volf 2016-07-12 13:46:55 +02:00 committed by GitHub
commit d956b7cea3
12 changed files with 1049 additions and 4 deletions

View File

@@ -19,6 +19,7 @@
use std::path::*;
use std::fs;
use std::env;
use std::ops::{Deref, DerefMut};
use rand::random;
pub struct RandomTempPath {
@@ -93,6 +94,16 @@ impl<T> GuardedTempResult<T> {
}
}
impl<T> Deref for GuardedTempResult<T> {
type Target = T;
fn deref(&self) -> &T { self.result.as_ref().unwrap() }
}
impl<T> DerefMut for GuardedTempResult<T> {
fn deref_mut(&mut self) -> &mut T { self.result.as_mut().unwrap() }
}
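
These impls let tests treat the guard as the value it wraps while the temporary path stays alive. A minimal usage sketch, with make_temp_result as a hypothetical helper returning GuardedTempResult<Vec<u8>>:

    let mut guarded = make_temp_result(); // hypothetical helper
    guarded.push(1u8);                    // DerefMut forwards to Vec::push
    assert_eq!(guarded.len(), 1);         // Deref forwards to Vec::len
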
#[test]
fn creates_dir() {
let temp = RandomTempPath::create_dir();

View File

@@ -24,7 +24,7 @@ use trace::Trace;
use evm::Factory as EvmFactory;
/// A block, encoded as it is on the block chain.
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, PartialEq)]
pub struct Block {
/// The header of this block.
pub header: Header,

View File

@@ -14,13 +14,12 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::path::PathBuf;
use std::collections::{HashSet, HashMap};
use std::ops::Deref;
use std::mem;
use std::collections::VecDeque;
use std::sync::*;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::Instant;

View File

@@ -230,6 +230,10 @@ pub enum Error {
PowInvalid,
/// Error concerning TrieDBs
Trie(TrieError),
/// Io error.
Io(::std::io::Error),
/// Snappy error.
Snappy(::util::snappy::InvalidInput),
}
impl fmt::Display for Error {
@@ -246,6 +250,8 @@ impl fmt::Display for Error {
Error::PowHashInvalid => f.write_str("Invalid or out of date PoW hash."),
Error::PowInvalid => f.write_str("Invalid nonce or mishash"),
Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)),
Error::Io(ref err) => f.write_fmt(format_args!("{}", err)),
Error::Snappy(ref err) => f.write_fmt(format_args!("{}", err)),
}
}
}
@@ -313,6 +319,18 @@ impl From<TrieError> for Error {
}
}
impl From<::std::io::Error> for Error {
fn from(err: ::std::io::Error) -> Error {
Error::Io(err)
}
}
impl From<::util::snappy::InvalidInput> for Error {
fn from(err: ::util::snappy::InvalidInput) -> Error {
Error::Snappy(err)
}
}
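
With the two From impls above, try! can mix io and snappy failures in a single Result<_, Error> function. A sketch, where read_compressed is a hypothetical helper returning ::std::io::Result<Vec<u8>>:

    fn load_chunk(path: &::std::path::Path) -> Result<Vec<u8>, Error> {
        // io::Error converts through From into Error::Io here.
        let compressed = try!(read_compressed(path));
        // snappy::InvalidInput converts into Error::Snappy here.
        Ok(try!(::util::snappy::decompress(&compressed)))
    }
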
impl From<BlockImportError> for Error {
fn from(err: BlockImportError) -> Error {
match err {

View File

@@ -149,6 +149,16 @@ impl Header {
/// Set the parent hash field of the header.
pub fn set_parent_hash(&mut self, a: H256) { self.parent_hash = a; self.note_dirty(); }
/// Set the uncles hash field of the header.
pub fn set_uncles_hash(&mut self, a: H256) { self.uncles_hash = a; self.note_dirty(); }
/// Set the state root field of the header.
pub fn set_state_root(&mut self, a: H256) { self.state_root = a; self.note_dirty(); }
/// Set the transactions root field of the header.
pub fn set_transactions_root(&mut self, a: H256) { self.transactions_root = a; self.note_dirty() }
/// Set the receipts root field of the header.
pub fn set_receipts_root(&mut self, a: H256) { self.receipts_root = a; self.note_dirty() }
/// Set the log bloom field of the header.
pub fn set_log_bloom(&mut self, a: LogBloom) { self.log_bloom = a; self.note_dirty() }
/// Set the timestamp field of the header.
pub fn set_timestamp(&mut self, a: u64) { self.timestamp = a; self.note_dirty(); }
/// Set the timestamp field of the header to the current time.
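
Each new setter follows the established pattern: write the field, then call note_dirty so the header's cached hashes are recomputed on next use. A small sketch, with the field values assumed in scope:

    let mut header = Header::default();
    header.set_state_root(new_state_root);       // new_state_root assumed
    header.set_receipts_root(new_receipts_root); // invalidates the cached hash again
    header.set_timestamp(now);
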

View File

@@ -118,8 +118,9 @@ pub mod pod_state;
pub mod engine;
pub mod migrations;
pub mod miner;
#[macro_use] pub mod evm;
pub mod snapshot;
pub mod action_params;
#[macro_use] pub mod evm;
mod blooms;
mod db;

View File

@@ -0,0 +1,211 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Account state encoding and decoding
use account_db::{AccountDB, AccountDBMut};
use error::Error;
use util::{Bytes, HashDB, SHA3_EMPTY, TrieDB};
use util::hash::{FixedHash, H256};
use util::numbers::U256;
use util::rlp::{DecoderError, Rlp, RlpStream, Stream, UntrustedRlp, View};
// An alternate account structure from ::account::Account.
#[derive(PartialEq, Clone, Debug)]
pub struct Account {
nonce: U256,
balance: U256,
storage_root: H256,
code_hash: H256,
}
impl Account {
// decode the account from rlp.
pub fn from_thin_rlp(rlp: &[u8]) -> Self {
let r: Rlp = Rlp::new(rlp);
Account {
nonce: r.val_at(0),
balance: r.val_at(1),
storage_root: r.val_at(2),
code_hash: r.val_at(3),
}
}
// encode the account to a standard rlp.
pub fn to_thin_rlp(&self) -> Bytes {
let mut stream = RlpStream::new_list(4);
stream
.append(&self.nonce)
.append(&self.balance)
.append(&self.storage_root)
.append(&self.code_hash);
stream.out()
}
// walk the account's storage trie, returning an RLP item containing the
// account properties and the storage.
pub fn to_fat_rlp(&self, acct_db: &AccountDB) -> Result<Bytes, Error> {
let db = try!(TrieDB::new(acct_db, &self.storage_root));
let mut pairs = Vec::new();
for (k, v) in db.iter() {
pairs.push((k, v));
}
let mut stream = RlpStream::new_list(pairs.len());
for (k, v) in pairs {
stream.begin_list(2).append(&k).append(&v);
}
let pairs_rlp = stream.out();
let mut account_stream = RlpStream::new_list(5);
account_stream.append(&self.nonce)
.append(&self.balance);
// [has_code, code_hash].
if self.code_hash == SHA3_EMPTY {
account_stream.append(&false).append_empty_data();
} else {
match acct_db.get(&self.code_hash) {
Some(c) => {
account_stream.append(&true).append(&c);
}
None => {
warn!("code lookup failed during snapshot");
account_stream.append(&false).append_empty_data();
}
}
}
account_stream.append_raw(&pairs_rlp, 1);
Ok(account_stream.out())
}
// decode a fat rlp, and rebuild the storage trie as we go.
pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: UntrustedRlp) -> Result<Self, DecoderError> {
use util::{TrieDBMut, TrieMut};
let nonce = try!(rlp.val_at(0));
let balance = try!(rlp.val_at(1));
let code_hash = if try!(rlp.val_at(2)) {
let code: Bytes = try!(rlp.val_at(3));
acct_db.insert(&code)
} else {
SHA3_EMPTY
};
let mut storage_root = H256::zero();
{
let mut storage_trie = TrieDBMut::new(acct_db, &mut storage_root);
let pairs = try!(rlp.at(4));
for pair_rlp in pairs.iter() {
let k: Bytes = try!(pair_rlp.val_at(0));
let v: Bytes = try!(pair_rlp.val_at(1));
storage_trie.insert(&k, &v);
}
}
Ok(Account {
nonce: nonce,
balance: balance,
storage_root: storage_root,
code_hash: code_hash,
})
}
}
#[cfg(test)]
mod tests {
use account_db::{AccountDB, AccountDBMut};
use tests::helpers::get_temp_journal_db;
use util::{SHA3_NULL_RLP, SHA3_EMPTY};
use util::hash::{Address, FixedHash, H256};
use util::rlp::{UntrustedRlp, View};
use util::trie::{Alphabet, StandardMap, SecTrieDBMut, TrieMut, ValueMode};
use super::Account;
fn fill_storage(mut db: AccountDBMut) -> H256 {
let map = StandardMap {
alphabet: Alphabet::All,
min_key: 6,
journal_key: 6,
value_mode: ValueMode::Random,
count: 100
};
let mut root = H256::new();
{
let mut trie = SecTrieDBMut::new(&mut db, &mut root);
for (k, v) in map.make() {
trie.insert(&k, &v);
}
}
root
}
#[test]
fn encoding_basic() {
let mut db = get_temp_journal_db();
let mut db = &mut **db;
let addr = Address::random();
let account = Account {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: SHA3_NULL_RLP,
code_hash: SHA3_EMPTY,
};
let thin_rlp = account.to_thin_rlp();
assert_eq!(Account::from_thin_rlp(&thin_rlp), account);
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlp);
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account);
}
#[test]
fn encoding_storage() {
let mut db = get_temp_journal_db();
let mut db = &mut **db;
let addr = Address::random();
let root = fill_storage(AccountDBMut::new(db.as_hashdb_mut(), &addr));
let account = Account {
nonce: 25.into(),
balance: 987654321.into(),
storage_root: root,
code_hash: SHA3_EMPTY,
};
let thin_rlp = account.to_thin_rlp();
assert_eq!(Account::from_thin_rlp(&thin_rlp), account);
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap();
let fat_rlp = UntrustedRlp::new(&fat_rlp);
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account);
}
}

View File

@@ -0,0 +1,211 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Block RLP compression.
// TODO [rob] remove when BlockRebuilder done.
#![allow(dead_code)]
use block::Block;
use header::Header;
use views::BlockView;
use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View};
use util::{Bytes, Hashable, H256};
const HEADER_FIELDS: usize = 10;
const BLOCK_FIELDS: usize = 2;
pub struct AbridgedBlock {
rlp: Bytes,
}
impl AbridgedBlock {
/// Create from a vector of bytes. Does no verification.
pub fn from_raw(rlp: Bytes) -> Self {
AbridgedBlock {
rlp: rlp,
}
}
/// Return the inner bytes.
pub fn into_inner(self) -> Bytes {
self.rlp
}
/// Given a full block view, trim out the parent hash and block number,
/// producing new rlp.
pub fn from_block_view(block_view: &BlockView) -> Self {
let header = block_view.header_view();
let seal_fields = header.seal();
// 10 header fields, unknown amount of seal fields, and 2 block fields.
let mut stream = RlpStream::new_list(
HEADER_FIELDS +
seal_fields.len() +
BLOCK_FIELDS
);
// write header values.
stream
.append(&header.author())
.append(&header.state_root())
.append(&header.transactions_root())
.append(&header.receipts_root())
.append(&header.log_bloom())
.append(&header.difficulty())
.append(&header.gas_limit())
.append(&header.gas_used())
.append(&header.timestamp())
.append(&header.extra_data());
// write block values.
stream.append(&block_view.transactions()).append(&block_view.uncles());
// write seal fields.
for field in seal_fields {
stream.append_raw(&field, 1);
}
AbridgedBlock {
rlp: stream.out(),
}
}
/// Flesh out an abridged block view with the provided parent hash and block number.
///
/// Will fail if the abridged block contains invalid rlp.
pub fn to_block(&self, parent_hash: H256, number: u64) -> Result<Block, DecoderError> {
let rlp = UntrustedRlp::new(&self.rlp);
let mut header = Header {
parent_hash: parent_hash,
author: try!(rlp.val_at(0)),
state_root: try!(rlp.val_at(1)),
transactions_root: try!(rlp.val_at(2)),
receipts_root: try!(rlp.val_at(3)),
log_bloom: try!(rlp.val_at(4)),
difficulty: try!(rlp.val_at(5)),
number: number,
gas_limit: try!(rlp.val_at(6)),
gas_used: try!(rlp.val_at(7)),
timestamp: try!(rlp.val_at(8)),
extra_data: try!(rlp.val_at(9)),
..Default::default()
};
let transactions = try!(rlp.val_at(10));
let uncles: Vec<Header> = try!(rlp.val_at(11));
// iterator-based approach is cleaner but doesn't work w/ try.
let seal = {
let mut seal = Vec::new();
for i in 12..rlp.item_count() {
seal.push(try!(rlp.val_at(i)));
}
seal
};
header.set_seal(seal);
let uncle_bytes = uncles.iter()
.fold(RlpStream::new_list(uncles.len()), |mut s, u| {
s.append_raw(&u.rlp(::basic_types::Seal::With), 1);
s
}).out();
header.uncles_hash = uncle_bytes.sha3();
Ok(Block {
header: header,
transactions: transactions,
uncles: uncles,
})
}
}
#[cfg(test)]
mod tests {
use views::BlockView;
use block::Block;
use super::AbridgedBlock;
use types::transaction::{Action, Transaction};
use util::numbers::U256;
use util::hash::{Address, H256, FixedHash};
use util::{Bytes, RlpStream, Stream};
fn encode_block(b: &Block) -> Bytes {
let mut s = RlpStream::new_list(3);
b.header.stream_rlp(&mut s, ::basic_types::Seal::With);
s.append(&b.transactions);
s.append(&b.uncles);
s.out()
}
#[test]
fn empty_block_abridging() {
let b = Block::default();
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded));
assert_eq!(abridged.to_block(H256::new(), 0).unwrap(), b);
}
#[test]
#[should_panic]
fn wrong_number() {
let b = Block::default();
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded));
assert_eq!(abridged.to_block(H256::new(), 2).unwrap(), b);
}
#[test]
fn with_transactions() {
let mut b = Block::default();
let t1 = Transaction {
action: Action::Create,
nonce: U256::from(42),
gas_price: U256::from(3000),
gas: U256::from(50_000),
value: U256::from(1),
data: b"Hello!".to_vec()
}.fake_sign(Address::from(0x69));
let t2 = Transaction {
action: Action::Create,
nonce: U256::from(88),
gas_price: U256::from(12345),
gas: U256::from(300000),
value: U256::from(1000000000),
data: "Eep!".into(),
}.fake_sign(Address::from(0x55));
b.transactions.push(t1);
b.transactions.push(t2);
let encoded = encode_block(&b);
let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded[..]));
assert_eq!(abridged.to_block(H256::new(), 0).unwrap(), b);
}
}
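
Outside the tests, the abridging round trip looks like the sketch below, assuming bytes holds a full block's on-chain RLP; the parent hash and number trimmed out by from_block_view must be supplied again from chunk context:

    let view = BlockView::new(&bytes);
    let parent_hash = view.header_view().parent_hash();
    let number = view.header_view().number();
    let abridged = AbridgedBlock::from_block_view(&view);
    // restoring requires the two fields that were trimmed out.
    let block = abridged.to_block(parent_hash, number).expect("rlp produced above is valid");
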

ethcore/src/snapshot/mod.rs Normal file (417 lines)
View File

@@ -0,0 +1,417 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Snapshot creation helpers.
use std::collections::VecDeque;
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use account_db::{AccountDB, AccountDBMut};
use client::BlockChainClient;
use error::Error;
use ids::BlockID;
use views::{BlockView, HeaderView};
use util::{Bytes, Hashable, HashDB, JournalDB, snappy, TrieDB, TrieDBMut, TrieMut};
use util::hash::{FixedHash, H256};
use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View};
use self::account::Account;
use self::block::AbridgedBlock;
use crossbeam::{scope, ScopedJoinHandle};
mod account;
mod block;
// Try to have chunks be around 16MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024;
/// Take a snapshot using the given client and database, writing into `path`.
pub fn take_snapshot(client: &BlockChainClient, mut path: PathBuf, state_db: &HashDB) -> Result<(), Error> {
let chain_info = client.chain_info();
let genesis_hash = chain_info.genesis_hash;
let best_header_raw = client.best_block_header();
let best_header = HeaderView::new(&best_header_raw);
let state_root = best_header.state_root();
trace!(target: "snapshot", "Taking snapshot starting at block {}", best_header.number());
let _ = create_dir_all(&path);
let state_hashes = try!(chunk_state(state_db, &state_root, &path));
let block_hashes = try!(chunk_blocks(client, best_header.hash(), genesis_hash, &path));
trace!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData {
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: state_root,
block_number: chain_info.best_block_number,
block_hash: chain_info.best_block_hash,
};
path.push("MANIFEST");
let mut manifest_file = try!(File::create(&path));
try!(manifest_file.write_all(&manifest_data.to_rlp()));
Ok(())
}
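
take_snapshot therefore leaves a directory holding one hex-hash-named file per chunk plus a MANIFEST. A sketch of reading the manifest back, assuming the usual DecoderError-to-Error conversion is available:

    fn read_manifest(snapshot_dir: &Path) -> Result<ManifestData, Error> {
        use std::io::Read;
        let mut manifest_path = snapshot_dir.to_owned();
        manifest_path.push("MANIFEST");
        let mut file = try!(File::open(&manifest_path));
        let mut raw = Vec::new();
        try!(file.read_to_end(&mut raw));
        Ok(try!(ManifestData::from_rlp(&raw)))
    }
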
// shared portion of write_chunk
// returns either a (hash, compressed_size) pair or an io error.
fn write_chunk(raw_data: &[u8], compression_buffer: &mut Vec<u8>, path: &Path) -> Result<(H256, usize), Error> {
let compressed_size = snappy::compress_into(raw_data, compression_buffer);
let compressed = &compression_buffer[..compressed_size];
let hash = compressed.sha3();
let mut file_path = path.to_owned();
file_path.push(hash.hex());
let mut file = try!(File::create(file_path));
try!(file.write_all(compressed));
Ok((hash, compressed_size))
}
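
Because each chunk file is named after the sha3 of its compressed contents, chunks are content-addressed; a sketch of checking a downloaded chunk against its manifest entry:

    fn chunk_is_valid(compressed: &[u8], expected: &H256) -> bool {
        compressed.sha3() == *expected
    }
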
/// Used to build block chunks.
struct BlockChunker<'a> {
client: &'a BlockChainClient,
// block, receipt rlp pairs.
rlps: VecDeque<Bytes>,
current_hash: H256,
hashes: Vec<H256>,
snappy_buffer: Vec<u8>,
}
impl<'a> BlockChunker<'a> {
// Repeatedly fills the buffers and writes out chunks, moving backwards from the starting block hash.
// Loops until it reaches the genesis, then writes out the remainder.
fn chunk_all(&mut self, genesis_hash: H256, path: &Path) -> Result<(), Error> {
let mut loaded_size = 0;
while self.current_hash != genesis_hash {
let block = self.client.block(BlockID::Hash(self.current_hash))
.expect("started from the head of chain and walking backwards; client stores full chain; qed");
let view = BlockView::new(&block);
let abridged_rlp = AbridgedBlock::from_block_view(&view).into_inner();
let receipts = self.client.block_receipts(&self.current_hash)
.expect("started from head of chain and walking backwards; client stores full chain; qed");
let pair = {
let mut pair_stream = RlpStream::new_list(2);
pair_stream.append(&abridged_rlp).append(&receipts);
pair_stream.out()
};
let new_loaded_size = loaded_size + pair.len();
// cut off the chunk if too large
if new_loaded_size > PREFERRED_CHUNK_SIZE {
let header = view.header_view();
try!(self.write_chunk(header.parent_hash(), header.number(), path));
loaded_size = pair.len();
} else {
loaded_size = new_loaded_size;
}
self.rlps.push_front(pair);
self.current_hash = view.header_view().parent_hash();
}
if loaded_size != 0 {
// we don't store the genesis block, so once we get to this point,
// the "first" block will be number 1.
try!(self.write_chunk(genesis_hash, 1, path));
}
Ok(())
}
// write out the data in the buffers to a chunk on disk
fn write_chunk(&mut self, parent_hash: H256, number: u64, path: &Path) -> Result<(), Error> {
trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len());
let mut rlp_stream = RlpStream::new_list(self.rlps.len() + 2);
rlp_stream.append(&parent_hash).append(&number);
for pair in self.rlps.drain(..) {
rlp_stream.append_raw(&pair, 1);
}
let raw_data = rlp_stream.out();
let (hash, size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, path));
trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len());
self.hashes.push(hash);
Ok(())
}
}
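
A hypothetical sketch of the reverse direction (the BlockRebuilder is still a TODO per block.rs above), assuming the chunk's first two items give the parent hash and number for its first block, per the write_chunk layout:

    fn walk_chunk(raw: &[u8]) -> Result<H256, DecoderError> {
        let rlp = UntrustedRlp::new(raw);
        let mut parent_hash: H256 = try!(rlp.val_at(0));
        let mut number: u64 = try!(rlp.val_at(1));
        for i in 2..rlp.item_count() {
            let pair = try!(rlp.at(i));
            let abridged = AbridgedBlock::from_raw(try!(pair.val_at(0)));
            let block = try!(abridged.to_block(parent_hash, number));
            // each restored block chains into the next block's parent hash;
            // the receipts sit at pair index 1 and are not verified here.
            parent_hash = block.header.hash();
            number += 1;
        }
        Ok(parent_hash)
    }
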
/// Create and write out all block chunks to disk, returning a vector of all
/// the hashes of block chunks created.
///
/// The path parameter is the directory to store the block chunks in.
/// This function assumes the directory exists already.
pub fn chunk_blocks(client: &BlockChainClient, best_block_hash: H256, genesis_hash: H256, path: &Path) -> Result<Vec<H256>, Error> {
let mut chunker = BlockChunker {
client: client,
rlps: VecDeque::new(),
current_hash: best_block_hash,
hashes: Vec::new(),
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
};
try!(chunker.chunk_all(genesis_hash, path));
Ok(chunker.hashes)
}
/// State trie chunker.
struct StateChunker<'a> {
hashes: Vec<H256>,
rlps: Vec<Bytes>,
cur_size: usize,
snapshot_path: &'a Path,
snappy_buffer: Vec<u8>,
}
impl<'a> StateChunker<'a> {
// Push a key, value pair to be encoded.
//
// If the buffer is greater than the desired chunk size,
// this will write out the data to disk.
fn push(&mut self, account_hash: Bytes, data: Bytes) -> Result<(), Error> {
let pair = {
let mut stream = RlpStream::new_list(2);
stream.append(&account_hash).append_raw(&data, 1);
stream.out()
};
if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE {
try!(self.write_chunk());
}
self.cur_size += pair.len();
self.rlps.push(pair);
Ok(())
}
// Write out the buffer to disk, pushing the created chunk's hash to
// the list.
fn write_chunk(&mut self) -> Result<(), Error> {
let mut stream = RlpStream::new_list(self.rlps.len());
for rlp in self.rlps.drain(..) {
stream.append_raw(&rlp, 1);
}
let raw_data = stream.out();
let (hash, compressed_size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, self.snapshot_path));
trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
self.hashes.push(hash);
self.cur_size = 0;
Ok(())
}
}
/// Walk the given state database starting from the given root,
/// creating chunks and writing them out.
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result<Vec<H256>, Error> {
let account_view = try!(TrieDB::new(db, &root));
let mut chunker = StateChunker {
hashes: Vec::new(),
rlps: Vec::new(),
cur_size: 0,
snapshot_path: path,
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
};
trace!(target: "snapshot", "beginning state chunking");
// account_key here is the address' hash.
for (account_key, account_data) in account_view.iter() {
let account = Account::from_thin_rlp(account_data);
let account_key_hash = H256::from_slice(&account_key);
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlp = try!(account.to_fat_rlp(&account_db));
try!(chunker.push(account_key, fat_rlp));
}
if chunker.cur_size != 0 {
try!(chunker.write_chunk());
}
Ok(chunker.hashes)
}
/// Manifest data.
pub struct ManifestData {
/// List of state chunk hashes.
pub state_hashes: Vec<H256>,
/// List of block chunk hashes.
pub block_hashes: Vec<H256>,
/// The final, expected state root.
pub state_root: H256,
/// Block number this snapshot was taken at.
pub block_number: u64,
/// Block hash this snapshot was taken at.
pub block_hash: H256,
}
impl ManifestData {
/// Encode the manifest data to rlp.
pub fn to_rlp(self) -> Bytes {
let mut stream = RlpStream::new_list(5);
stream.append(&self.state_hashes);
stream.append(&self.block_hashes);
stream.append(&self.state_root);
stream.append(&self.block_number);
stream.append(&self.block_hash);
stream.out()
}
/// Try to restore manifest data from raw bytes, interpreted as RLP.
pub fn from_rlp(raw: &[u8]) -> Result<Self, DecoderError> {
let decoder = UntrustedRlp::new(raw);
let state_hashes: Vec<H256> = try!(decoder.val_at(0));
let block_hashes: Vec<H256> = try!(decoder.val_at(1));
let state_root: H256 = try!(decoder.val_at(2));
let block_number: u64 = try!(decoder.val_at(3));
let block_hash: H256 = try!(decoder.val_at(4));
Ok(ManifestData {
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: state_root,
block_number: block_number,
block_hash: block_hash,
})
}
}
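
A round trip through the two methods, as a sketch with placeholder values (note that to_rlp takes self by value):

    let manifest = ManifestData {
        state_hashes: vec![],
        block_hashes: vec![],
        state_root: H256::zero(),
        block_number: 77,
        block_hash: H256::zero(),
    };
    let raw = manifest.to_rlp();
    let restored = ManifestData::from_rlp(&raw).expect("encoded just above");
    assert_eq!(restored.block_number, 77);
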
/// Used to rebuild the state trie piece by piece.
pub struct StateRebuilder {
db: Box<JournalDB>,
state_root: H256,
snappy_buffer: Vec<u8>
}
impl StateRebuilder {
/// Create a new state rebuilder to write into the given backing DB.
pub fn new(db: Box<JournalDB>) -> Self {
StateRebuilder {
db: db,
state_root: H256::zero(),
snappy_buffer: Vec::new(),
}
}
/// Feed a compressed state chunk into the rebuilder.
pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> {
let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer));
let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]);
let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect();
let mut pairs = Vec::with_capacity(rlp.item_count());
// initialize the pairs vector with empty values so we have slots to write into.
for _ in 0..rlp.item_count() {
pairs.push((H256::new(), Vec::new()));
}
// keep the chunk size non-zero (chunks() panics on zero) when there are
// fewer accounts than cpus.
let chunk_size = ::std::cmp::max(1, account_fat_rlps.len() / ::num_cpus::get());
// build account tries in parallel.
try!(scope(|scope| {
let mut handles = Vec::new();
for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
let mut db = self.db.boxed_clone();
let handle: ScopedJoinHandle<Result<(), Error>> = scope.spawn(move || {
try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk));
// commit the db changes we made in this thread.
try!(db.commit(0, &H256::zero(), None));
Ok(())
});
handles.push(handle);
}
// see if we got any errors.
for handle in handles {
try!(handle.join());
}
Ok::<_, Error>(())
}));
// batch trie writes
{
let mut account_trie = if self.state_root != H256::zero() {
try!(TrieDBMut::from_existing(self.db.as_hashdb_mut(), &mut self.state_root))
} else {
TrieDBMut::new(self.db.as_hashdb_mut(), &mut self.state_root)
};
for (hash, thin_rlp) in pairs {
account_trie.insert(&hash, &thin_rlp);
}
}
try!(self.db.commit(0, &H256::zero(), None));
Ok(())
}
/// Get the state root of the rebuilder.
pub fn state_root(&self) -> H256 { self.state_root }
}
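
Since the final trie root is independent of insertion order, state chunks may be fed in any order. A usage sketch, inside any function returning Result<(), Error>, with journal_db, chunks, and manifest assumed in scope:

    let mut rebuilder = StateRebuilder::new(journal_db);
    for compressed in &chunks {
        try!(rebuilder.feed(compressed));
    }
    // the rebuilt root should match the manifest's expected state root.
    assert_eq!(rebuilder.state_root(), manifest.state_root);
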
fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), Error> {
for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) {
let account_rlp = UntrustedRlp::new(account_pair);
let hash: H256 = try!(account_rlp.val_at(0));
let fat_rlp = try!(account_rlp.at(1));
let thin_rlp = {
let mut acct_db = AccountDBMut::from_hash(db.as_hashdb_mut(), hash);
// fill out the storage trie and code while decoding.
let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp));
acc.to_thin_rlp()
};
*out = (hash, thin_rlp);
}
Ok(())
}

View File

@@ -65,6 +65,8 @@ pub enum UtilError {
SimpleString(String),
/// Error from a bad input size being given for the needed output.
BadSize,
/// Error from snappy.
Snappy(::snappy::InvalidInput),
}
impl fmt::Display for UtilError {
@@ -82,6 +84,7 @@ impl fmt::Display for UtilError {
UtilError::Decoder(ref err) => f.write_fmt(format_args!("{}", err)),
UtilError::SimpleString(ref msg) => f.write_str(&msg),
UtilError::BadSize => f.write_str("Bad input size."),
UtilError::Snappy(ref err) => f.write_fmt(format_args!("{}", err)),
}
}
}
@@ -179,6 +182,12 @@ impl From<String> for UtilError {
}
}
impl From<::snappy::InvalidInput> for UtilError {
fn from(err: ::snappy::InvalidInput) -> UtilError {
UtilError::Snappy(err)
}
}
// TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted.
/*#![feature(concat_idents)]
macro_rules! assimilate {

View File

@@ -153,6 +153,7 @@ pub mod log;
pub mod panics;
pub mod network_settings;
pub mod path;
pub mod snappy;
mod timer;
pub use common::*;

util/src/snappy.rs Normal file (157 lines)
View File

@@ -0,0 +1,157 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Snappy compression bindings.
use std::fmt;
use libc::{c_char, c_int, size_t};
const SNAPPY_OK: c_int = 0;
const SNAPPY_INVALID_INPUT: c_int = 1;
const SNAPPY_BUFFER_TOO_SMALL: c_int = 2;
#[link(name = "snappy")]
extern {
fn snappy_compress(
input: *const c_char,
input_len: size_t,
compressed: *mut c_char,
compressed_len: *mut size_t
) -> c_int;
fn snappy_max_compressed_length(source_len: size_t) -> size_t;
fn snappy_uncompress(
compressed: *const c_char,
compressed_len: size_t,
uncompressed: *mut c_char,
uncompressed_len: *mut size_t,
) -> c_int;
fn snappy_uncompressed_length(
compressed: *const c_char,
compressed_len: size_t,
result: *mut size_t,
) -> c_int;
fn snappy_validate_compressed_buffer(
compressed: *const c_char,
compressed_len: size_t,
) -> c_int;
}
/// Raised when decompression is attempted on data that is not valid snappy-compressed input.
#[derive(Debug)]
pub struct InvalidInput;
impl fmt::Display for InvalidInput {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Attempted snappy decompression with invalid input")
}
}
/// The maximum compressed length given a size.
pub fn max_compressed_len(len: usize) -> usize {
unsafe { snappy_max_compressed_length(len as size_t) as usize }
}
/// How large the given data will be when decompressed.
pub fn decompressed_len(compressed: &[u8]) -> Result<usize, InvalidInput> {
let mut size: size_t = 0;
let len = compressed.len() as size_t;
let status = unsafe { snappy_uncompressed_length(compressed.as_ptr() as *const c_char, len, &mut size) };
if status == SNAPPY_INVALID_INPUT {
Err(InvalidInput)
} else {
Ok(size)
}
}
/// Compress a buffer using snappy.
pub fn compress(input: &[u8]) -> Vec<u8> {
let mut buf = Vec::new();
let size = compress_into(input, &mut buf);
// compress_into grows the buffer to the maximum compressed length;
// trim it to the bytes actually written.
buf.truncate(size);
buf
}
/// Compress a buffer using snappy, writing the result into
/// the given output buffer, growing it if necessary.
/// Returns the length of the compressed data.
pub fn compress_into(input: &[u8], output: &mut Vec<u8>) -> usize {
let mut len = max_compressed_len(input.len());
if output.len() < len {
output.resize(len, 0);
}
let status = unsafe {
snappy_compress(
input.as_ptr() as *const c_char,
input.len() as size_t,
output.as_mut_ptr() as *mut c_char,
&mut len as &mut size_t,
)
};
match status {
SNAPPY_OK => len,
SNAPPY_INVALID_INPUT => panic!("snappy compression has no concept of invalid input"),
SNAPPY_BUFFER_TOO_SMALL => panic!("buffer cannot be too small, the capacity was just ensured."),
_ => panic!("snappy returned unspecified status"),
}
}
/// Decompress a buffer using snappy. Will return an error if the buffer is not snappy-compressed.
pub fn decompress(input: &[u8]) -> Result<Vec<u8>, InvalidInput> {
let mut v = Vec::new();
decompress_into(input, &mut v).map(|_| v)
}
/// Decompress a buffer using snappy, writing the result into
/// the given output buffer, growing it if necessary.
/// Will error if the input buffer is not snappy-compressed.
/// Otherwise, returns the length of the decompressed data.
pub fn decompress_into(input: &[u8], output: &mut Vec<u8>) -> Result<usize, InvalidInput> {
let mut len = try!(decompressed_len(input));
if output.len() < len {
output.resize(len, 0);
}
let status = unsafe {
snappy_uncompress(
input.as_ptr() as *const c_char,
input.len() as size_t,
output.as_mut_ptr() as *mut c_char,
&mut len as &mut size_t,
)
};
match status {
SNAPPY_OK => Ok(len as usize),
SNAPPY_INVALID_INPUT => Err(InvalidInput),
SNAPPY_BUFFER_TOO_SMALL => panic!("buffer cannot be too small, size was just set to large enough."),
_ => panic!("snappy returned unspecified status"),
}
}
/// Validate a compressed buffer. True if valid, false if not.
pub fn validate_compressed_buffer(input: &[u8]) -> bool {
let status = unsafe { snappy_validate_compressed_buffer(input.as_ptr() as *const c_char, input.len() as size_t )};
status == SNAPPY_OK
}
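
A round-trip sketch exercising the public functions above:

    let data = b"an example payload that compresses".to_vec();
    let compressed = compress(&data);
    assert!(validate_compressed_buffer(&compressed));
    assert_eq!(decompress(&compressed).unwrap(), data);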