diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 7257ed04e..6d62d0e8e 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -16,9 +16,6 @@ //! Snapshot creation helpers. -// Try to have chunks be around 16MB (before compression) -const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024; - use std::collections::VecDeque; use std::fs::File; use std::io::Write; @@ -35,6 +32,31 @@ use util::hash::{FixedHash, H256}; use util::numbers::U256; use util::rlp::{DecoderError, Rlp, RlpStream, Stream, SHA3_NULL_RLP, UntrustedRlp, View}; use util::snappy; + +// Try to have chunks be around 16MB (before compression) +const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024; + +// Initially use 20MB for the reusable snappy buffers. +// This should always be larger than PREFERRED_CHUNK_SIZE for fault tolerance. +const SNAPPY_BUFFER_SIZE: usize = 20 * 1024 * 1024; + +// Compresses the data into the buffer, resizing it if necessary, and returns the compressed length. +fn compression_helper(input: &[u8], output: &mut Vec<u8>) -> usize { + let max_size = snappy::max_compressed_len(input.len()); + let buf_len = output.len(); + + // resize if necessary, but in reality this will probably never happen. + if max_size > buf_len { + output.resize(max_size, 0); + } + + match snappy::compress_into(&input, output) { + Ok(size) => size, + Err(snappy::Error::BufferTooSmall) => panic!("buffer too small although capacity ensured?"), + Err(snappy::Error::InvalidInput) => panic!("invalid input error impossible in snappy_compress"), + } +} + /// Used to build block chunks. 
struct BlockChunker<'a> { client: &'a BlockChainClient, @@ -43,6 +65,7 @@ struct BlockChunker<'a> { genesis_hash: H256, current_hash: H256, hashes: Vec, + snappy_buffer: Vec<u8>, } impl<'a> BlockChunker<'a> { @@ -93,7 +116,10 @@ impl<'a> BlockChunker<'a> { rlp_stream.append(&pair); } - let raw_data = snappy::compress(&rlp_stream.out()); + let uncompressed = rlp_stream.out(); + let compressed_size = compression_helper(&uncompressed, &mut self.snappy_buffer); + let raw_data = &self.snappy_buffer[..compressed_size]; + let hash = raw_data.sha3(); trace!(target: "snapshot", "writing block chunk. hash: {}, size: {} bytes", hash.hex(), raw_data.len()); @@ -121,6 +147,7 @@ pub fn chunk_blocks(client: &BlockChainClient, best_block_hash: H256, genesis_ha genesis_hash: genesis_hash, current_hash: best_block_hash, hashes: Vec::new(), + snappy_buffer: vec![0; SNAPPY_BUFFER_SIZE], }; while chunker.fill_buffers() { @@ -138,6 +165,7 @@ struct StateChunker<'a> { rlps: Vec, cur_size: usize, snapshot_path: &'a Path, + snappy_buffer: Vec<u8>, } impl<'a> StateChunker<'a> { @@ -167,21 +195,24 @@ impl<'a> StateChunker<'a> { fn write_chunk(&mut self) -> Result<(), Error> { trace!(target: "snapshot", "writing state chunk. 
uncompressed size: {}", self.cur_size); - let bytes = { + let compressed_size = { let mut stream = RlpStream::new(); stream.append(&&self.rlps[..]); - snappy::compress(&stream.out()) + + compression_helper(&stream.out(), &mut self.snappy_buffer) }; self.rlps.clear(); + let bytes = &self.snappy_buffer[..compressed_size]; + let hash = bytes.sha3(); let mut path = self.snapshot_path.to_owned(); path.push(hash.hex()); let mut file = try!(File::create(path)); - try!(file.write_all(&bytes)); + try!(file.write_all(bytes)); self.hashes.push(hash); self.cur_size = 0; @@ -203,6 +234,7 @@ pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result, E rlps: Vec::new(), cur_size: 0, snapshot_path: path, + snappy_buffer: vec![0; SNAPPY_BUFFER_SIZE], }; trace!(target: "snapshot", "beginning state chunking"); diff --git a/util/src/snappy.rs b/util/src/snappy.rs index 519575e4c..ed47fa9e8 100644 --- a/util/src/snappy.rs +++ b/util/src/snappy.rs @@ -68,9 +68,14 @@ impl fmt::Display for Error { } } +/// Returns the maximum compressed length for an input of `len` bytes. +pub fn max_compressed_len(len: usize) -> usize { + unsafe { snappy_max_compressed_length(len as size_t) as usize } +} + /// Compress a buffer using snappy. pub fn compress(input: &[u8]) -> Vec { - let mut buf_size = unsafe { snappy_max_compressed_length(input.len() as size_t) }; + let mut buf_size = max_compressed_len(input.len()); let mut output = vec![0; buf_size as usize]; buf_size = compress_into(input, &mut output).expect("snappy compression failed with large enough buffer."); @@ -79,7 +84,7 @@ pub fn compress(input: &[u8]) -> Vec { } /// Compress a buffer using snappy, writing the result into -/// the given output buffer. Will error if the buffer is too small. +/// the given output buffer. Will error iff the buffer is too small. /// Otherwise, returns the length of the compressed data. pub fn compress_into(input: &[u8], output: &mut [u8]) -> Result { let mut len = output.len() as size_t;