io error handling and slight blockchunker refactoring

This commit is contained in:
Robert Habermeier 2016-06-13 16:21:23 +02:00
parent f478812441
commit 446d59426a
3 changed files with 45 additions and 27 deletions

View File

@ -777,7 +777,7 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
let best_hash = best_header.hash();
let genesis_hash = self.chain.genesis_hash();
let block_chunk_hashes = BlockChunker::new(self, best_hash, genesis_hash).chunk_all(&path);
let block_chunk_hashes = BlockChunker::new(self, best_hash, genesis_hash).chunk_all(&path).unwrap();
let manifest_data = ManifestData {
state_hashes: state_hashes,

View File

@ -225,7 +225,9 @@ pub enum Error {
/// The value of the nonce or mishash is invalid.
PowInvalid,
/// Error concerning TrieDBs
TrieError(TrieError),
Trie(TrieError),
/// Io error.
Io(::std::io::Error),
}
impl fmt::Display for Error {
@ -241,7 +243,8 @@ impl fmt::Display for Error {
f.write_fmt(format_args!("Unknown engine name ({})", name)),
Error::PowHashInvalid => f.write_str("Invalid or out of date PoW hash."),
Error::PowInvalid => f.write_str("Invalid nonce or mishash"),
Error::TrieError(ref err) => f.write_fmt(format_args!("{}", err)),
Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)),
Error::Io(ref err) => f.write_fmt(format_args!("{}", err)),
}
}
}
@ -305,7 +308,13 @@ impl From<IoError> for Error {
impl From<TrieError> for Error {
fn from(err: TrieError) -> Error {
Error::TrieError(err)
Error::Trie(err)
}
}
impl From<::std::io::Error> for Error {
fn from(err: ::std::io::Error) -> Error {
Error::Io(err)
}
}

View File

@ -39,9 +39,10 @@ use util::rlp::{DecoderError, Stream, Rlp, RlpStream, UntrustedRlp, View};
pub struct BlockChunker<'a> {
client: &'a BlockChainClient,
// block, receipt rlp pairs.
rlps: VecDeque<(Bytes, Bytes)>,
rlps: VecDeque<Bytes>,
genesis_hash: H256,
current_hash: H256,
hashes: Vec<H256>,
}
impl<'a> BlockChunker<'a> {
@ -53,6 +54,7 @@ impl<'a> BlockChunker<'a> {
rlps: VecDeque::new(),
genesis_hash: genesis_hash,
current_hash: best_block_hash,
hashes: Vec::new(),
}
}
@ -68,7 +70,13 @@ impl<'a> BlockChunker<'a> {
let block = self.client.block(BlockID::Hash(self.current_hash)).unwrap();
let receipts = self.client.block_receipts(&self.current_hash).unwrap();
let new_loaded_size = loaded_size + (block.len() + receipts.len());
let pair = {
let mut pair_stream = RlpStream::new_list(2);
pair_stream.append(&block).append(&receipts);
pair_stream.out()
};
let new_loaded_size = loaded_size + pair.len();
// cut off the chunk if too large
if new_loaded_size > PREFERRED_CHUNK_SIZE {
@ -78,8 +86,6 @@ impl<'a> BlockChunker<'a> {
}
self.current_hash = BlockView::new(&block).header_view().parent_hash();
self.rlps.push_front((block, receipts));
blocks_loaded += 1;
}
@ -91,11 +97,11 @@ impl<'a> BlockChunker<'a> {
}
// write out the data in the buffers to a chunk on disk
fn write_chunk(&mut self, path: &Path) -> H256 {
fn write_chunk(&mut self, path: &Path) -> Result<(), Error> {
// Todo [rob]: compress raw data, put parent hash and block number into chunk.
let mut rlp_stream = RlpStream::new_list(self.rlps.len());
for (block, receipts) in self.rlps.drain(..) {
rlp_stream.begin_list(2).append(&block).append(&receipts);
for pair in self.rlps.drain(..) {
rlp_stream.append(&pair);
}
let raw_data = rlp_stream.out();
@ -106,10 +112,11 @@ impl<'a> BlockChunker<'a> {
let mut file_path = path.to_owned();
file_path.push(hash.hex());
let mut file = File::create(file_path).unwrap();
file.write_all(&raw_data).unwrap();
let mut file = try!(File::create(file_path));
try!(file.write_all(&raw_data));
hash
self.hashes.push(hash);
Ok(())
}
/// Create and write out all block chunks to disk, returning a vector of all
@ -117,18 +124,16 @@ impl<'a> BlockChunker<'a> {
///
/// The path parameter is the directory to store the block chunks in.
/// This function assumes the directory exists already.
pub fn chunk_all(mut self, path: &Path) -> Vec<H256> {
let mut chunk_hashes = Vec::new();
pub fn chunk_all(mut self, path: &Path) -> Result<Vec<H256>, Error> {
while self.fill_buffers() {
chunk_hashes.push(self.write_chunk(path));
try!(self.write_chunk(path));
}
if self.rlps.len() != 0 {
chunk_hashes.push(self.write_chunk(path));
try!(self.write_chunk(path));
}
chunk_hashes
Ok(self.hashes)
}
}
@ -145,7 +150,7 @@ impl<'a> StateChunker<'a> {
//
// If the buffer is greater than the desired chunk size,
// this will write out the data to disk.
fn push(&mut self, key: Bytes, value: Bytes) {
fn push(&mut self, key: Bytes, value: Bytes) -> Result<(), Error> {
let pair = {
let mut stream = RlpStream::new_list(2);
stream.append(&key).append(&value);
@ -153,16 +158,18 @@ impl<'a> StateChunker<'a> {
};
if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE {
self.write_chunk();
try!(self.write_chunk());
}
self.cur_size += pair.len();
self.rlps.push(pair);
Ok(())
}
// Write out the buffer to disk, pushing the created chunk's hash to
// the list.
fn write_chunk(&mut self) {
fn write_chunk(&mut self) -> Result<(), Error> {
trace!(target: "pv64_snapshot", "writing state chunk. uncompressed size: {}", self.cur_size);
let bytes = {
@ -178,11 +185,13 @@ impl<'a> StateChunker<'a> {
let mut path = self.snapshot_path.to_owned();
path.push(hash.hex());
let mut file = File::create(path).unwrap();
file.write_all(&bytes).unwrap();
let mut file = try!(File::create(path));
try!(file.write_all(&bytes));
self.hashes.push(hash);
self.cur_size = 0;
Ok(())
}
/// Walk the given state database starting from the given root,
@ -210,11 +219,11 @@ impl<'a> StateChunker<'a> {
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlp = try!(account.to_fat_rlp(&account_db));
chunker.push(account_key, fat_rlp);
try!(chunker.push(account_key, fat_rlp));
}
if chunker.cur_size != 0 {
chunker.write_chunk();
try!(chunker.write_chunk());
}
Ok(chunker.hashes)