From 446d59426aef6d560ca440a412c5fa9771218480 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 13 Jun 2016 16:21:23 +0200 Subject: [PATCH] io error handling and slight blockchunker refactoring --- ethcore/src/client/client.rs | 2 +- ethcore/src/error.rs | 15 ++++++++-- ethcore/src/pv64/mod.rs | 55 +++++++++++++++++++++--------------- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 61909f498..627300ea6 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -777,7 +777,7 @@ impl BlockChainClient for Client where V: Verifier { let best_hash = best_header.hash(); let genesis_hash = self.chain.genesis_hash(); - let block_chunk_hashes = BlockChunker::new(self, best_hash, genesis_hash).chunk_all(&path); + let block_chunk_hashes = BlockChunker::new(self, best_hash, genesis_hash).chunk_all(&path).unwrap(); let manifest_data = ManifestData { state_hashes: state_hashes, diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 98aec917a..7e01a86cb 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -225,7 +225,9 @@ pub enum Error { /// The value of the nonce or mishash is invalid. PowInvalid, /// Error concerning TrieDBs - TrieError(TrieError), + Trie(TrieError), + /// Io error. + Io(::std::io::Error), } impl fmt::Display for Error { @@ -241,7 +243,8 @@ impl fmt::Display for Error { f.write_fmt(format_args!("Unknown engine name ({})", name)), Error::PowHashInvalid => f.write_str("Invalid or out of date PoW hash."), Error::PowInvalid => f.write_str("Invalid nonce or mishash"), - Error::TrieError(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Trie(ref err) => f.write_fmt(format_args!("{}", err)), + Error::Io(ref err) => f.write_fmt(format_args!("{}", err)), } } } @@ -305,7 +308,13 @@ impl From for Error { impl From for Error { fn from(err: TrieError) -> Error { - Error::TrieError(err) + Error::Trie(err) + } +} + +impl From<::std::io::Error> for Error { + fn from(err: ::std::io::Error) -> Error { + Error::Io(err) } } diff --git a/ethcore/src/pv64/mod.rs b/ethcore/src/pv64/mod.rs index 3e9ffde02..9af689588 100644 --- a/ethcore/src/pv64/mod.rs +++ b/ethcore/src/pv64/mod.rs @@ -39,9 +39,10 @@ use util::rlp::{DecoderError, Stream, Rlp, RlpStream, UntrustedRlp, View}; pub struct BlockChunker<'a> { client: &'a BlockChainClient, // block, receipt rlp pairs. - rlps: VecDeque<(Bytes, Bytes)>, + rlps: VecDeque, genesis_hash: H256, current_hash: H256, + hashes: Vec, } impl<'a> BlockChunker<'a> { @@ -53,6 +54,7 @@ impl<'a> BlockChunker<'a> { rlps: VecDeque::new(), genesis_hash: genesis_hash, current_hash: best_block_hash, + hashes: Vec::new(), } } @@ -68,7 +70,13 @@ impl<'a> BlockChunker<'a> { let block = self.client.block(BlockID::Hash(self.current_hash)).unwrap(); let receipts = self.client.block_receipts(&self.current_hash).unwrap(); - let new_loaded_size = loaded_size + (block.len() + receipts.len()); + let pair = { + let mut pair_stream = RlpStream::new_list(2); + pair_stream.append(&block).append(&receipts); + pair_stream.out() + }; + + let new_loaded_size = loaded_size + pair.len(); // cut off the chunk if too large if new_loaded_size > PREFERRED_CHUNK_SIZE { @@ -78,8 +86,6 @@ impl<'a> BlockChunker<'a> { } self.current_hash = BlockView::new(&block).header_view().parent_hash(); - - self.rlps.push_front((block, receipts)); blocks_loaded += 1; } @@ -91,11 +97,11 @@ impl<'a> BlockChunker<'a> { } // write out the data in the buffers to a chunk on disk - fn write_chunk(&mut self, path: &Path) -> H256 { + fn write_chunk(&mut self, path: &Path) -> Result<(), Error> { // Todo [rob]: compress raw data, put parent hash and block number into chunk. let mut rlp_stream = RlpStream::new_list(self.rlps.len()); - for (block, receipts) in self.rlps.drain(..) { - rlp_stream.begin_list(2).append(&block).append(&receipts); + for pair in self.rlps.drain(..) { + rlp_stream.append(&pair); } let raw_data = rlp_stream.out(); @@ -106,10 +112,11 @@ impl<'a> BlockChunker<'a> { let mut file_path = path.to_owned(); file_path.push(hash.hex()); - let mut file = File::create(file_path).unwrap(); - file.write_all(&raw_data).unwrap(); + let mut file = try!(File::create(file_path)); + try!(file.write_all(&raw_data)); - hash + self.hashes.push(hash); + Ok(()) } /// Create and write out all block chunks to disk, returning a vector of all @@ -117,18 +124,16 @@ impl<'a> BlockChunker<'a> { /// /// The path parameter is the directory to store the block chunks in. /// This function assumes the directory exists already. - pub fn chunk_all(mut self, path: &Path) -> Vec { - let mut chunk_hashes = Vec::new(); - + pub fn chunk_all(mut self, path: &Path) -> Result, Error> { while self.fill_buffers() { - chunk_hashes.push(self.write_chunk(path)); + try!(self.write_chunk(path)); } if self.rlps.len() != 0 { - chunk_hashes.push(self.write_chunk(path)); + try!(self.write_chunk(path)); } - chunk_hashes + Ok(self.hashes) } } @@ -145,7 +150,7 @@ impl<'a> StateChunker<'a> { // // If the buffer is greater than the desired chunk size, // this will write out the data to disk. - fn push(&mut self, key: Bytes, value: Bytes) { + fn push(&mut self, key: Bytes, value: Bytes) -> Result<(), Error> { let pair = { let mut stream = RlpStream::new_list(2); stream.append(&key).append(&value); @@ -153,16 +158,18 @@ impl<'a> StateChunker<'a> { }; if self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE { - self.write_chunk(); + try!(self.write_chunk()); } self.cur_size += pair.len(); self.rlps.push(pair); + + Ok(()) } // Write out the buffer to disk, pushing the created chunk's hash to // the list. - fn write_chunk(&mut self) { + fn write_chunk(&mut self) -> Result<(), Error> { trace!(target: "pv64_snapshot", "writing state chunk. uncompressed size: {}", self.cur_size); let bytes = { @@ -178,11 +185,13 @@ impl<'a> StateChunker<'a> { let mut path = self.snapshot_path.to_owned(); path.push(hash.hex()); - let mut file = File::create(path).unwrap(); - file.write_all(&bytes).unwrap(); + let mut file = try!(File::create(path)); + try!(file.write_all(&bytes)); self.hashes.push(hash); self.cur_size = 0; + + Ok(()) } /// Walk the given state database starting from the given root, @@ -210,11 +219,11 @@ impl<'a> StateChunker<'a> { let account_db = AccountDB::from_hash(db, account_key_hash); let fat_rlp = try!(account.to_fat_rlp(&account_db)); - chunker.push(account_key, fat_rlp); + try!(chunker.push(account_key, fat_rlp)); } if chunker.cur_size != 0 { - chunker.write_chunk(); + try!(chunker.write_chunk()); } Ok(chunker.hashes)