diff --git a/devtools/src/random_path.rs b/devtools/src/random_path.rs index 7c1fd19ea..f9c454c30 100644 --- a/devtools/src/random_path.rs +++ b/devtools/src/random_path.rs @@ -19,6 +19,7 @@ use std::path::*; use std::fs; use std::env; +use std::ops::{Deref, DerefMut}; use rand::random; pub struct RandomTempPath { @@ -93,6 +94,16 @@ impl GuardedTempResult { } } +impl Deref for GuardedTempResult { + type Target = T; + + fn deref(&self) -> &T { self.result.as_ref().unwrap() } +} + +impl DerefMut for GuardedTempResult { + fn deref_mut(&mut self) -> &mut T { self.result.as_mut().unwrap() } +} + #[test] fn creates_dir() { let temp = RandomTempPath::create_dir(); diff --git a/ethcore/src/block.rs b/ethcore/src/block.rs index e15a4b168..f8e9de3f4 100644 --- a/ethcore/src/block.rs +++ b/ethcore/src/block.rs @@ -24,7 +24,7 @@ use trace::Trace; use evm::Factory as EvmFactory; /// A block, encoded as it is on the block chain. -#[derive(Default, Debug, Clone)] +#[derive(Default, Debug, Clone, PartialEq)] pub struct Block { /// The header of this block. pub header: Header, diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 94424d3ef..f95df66fb 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -14,13 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::path::PathBuf; use std::collections::{HashSet, HashMap}; use std::ops::Deref; use std::mem; use std::collections::VecDeque; use std::sync::*; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::fmt; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::time::Instant; diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 10786cf91..d492c2cf5 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -230,6 +230,10 @@ pub enum Error { PowInvalid, /// Error concerning TrieDBs Trie(TrieError), + /// Io error. + Io(::std::io::Error), + /// Snappy error. + Snappy(::util::snappy::InvalidInput), } impl fmt::Display for Error { @@ -246,6 +250,8 @@ impl fmt::Display for Error { Error::PowHashInvalid => f.write_str("Invalid or out of date PoW hash."), Error::PowInvalid => f.write_str("Invalid nonce or mishash"), Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Io(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Snappy(ref err) => f.write_fmt(format_args!("{}", err)), } } } @@ -313,6 +319,18 @@ impl From for Error { } } +impl From<::std::io::Error> for Error { + fn from(err: ::std::io::Error) -> Error { + Error::Io(err) + } +} + +impl From<::util::snappy::InvalidInput> for Error { + fn from(err: ::util::snappy::InvalidInput) -> Error { + Error::Snappy(err) + } +} + impl From for Error { fn from(err: BlockImportError) -> Error { match err { diff --git a/ethcore/src/header.rs b/ethcore/src/header.rs index 01d0bbb3d..48a5f5bcc 100644 --- a/ethcore/src/header.rs +++ b/ethcore/src/header.rs @@ -149,6 +149,16 @@ impl Header { /// Set the number field of the header. pub fn set_parent_hash(&mut self, a: H256) { self.parent_hash = a; self.note_dirty(); } + /// Set the uncles hash field of the header. + pub fn set_uncles_hash(&mut self, a: H256) { self.uncles_hash = a; self.note_dirty(); } + /// Set the state root field of the header. + pub fn set_state_root(&mut self, a: H256) { self.state_root = a; self.note_dirty(); } + /// Set the transactions root field of the header. + pub fn set_transactions_root(&mut self, a: H256) { self.transactions_root = a; self.note_dirty() } + /// Set the receipts root field of the header. + pub fn set_receipts_root(&mut self, a: H256) { self.receipts_root = a; self.note_dirty() } + /// Set the log bloom field of the header. + pub fn set_log_bloom(&mut self, a: LogBloom) { self.log_bloom = a; self.note_dirty() } /// Set the timestamp field of the header. pub fn set_timestamp(&mut self, a: u64) { self.timestamp = a; self.note_dirty(); } /// Set the timestamp field of the header to the current time. diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 4f644db03..456821b75 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -118,8 +118,9 @@ pub mod pod_state; pub mod engine; pub mod migrations; pub mod miner; -#[macro_use] pub mod evm; +pub mod snapshot; pub mod action_params; +#[macro_use] pub mod evm; mod blooms; mod db; diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs new file mode 100644 index 000000000..2329b0e34 --- /dev/null +++ b/ethcore/src/snapshot/account.rs @@ -0,0 +1,211 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Account state encoding and decoding + +use account_db::{AccountDB, AccountDBMut}; +use error::Error; + +use util::{Bytes, HashDB, SHA3_EMPTY, TrieDB}; +use util::hash::{FixedHash, H256}; +use util::numbers::U256; +use util::rlp::{DecoderError, Rlp, RlpStream, Stream, UntrustedRlp, View}; + +// An alternate account structure from ::account::Account. +#[derive(PartialEq, Clone, Debug)] +pub struct Account { + nonce: U256, + balance: U256, + storage_root: H256, + code_hash: H256, +} + +impl Account { + // decode the account from rlp. + pub fn from_thin_rlp(rlp: &[u8]) -> Self { + let r: Rlp = Rlp::new(rlp); + + Account { + nonce: r.val_at(0), + balance: r.val_at(1), + storage_root: r.val_at(2), + code_hash: r.val_at(3), + } + } + + // encode the account to a standard rlp. + pub fn to_thin_rlp(&self) -> Bytes { + let mut stream = RlpStream::new_list(4); + stream + .append(&self.nonce) + .append(&self.balance) + .append(&self.storage_root) + .append(&self.code_hash); + + stream.out() + } + + // walk the account's storage trie, returning an RLP item containing the + // account properties and the storage. + pub fn to_fat_rlp(&self, acct_db: &AccountDB) -> Result { + let db = try!(TrieDB::new(acct_db, &self.storage_root)); + + let mut pairs = Vec::new(); + + for (k, v) in db.iter() { + pairs.push((k, v)); + } + + let mut stream = RlpStream::new_list(pairs.len()); + + for (k, v) in pairs { + stream.begin_list(2).append(&k).append(&v); + } + + let pairs_rlp = stream.out(); + + let mut account_stream = RlpStream::new_list(5); + account_stream.append(&self.nonce) + .append(&self.balance); + + // [has_code, code_hash]. + if self.code_hash == SHA3_EMPTY { + account_stream.append(&false).append_empty_data(); + } else { + match acct_db.get(&self.code_hash) { + Some(c) => { + account_stream.append(&true).append(&c); + } + None => { + warn!("code lookup failed during snapshot"); + account_stream.append(&false).append_empty_data(); + } + } + } + + account_stream.append_raw(&pairs_rlp, 1); + + Ok(account_stream.out()) + } + + // decode a fat rlp, and rebuild the storage trie as we go. + pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: UntrustedRlp) -> Result { + use util::{TrieDBMut, TrieMut}; + + let nonce = try!(rlp.val_at(0)); + let balance = try!(rlp.val_at(1)); + let code_hash = if try!(rlp.val_at(2)) { + let code: Bytes = try!(rlp.val_at(3)); + acct_db.insert(&code) + } else { + SHA3_EMPTY + }; + + let mut storage_root = H256::zero(); + + { + let mut storage_trie = TrieDBMut::new(acct_db, &mut storage_root); + let pairs = try!(rlp.at(4)); + for pair_rlp in pairs.iter() { + let k: Bytes = try!(pair_rlp.val_at(0)); + let v: Bytes = try!(pair_rlp.val_at(1)); + + storage_trie.insert(&k, &v); + } + } + Ok(Account { + nonce: nonce, + balance: balance, + storage_root: storage_root, + code_hash: code_hash, + }) + } +} + +#[cfg(test)] +mod tests { + use account_db::{AccountDB, AccountDBMut}; + use tests::helpers::get_temp_journal_db; + + use util::{SHA3_NULL_RLP, SHA3_EMPTY}; + use util::hash::{Address, FixedHash, H256}; + use util::rlp::{UntrustedRlp, View}; + use util::trie::{Alphabet, StandardMap, SecTrieDBMut, TrieMut, ValueMode}; + + use super::Account; + + fn fill_storage(mut db: AccountDBMut) -> H256 { + let map = StandardMap { + alphabet: Alphabet::All, + min_key: 6, + journal_key: 6, + value_mode: ValueMode::Random, + count: 100 + }; + + let mut root = H256::new(); + { + let mut trie = SecTrieDBMut::new(&mut db, &mut root); + for (k, v) in map.make() { + trie.insert(&k, &v); + } + } + root + } + + #[test] + fn encoding_basic() { + let mut db = get_temp_journal_db(); + let mut db = &mut **db; + let addr = Address::random(); + + let account = Account { + nonce: 50.into(), + balance: 123456789.into(), + storage_root: SHA3_NULL_RLP, + code_hash: SHA3_EMPTY, + }; + + let thin_rlp = account.to_thin_rlp(); + assert_eq!(Account::from_thin_rlp(&thin_rlp), account); + + let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap(); + let fat_rlp = UntrustedRlp::new(&fat_rlp); + assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account); + } + + #[test] + fn encoding_storage() { + let mut db = get_temp_journal_db(); + let mut db = &mut **db; + let addr = Address::random(); + + let root = fill_storage(AccountDBMut::new(db.as_hashdb_mut(), &addr)); + let account = Account { + nonce: 25.into(), + balance: 987654321.into(), + storage_root: root, + code_hash: SHA3_EMPTY, + }; + + let thin_rlp = account.to_thin_rlp(); + assert_eq!(Account::from_thin_rlp(&thin_rlp), account); + + let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap(); + let fat_rlp = UntrustedRlp::new(&fat_rlp); + assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account); + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/block.rs b/ethcore/src/snapshot/block.rs new file mode 100644 index 000000000..fd034d97b --- /dev/null +++ b/ethcore/src/snapshot/block.rs @@ -0,0 +1,211 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Block RLP compression. + +// TODO [rob] remove when BlockRebuilder done. +#![allow(dead_code)] + +use block::Block; +use header::Header; + +use views::BlockView; +use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; +use util::{Bytes, Hashable, H256}; + +const HEADER_FIELDS: usize = 10; +const BLOCK_FIELDS: usize = 2; + +pub struct AbridgedBlock { + rlp: Bytes, +} + +impl AbridgedBlock { + /// Create from a vector of bytes. Does no verification. + pub fn from_raw(rlp: Bytes) -> Self { + AbridgedBlock { + rlp: rlp, + } + } + + /// Return the inner bytes. + pub fn into_inner(self) -> Bytes { + self.rlp + } + + /// Given a full block view, trim out the parent hash and block number, + /// producing new rlp. + pub fn from_block_view(block_view: &BlockView) -> Self { + let header = block_view.header_view(); + + let seal_fields = header.seal(); + + // 10 header fields, unknown amount of seal fields, and 2 block fields. + let mut stream = RlpStream::new_list( + HEADER_FIELDS + + seal_fields.len() + + BLOCK_FIELDS + ); + + // write header values. + stream + .append(&header.author()) + .append(&header.state_root()) + .append(&header.transactions_root()) + .append(&header.receipts_root()) + .append(&header.log_bloom()) + .append(&header.difficulty()) + .append(&header.gas_limit()) + .append(&header.gas_used()) + .append(&header.timestamp()) + .append(&header.extra_data()); + + // write block values. + stream.append(&block_view.transactions()).append(&block_view.uncles()); + + // write seal fields. + for field in seal_fields { + stream.append_raw(&field, 1); + } + + AbridgedBlock { + rlp: stream.out(), + } + } + + /// Flesh out an abridged block view with the provided parent hash and block number. + /// + /// Will fail if contains invalid rlp. + pub fn to_block(&self, parent_hash: H256, number: u64) -> Result { + let rlp = UntrustedRlp::new(&self.rlp); + + let mut header = Header { + parent_hash: parent_hash, + author: try!(rlp.val_at(0)), + state_root: try!(rlp.val_at(1)), + transactions_root: try!(rlp.val_at(2)), + receipts_root: try!(rlp.val_at(3)), + log_bloom: try!(rlp.val_at(4)), + difficulty: try!(rlp.val_at(5)), + number: number, + gas_limit: try!(rlp.val_at(6)), + gas_used: try!(rlp.val_at(7)), + timestamp: try!(rlp.val_at(8)), + extra_data: try!(rlp.val_at(9)), + ..Default::default() + }; + let transactions = try!(rlp.val_at(10)); + let uncles: Vec
= try!(rlp.val_at(11)); + + // iterator-based approach is cleaner but doesn't work w/ try. + let seal = { + let mut seal = Vec::new(); + + for i in 12..rlp.item_count() { + seal.push(try!(rlp.val_at(i))); + } + + seal + }; + + header.set_seal(seal); + + let uncle_bytes = uncles.iter() + .fold(RlpStream::new_list(uncles.len()), |mut s, u| { + s.append_raw(&u.rlp(::basic_types::Seal::With), 1); + s + }).out(); + header.uncles_hash = uncle_bytes.sha3(); + + Ok(Block { + header: header, + transactions: transactions, + uncles: uncles, + }) + } +} + +#[cfg(test)] +mod tests { + use views::BlockView; + use block::Block; + use super::AbridgedBlock; + use types::transaction::{Action, Transaction}; + + use util::numbers::U256; + use util::hash::{Address, H256, FixedHash}; + use util::{Bytes, RlpStream, Stream}; + + fn encode_block(b: &Block) -> Bytes { + let mut s = RlpStream::new_list(3); + + b.header.stream_rlp(&mut s, ::basic_types::Seal::With); + s.append(&b.transactions); + s.append(&b.uncles); + + s.out() + } + + #[test] + fn empty_block_abridging() { + let b = Block::default(); + let encoded = encode_block(&b); + + let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded)); + assert_eq!(abridged.to_block(H256::new(), 0).unwrap(), b); + } + + #[test] + #[should_panic] + fn wrong_number() { + let b = Block::default(); + let encoded = encode_block(&b); + + let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded)); + assert_eq!(abridged.to_block(H256::new(), 2).unwrap(), b); + } + + #[test] + fn with_transactions() { + let mut b = Block::default(); + + let t1 = Transaction { + action: Action::Create, + nonce: U256::from(42), + gas_price: U256::from(3000), + gas: U256::from(50_000), + value: U256::from(1), + data: b"Hello!".to_vec() + }.fake_sign(Address::from(0x69)); + + let t2 = Transaction { + action: Action::Create, + nonce: U256::from(88), + gas_price: U256::from(12345), + gas: U256::from(300000), + value: U256::from(1000000000), + data: "Eep!".into(), + }.fake_sign(Address::from(0x55)); + + b.transactions.push(t1); + b.transactions.push(t2); + + let encoded = encode_block(&b); + + let abridged = AbridgedBlock::from_block_view(&BlockView::new(&encoded[..])); + assert_eq!(abridged.to_block(H256::new(), 0).unwrap(), b); + } +} \ No newline at end of file diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs new file mode 100644 index 000000000..aff16b86e --- /dev/null +++ b/ethcore/src/snapshot/mod.rs @@ -0,0 +1,417 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snapshot creation helpers. + +use std::collections::VecDeque; +use std::fs::{create_dir_all, File}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use account_db::{AccountDB, AccountDBMut}; +use client::BlockChainClient; +use error::Error; +use ids::BlockID; +use views::{BlockView, HeaderView}; + +use util::{Bytes, Hashable, HashDB, JournalDB, snappy, TrieDB, TrieDBMut, TrieMut}; +use util::hash::{FixedHash, H256}; +use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; + +use self::account::Account; +use self::block::AbridgedBlock; + +use crossbeam::{scope, ScopedJoinHandle}; + +mod account; +mod block; + +// Try to have chunks be around 16MB (before compression) +const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024; + +/// Take a snapshot using the given client and database, writing into `path`. +pub fn take_snapshot(client: &BlockChainClient, mut path: PathBuf, state_db: &HashDB) -> Result<(), Error> { + let chain_info = client.chain_info(); + + let genesis_hash = chain_info.genesis_hash; + let best_header_raw = client.best_block_header(); + let best_header = HeaderView::new(&best_header_raw); + let state_root = best_header.state_root(); + + trace!(target: "snapshot", "Taking snapshot starting at block {}", best_header.number()); + + let _ = create_dir_all(&path); + + let state_hashes = try!(chunk_state(state_db, &state_root, &path)); + let block_hashes = try!(chunk_blocks(client, best_header.hash(), genesis_hash, &path)); + + trace!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); + + let manifest_data = ManifestData { + state_hashes: state_hashes, + block_hashes: block_hashes, + state_root: state_root, + block_number: chain_info.best_block_number, + block_hash: chain_info.best_block_hash, + }; + + path.push("MANIFEST"); + + let mut manifest_file = try!(File::create(&path)); + + try!(manifest_file.write_all(&manifest_data.to_rlp())); + + Ok(()) +} + +// shared portion of write_chunk +// returns either a (hash, compressed_size) pair or an io error. +fn write_chunk(raw_data: &[u8], compression_buffer: &mut Vec, path: &Path) -> Result<(H256, usize), Error> { + let compressed_size = snappy::compress_into(raw_data, compression_buffer); + let compressed = &compression_buffer[..compressed_size]; + let hash = compressed.sha3(); + + let mut file_path = path.to_owned(); + file_path.push(hash.hex()); + + let mut file = try!(File::create(file_path)); + try!(file.write_all(compressed)); + + Ok((hash, compressed_size)) +} + +/// Used to build block chunks. +struct BlockChunker<'a> { + client: &'a BlockChainClient, + // block, receipt rlp pairs. + rlps: VecDeque, + current_hash: H256, + hashes: Vec, + snappy_buffer: Vec, +} + +impl<'a> BlockChunker<'a> { + // Repeatedly fill the buffers and writes out chunks, moving backwards from starting block hash. + // Loops until we reach the genesis, and writes out the remainder. + fn chunk_all(&mut self, genesis_hash: H256, path: &Path) -> Result<(), Error> { + let mut loaded_size = 0; + + while self.current_hash != genesis_hash { + let block = self.client.block(BlockID::Hash(self.current_hash)) + .expect("started from the head of chain and walking backwards; client stores full chain; qed"); + let view = BlockView::new(&block); + let abridged_rlp = AbridgedBlock::from_block_view(&view).into_inner(); + + let receipts = self.client.block_receipts(&self.current_hash) + .expect("started from head of chain and walking backwards; client stores full chain; qed"); + + let pair = { + let mut pair_stream = RlpStream::new_list(2); + pair_stream.append(&abridged_rlp).append(&receipts); + pair_stream.out() + }; + + let new_loaded_size = loaded_size + pair.len(); + + // cut off the chunk if too large + if new_loaded_size > PREFERRED_CHUNK_SIZE { + let header = view.header_view(); + try!(self.write_chunk(header.parent_hash(), header.number(), path)); + loaded_size = pair.len(); + } else { + loaded_size = new_loaded_size; + } + + self.rlps.push_front(pair); + self.current_hash = view.header_view().parent_hash(); + } + + if loaded_size != 0 { + // we don't store the genesis block, so once we get to this point, + // the "first" block will be number 1. + try!(self.write_chunk(genesis_hash, 1, path)); + } + + Ok(()) + } + + // write out the data in the buffers to a chunk on disk + fn write_chunk(&mut self, parent_hash: H256, number: u64, path: &Path) -> Result<(), Error> { + trace!(target: "snapshot", "prepared block chunk with {} blocks", self.rlps.len()); + let mut rlp_stream = RlpStream::new_list(self.rlps.len() + 2); + rlp_stream.append(&parent_hash).append(&number); + for pair in self.rlps.drain(..) { + rlp_stream.append_raw(&pair, 1); + } + + let raw_data = rlp_stream.out(); + let (hash, size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, path)); + trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len()); + + self.hashes.push(hash); + Ok(()) + } +} + +/// Create and write out all block chunks to disk, returning a vector of all +/// the hashes of block chunks created. +/// +/// The path parameter is the directory to store the block chunks in. +/// This function assumes the directory exists already. +pub fn chunk_blocks(client: &BlockChainClient, best_block_hash: H256, genesis_hash: H256, path: &Path) -> Result, Error> { + let mut chunker = BlockChunker { + client: client, + rlps: VecDeque::new(), + current_hash: best_block_hash, + hashes: Vec::new(), + snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], + }; + + try!(chunker.chunk_all(genesis_hash, path)); + + Ok(chunker.hashes) +} + +/// State trie chunker. +struct StateChunker<'a> { + hashes: Vec, + rlps: Vec, + cur_size: usize, + snapshot_path: &'a Path, + snappy_buffer: Vec, +} + +impl<'a> StateChunker<'a> { + // Push a key, value pair to be encoded. + // + // If the buffer is greater than the desired chunk size, + // this will write out the data to disk. + fn push(&mut self, account_hash: Bytes, data: Bytes) -> Result<(), Error> { + let pair = { + let mut stream = RlpStream::new_list(2); + stream.append(&account_hash).append_raw(&data, 1); + stream.out() + }; + + if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE { + try!(self.write_chunk()); + } + + self.cur_size += pair.len(); + self.rlps.push(pair); + + Ok(()) + } + + // Write out the buffer to disk, pushing the created chunk's hash to + // the list. + fn write_chunk(&mut self) -> Result<(), Error> { + let mut stream = RlpStream::new_list(self.rlps.len()); + for rlp in self.rlps.drain(..) { + stream.append_raw(&rlp, 1); + } + + let raw_data = stream.out(); + let (hash, compressed_size) = try!(write_chunk(&raw_data, &mut self.snappy_buffer, self.snapshot_path)); + trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len()); + + self.hashes.push(hash); + self.cur_size = 0; + + Ok(()) + } +} + +/// Walk the given state database starting from the given root, +/// creating chunks and writing them out. +/// +/// Returns a list of hashes of chunks created, or any error it may +/// have encountered. +pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result, Error> { + let account_view = try!(TrieDB::new(db, &root)); + + let mut chunker = StateChunker { + hashes: Vec::new(), + rlps: Vec::new(), + cur_size: 0, + snapshot_path: path, + snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], + }; + + trace!(target: "snapshot", "beginning state chunking"); + + // account_key here is the address' hash. + for (account_key, account_data) in account_view.iter() { + let account = Account::from_thin_rlp(account_data); + let account_key_hash = H256::from_slice(&account_key); + + let account_db = AccountDB::from_hash(db, account_key_hash); + + let fat_rlp = try!(account.to_fat_rlp(&account_db)); + try!(chunker.push(account_key, fat_rlp)); + } + + if chunker.cur_size != 0 { + try!(chunker.write_chunk()); + } + + Ok(chunker.hashes) +} + +/// Manifest data. +pub struct ManifestData { + /// List of state chunk hashes. + pub state_hashes: Vec, + /// List of block chunk hashes. + pub block_hashes: Vec, + /// The final, expected state root. + pub state_root: H256, + /// Block number this snapshot was taken at. + pub block_number: u64, + /// Block hash this snapshot was taken at. + pub block_hash: H256, +} + +impl ManifestData { + /// Encode the manifest data to rlp. + pub fn to_rlp(self) -> Bytes { + let mut stream = RlpStream::new_list(5); + stream.append(&self.state_hashes); + stream.append(&self.block_hashes); + stream.append(&self.state_root); + stream.append(&self.block_number); + stream.append(&self.block_hash); + + stream.out() + } + + /// Try to restore manifest data from raw bytes, interpreted as RLP. + pub fn from_rlp(raw: &[u8]) -> Result { + let decoder = UntrustedRlp::new(raw); + + let state_hashes: Vec = try!(decoder.val_at(0)); + let block_hashes: Vec = try!(decoder.val_at(1)); + let state_root: H256 = try!(decoder.val_at(2)); + let block_number: u64 = try!(decoder.val_at(3)); + let block_hash: H256 = try!(decoder.val_at(4)); + + Ok(ManifestData { + state_hashes: state_hashes, + block_hashes: block_hashes, + state_root: state_root, + block_number: block_number, + block_hash: block_hash, + }) + } +} + +/// Used to rebuild the state trie piece by piece. +pub struct StateRebuilder { + db: Box, + state_root: H256, + snappy_buffer: Vec +} + +impl StateRebuilder { + /// Create a new state rebuilder to write into the given backing DB. + pub fn new(db: Box) -> Self { + StateRebuilder { + db: db, + state_root: H256::zero(), + snappy_buffer: Vec::new(), + } + } + + /// Feed a compressed state chunk into the rebuilder. + pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> { + let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer)); + let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]); + let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); + let mut pairs = Vec::with_capacity(rlp.item_count()); + + // initialize the pairs vector with empty values so we have slots to write into. + for _ in 0..rlp.item_count() { + pairs.push((H256::new(), Vec::new())); + } + + let chunk_size = account_fat_rlps.len() / ::num_cpus::get(); + + // build account tries in parallel. + try!(scope(|scope| { + let mut handles = Vec::new(); + for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) { + let mut db = self.db.boxed_clone(); + let handle: ScopedJoinHandle> = scope.spawn(move || { + try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk)); + + // commit the db changes we made in this thread. + try!(db.commit(0, &H256::zero(), None)); + + Ok(()) + }); + + handles.push(handle); + } + + // see if we got any errors. + for handle in handles { + try!(handle.join()); + } + + Ok::<_, Error>(()) + })); + + // batch trie writes + { + let mut account_trie = if self.state_root != H256::zero() { + try!(TrieDBMut::from_existing(self.db.as_hashdb_mut(), &mut self.state_root)) + } else { + TrieDBMut::new(self.db.as_hashdb_mut(), &mut self.state_root) + }; + + for (hash, thin_rlp) in pairs { + account_trie.insert(&hash, &thin_rlp); + } + } + + try!(self.db.commit(0, &H256::zero(), None)); + Ok(()) + } + + /// Get the state root of the rebuilder. + pub fn state_root(&self) -> H256 { self.state_root } +} + +fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), Error> { + for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) { + let account_rlp = UntrustedRlp::new(account_pair); + + let hash: H256 = try!(account_rlp.val_at(0)); + let fat_rlp = try!(account_rlp.at(1)); + + let thin_rlp = { + let mut acct_db = AccountDBMut::from_hash(db.as_hashdb_mut(), hash); + + // fill out the storage trie and code while decoding. + let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); + + acc.to_thin_rlp() + }; + + *out = (hash, thin_rlp); + } + Ok(()) +} \ No newline at end of file diff --git a/util/src/error.rs b/util/src/error.rs index c5f40d717..f5048445b 100644 --- a/util/src/error.rs +++ b/util/src/error.rs @@ -65,6 +65,8 @@ pub enum UtilError { SimpleString(String), /// Error from a bad input size being given for the needed output. BadSize, + /// Error from snappy. + Snappy(::snappy::InvalidInput), } impl fmt::Display for UtilError { @@ -82,6 +84,7 @@ impl fmt::Display for UtilError { UtilError::Decoder(ref err) => f.write_fmt(format_args!("{}", err)), UtilError::SimpleString(ref msg) => f.write_str(&msg), UtilError::BadSize => f.write_str("Bad input size."), + UtilError::Snappy(ref err) => f.write_fmt(format_args!("{}", err)), } } } @@ -179,6 +182,12 @@ impl From for UtilError { } } +impl From<::snappy::InvalidInput> for UtilError { + fn from(err: ::snappy::InvalidInput) -> UtilError { + UtilError::Snappy(err) + } +} + // TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted. /*#![feature(concat_idents)] macro_rules! assimilate { diff --git a/util/src/lib.rs b/util/src/lib.rs index 08480d85f..ee6b53138 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -153,6 +153,7 @@ pub mod log; pub mod panics; pub mod network_settings; pub mod path; +pub mod snappy; mod timer; pub use common::*; diff --git a/util/src/snappy.rs b/util/src/snappy.rs new file mode 100644 index 000000000..6919fb1ad --- /dev/null +++ b/util/src/snappy.rs @@ -0,0 +1,157 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Snappy compression bindings. + +use std::fmt; +use libc::{c_char, c_int, size_t}; + +const SNAPPY_OK: c_int = 0; +const SNAPPY_INVALID_INPUT: c_int = 1; +const SNAPPY_BUFFER_TOO_SMALL: c_int = 2; + +#[link(name = "snappy")] +extern { + fn snappy_compress( + input: *const c_char, + input_len: size_t, + compressed: *mut c_char, + compressed_len: *mut size_t + ) -> c_int; + + fn snappy_max_compressed_length(source_len: size_t) -> size_t; + + fn snappy_uncompress( + compressed: *const c_char, + compressed_len: size_t, + uncompressed: *mut c_char, + uncompressed_len: *mut size_t, + ) -> c_int; + + fn snappy_uncompressed_length( + compressed: *const c_char, + compressed_len: size_t, + result: *mut size_t, + ) -> c_int; + + fn snappy_validate_compressed_buffer( + compressed: *const c_char, + compressed_len: size_t, + ) -> c_int; +} + +/// Attempted to decompress an uncompressed buffer. +#[derive(Debug)] +pub struct InvalidInput; + +impl fmt::Display for InvalidInput { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Attempted snappy decompression with invalid input") + } +} + +/// The maximum compressed length given a size. +pub fn max_compressed_len(len: usize) -> usize { + unsafe { snappy_max_compressed_length(len as size_t) as usize } +} + +/// How large the given data will be when decompressed. +pub fn decompressed_len(compressed: &[u8]) -> Result { + let mut size: size_t = 0; + let len = compressed.len() as size_t; + + let status = unsafe { snappy_uncompressed_length(compressed.as_ptr() as *const c_char, len, &mut size) }; + + if status == SNAPPY_INVALID_INPUT { + Err(InvalidInput) + } else { + Ok(size) + } +} + +/// Compress a buffer using snappy. +pub fn compress(input: &[u8]) -> Vec { + let mut buf = Vec::new(); + compress_into(input, &mut buf); + buf +} + +/// Compress a buffer using snappy, writing the result into +/// the given output buffer, growing it if necessary. +/// Otherwise, returns the length of the compressed data. +pub fn compress_into(input: &[u8], output: &mut Vec) -> usize { + let mut len = max_compressed_len(input.len()); + + if output.len() < len { + output.resize(len, 0); + } + + let status = unsafe { + snappy_compress( + input.as_ptr() as *const c_char, + input.len() as size_t, + output.as_mut_ptr() as *mut c_char, + &mut len as &mut size_t, + ) + }; + + match status { + SNAPPY_OK => len, + SNAPPY_INVALID_INPUT => panic!("snappy compression has no concept of invalid input"), + SNAPPY_BUFFER_TOO_SMALL => panic!("buffer cannot be too small, the capacity was just ensured."), + _ => panic!("snappy returned unspecified status"), + } +} + +/// Decompress a buffer using snappy. Will return an error if the buffer is not snappy-compressed. +pub fn decompress(input: &[u8]) -> Result, InvalidInput> { + let mut v = Vec::new(); + decompress_into(input, &mut v).map(|_| v) +} + +/// Decompress a buffer using snappy, writing the result into +/// the given output buffer, growing it if necessary. +/// Will error if the input buffer is not snappy-compressed. +/// Otherwise, returns the length of the decompressed data. +pub fn decompress_into(input: &[u8], output: &mut Vec) -> Result { + let mut len = try!(decompressed_len(input)); + + if output.len() < len { + output.resize(len, 0); + } + + let status = unsafe { + snappy_uncompress( + input.as_ptr() as *const c_char, + input.len() as size_t, + output.as_mut_ptr() as *mut c_char, + &mut len as &mut size_t, + ) + }; + + match status { + SNAPPY_OK => Ok(len as usize), + SNAPPY_INVALID_INPUT => Err(InvalidInput), + SNAPPY_BUFFER_TOO_SMALL => panic!("buffer cannot be too small, size was just set to large enough."), + _ => panic!("snappy returned unspecified status"), + } +} + +/// Validate a compressed buffer. True if valid, false if not. +pub fn validate_compressed_buffer(input: &[u8]) -> bool { + let status = unsafe { snappy_validate_compressed_buffer(input.as_ptr() as *const c_char, input.len() as size_t )}; + status == SNAPPY_OK +} \ No newline at end of file