From 0e3a15cadb4959d6ff2a023a6eb5c6f7ac98e3d2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 15 Jun 2016 17:46:40 +0200 Subject: [PATCH] add a state rebuilder --- ethcore/src/error.rs | 9 +++++ ethcore/src/snapshot/account.rs | 4 +- ethcore/src/snapshot/mod.rs | 66 +++++++++++++++++++++++++++++++-- util/src/snappy.rs | 31 ++++++++++------ 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 7e01a86cb..40442c071 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -228,6 +228,8 @@ pub enum Error { Trie(TrieError), /// Io error. Io(::std::io::Error), + /// Snappy error. + Snappy(::util::snappy::Error), } impl fmt::Display for Error { @@ -245,6 +247,7 @@ impl fmt::Display for Error { Error::PowInvalid => f.write_str("Invalid nonce or mishash"), Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)), Error::Io(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Snappy(ref err) => f.write_fmt(format_args!("{}", err)), } } } @@ -318,6 +321,12 @@ impl From<::std::io::Error> for Error { } } +impl From<::util::snappy::Error> for Error { + fn from(err: ::util::snappy::Error) -> Error { + Error::Snappy(err) + } +} + // TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted. /*#![feature(concat_idents)] macro_rules! assimilate { diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index 922e4c9e2..c514da405 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -102,11 +102,9 @@ impl Account { } // decode a fat rlp, and rebuild the storage trie as we go. - pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: Bytes) -> Result { + pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: UntrustedRlp) -> Result { use util::{TrieDBMut, TrieMut}; - let rlp = UntrustedRlp::new(&rlp); - let nonce = try!(rlp.val_at(0)); let balance = try!(rlp.val_at(1)); let code_hash = if try!(rlp.val_at(2)) { diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index c0023f800..06945b9d5 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -21,7 +21,7 @@ use std::fs::File; use std::io::Write; use std::path::Path; -use account_db::{AccountDB}; +use account_db::{AccountDB, AccountDBMut}; use client::BlockChainClient; use error::Error; use ids::BlockID; @@ -45,7 +45,7 @@ fn compression_helper(input: &[u8], output: &mut Vec) -> usize { let max_size = snappy::max_compressed_len(input.len()); let buf_len = output.len(); - // resize if necessary, but in reality this will probably never happen. + // resize if necessary, but in reality this will probably almost never happen. if max_size > buf_len { output.resize(max_size, 0); } @@ -57,6 +57,20 @@ fn compression_helper(input: &[u8], output: &mut Vec) -> usize { } } +// decompresses the data into the buffer, resizing if necessary. +// may return invalid input error. +fn decompression_helper(input: &[u8], output: &mut Vec) -> Result { + let size = try!(snappy::decompressed_len(input)); + let buf_len = output.len(); + + // resize if necessary, slow path. + if size > buf_len { + output.resize(size, 0); + } + + snappy::decompress_into(&input, output) +} + // shared portion of write_chunk // returns either a (hash, compressed_size) pair or an io error. fn write_chunk(raw_data: &[u8], compression_buffer: &mut Vec, path: &Path) -> Result<(H256, usize), Error> { @@ -259,7 +273,7 @@ pub struct ManifestData { } impl ManifestData { - /// Encode the manifest data to. + /// Encode the manifest data to rlp. pub fn to_rlp(self) -> Bytes { let mut stream = RlpStream::new_list(3); stream.append(&self.state_hashes); @@ -269,7 +283,7 @@ impl ManifestData { stream.out() } - /// Try to restore manifest data from raw bytes interpreted as RLP. + /// Try to restore manifest data from raw bytes, interpreted as RLP. pub fn from_rlp(raw: &[u8]) -> Result { let decoder = UntrustedRlp::new(raw); @@ -283,4 +297,48 @@ impl ManifestData { state_root: state_root, }) } +} + +/// Used to rebuild the state trie piece by piece. +pub struct StateRebuilder<'a> { + db: &'a mut HashDB, + state_root: H256, + snappy_buffer: Vec +} + +impl<'a> StateRebuilder<'a> { + /// Create a new state rebuilder to write into the given backing DB. + pub fn new(db: &'a mut HashDB) -> Self { + StateRebuilder { + db: db, + state_root: H256::zero(), + snappy_buffer: Vec::new(), + } + } + + /// Feed a compressed state chunk into the rebuilder. + pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> { + let len = try!(decompression_helper(compressed, &mut self.snappy_buffer)); + let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]); + + for account_pair in rlp.iter() { + let hash: H256 = try!(rlp.val_at(0)); + let fat_rlp = try!(rlp.at(1)); + + let thin_rlp = { + let mut acct_db = AccountDBMut::from_hash(self.db, hash); + + // fill out the storage trie and code while decoding. + let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); + acc.to_thin_rlp() + }; + + self.db.insert(&thin_rlp); + } + + Ok(()) + } + + /// Get the state root of the rebuilder. + pub fn state_root(&self) -> H256 { self.state_root } } \ No newline at end of file diff --git a/util/src/snappy.rs b/util/src/snappy.rs index ed47fa9e8..e4c2ae9b7 100644 --- a/util/src/snappy.rs +++ b/util/src/snappy.rs @@ -73,6 +73,20 @@ pub fn max_compressed_len(len: usize) -> usize { unsafe { snappy_max_compressed_length(len as size_t) as usize } } +/// How large the given data will be when decompressed. +pub fn decompressed_len(compressed: &[u8]) -> Result { + let mut size: size_t = 0; + let len = compressed.len() as size_t; + + let status = unsafe { snappy_uncompressed_length(compressed.as_ptr() as *const c_char, len, &mut size) }; + + if status == SNAPPY_INVALID_INPUT { + Err(Error::InvalidInput) + } else { + Ok(len) + } +} + /// Compress a buffer using snappy. pub fn compress(input: &[u8]) -> Vec { let mut buf_size = max_compressed_len(input.len()); @@ -107,18 +121,13 @@ pub fn compress_into(input: &[u8], output: &mut [u8]) -> Result { /// Decompress a buffer using snappy. Will return an error if the buffer is not snappy-compressed. pub fn decompress(input: &[u8]) -> Result, Error> { - let mut buf_size: size_t = 0; + decompressed_len(input).and_then(|mut buf_size| { + let mut output = vec![0; buf_size]; - let status = unsafe { snappy_uncompressed_length(input.as_ptr() as *const c_char, input.len() as size_t, &mut buf_size) }; - if status == SNAPPY_INVALID_INPUT { - return Err(Error::InvalidInput); - } - - let mut output = vec![0; buf_size]; - - buf_size = try!(decompress_into(input, &mut output)); - output.truncate(buf_size); - Ok(output) + buf_size = try!(decompress_into(input, &mut output)); + output.truncate(buf_size); + Ok(output) + }) } /// Decompress a buffer using snappy, writing the result into