compress into reusable buffers

Robert Habermeier 2016-06-14 13:22:15 +02:00
parent 16e58958c9
commit dfb603dd08
2 changed files with 46 additions and 9 deletions


@@ -16,9 +16,6 @@
//! Snapshot creation helpers.
// Try to have chunks be around 16MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024;
use std::collections::VecDeque;
use std::fs::File;
use std::io::Write;
@@ -35,6 +32,31 @@ use util::hash::{FixedHash, H256};
use util::numbers::U256;
use util::rlp::{DecoderError, Rlp, RlpStream, Stream, SHA3_NULL_RLP, UntrustedRlp, View};
use util::snappy;
// Try to have chunks be around 16MB (before compression)
const PREFERRED_CHUNK_SIZE: usize = 16 * 1024 * 1024;
// use 20MB initially for the reusable snappy buffers.
// should always be larger than PREFERRED_CHUNK_SIZE for fault tolerance.
const SNAPPY_BUFFER_SIZE: usize = 20 * 1024 * 1024;
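// (Illustration only, not part of this commit.) One way to make the size
// relationship above explicit is a cheap sanity check; `check_buffer_sizes`
// is a hypothetical helper that debug builds could call before chunking.
#[allow(dead_code)]
fn check_buffer_sizes() {
    debug_assert!(SNAPPY_BUFFER_SIZE > PREFERRED_CHUNK_SIZE,
        "reusable snappy buffer should start out larger than a preferred chunk");
}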
// compresses the data into the buffer, resizing if necessary.
fn compression_helper(input: &[u8], output: &mut Vec<u8>) -> usize {
let max_size = snappy::max_compressed_len(input.len());
let buf_len = output.len();
// resize if necessary, but in reality this will probably never happen.
if max_size > buf_len {
output.resize(max_size, 0);
}
match snappy::compress_into(&input, output) {
Ok(size) => size,
Err(snappy::Error::BufferTooSmall) => panic!("buffer too small although capacity ensured?"),
Err(snappy::Error::InvalidInput) => panic!("invalid input error impossible in snappy_compress"),
}
}
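// (Illustration only, not part of this commit.) A minimal sketch of how the
// helper above is meant to be used: allocate one buffer up front and reuse it
// for every chunk, taking only the compressed prefix each time.
// `compress_chunks` and `chunks` are hypothetical; the real callers are the
// chunkers below, which hash and write the slice directly instead of copying it.
#[allow(dead_code)]
fn compress_chunks(chunks: &[Vec<u8>]) -> Vec<Vec<u8>> {
    // one long-lived buffer, reused across all chunks.
    let mut snappy_buffer = vec![0; SNAPPY_BUFFER_SIZE];
    chunks.iter().map(|chunk| {
        let compressed_size = compression_helper(chunk, &mut snappy_buffer);
        snappy_buffer[..compressed_size].to_vec()
    }).collect()
}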
/// Used to build block chunks.
struct BlockChunker<'a> {
client: &'a BlockChainClient,
@@ -43,6 +65,7 @@ struct BlockChunker<'a> {
genesis_hash: H256,
current_hash: H256,
hashes: Vec<H256>,
snappy_buffer: Vec<u8>,
}
impl<'a> BlockChunker<'a> {
@@ -93,7 +116,10 @@ impl<'a> BlockChunker<'a> {
rlp_stream.append(&pair);
}
let raw_data = snappy::compress(&rlp_stream.out());
let uncompressed = rlp_stream.out();
let compressed_size = compression_helper(&uncompressed, &mut self.snappy_buffer);
let raw_data = &self.snappy_buffer[..compressed_size];
let hash = raw_data.sha3();
trace!(target: "snapshot", "writing block chunk. hash: {}, size: {} bytes", hash.hex(), raw_data.len());
@@ -121,6 +147,7 @@ pub fn chunk_blocks(client: &BlockChainClient, best_block_hash: H256, genesis_ha
genesis_hash: genesis_hash,
current_hash: best_block_hash,
hashes: Vec::new(),
snappy_buffer: vec![0; SNAPPY_BUFFER_SIZE],
};
while chunker.fill_buffers() {
@@ -138,6 +165,7 @@ struct StateChunker<'a> {
rlps: Vec<Bytes>,
cur_size: usize,
snapshot_path: &'a Path,
snappy_buffer: Vec<u8>,
}
impl<'a> StateChunker<'a> {
@@ -167,21 +195,24 @@ impl<'a> StateChunker<'a> {
fn write_chunk(&mut self) -> Result<(), Error> {
trace!(target: "snapshot", "writing state chunk. uncompressed size: {}", self.cur_size);
let bytes = {
let compressed_size = {
let mut stream = RlpStream::new();
stream.append(&&self.rlps[..]);
snappy::compress(&stream.out())
compression_helper(&stream.out(), &mut self.snappy_buffer)
};
self.rlps.clear();
let bytes = &self.snappy_buffer[..compressed_size];
let hash = bytes.sha3();
let mut path = self.snapshot_path.to_owned();
path.push(hash.hex());
let mut file = try!(File::create(path));
try!(file.write_all(&bytes));
try!(file.write_all(bytes));
self.hashes.push(hash);
self.cur_size = 0;
@@ -203,6 +234,7 @@ pub fn chunk_state(db: &HashDB, root: &H256, path: &Path) -> Result<Vec<H256>, E
rlps: Vec::new(),
cur_size: 0,
snapshot_path: path,
snappy_buffer: vec![0; SNAPPY_BUFFER_SIZE],
};
trace!(target: "snapshot", "beginning state chunking");


@@ -68,9 +68,14 @@ impl fmt::Display for Error {
}
}
/// The maximum compressed length given a size.
pub fn max_compressed_len(len: usize) -> usize {
unsafe { snappy_max_compressed_length(len as size_t) as usize }
}
/// Compress a buffer using snappy.
pub fn compress(input: &[u8]) -> Vec<u8> {
let mut buf_size = unsafe { snappy_max_compressed_length(input.len() as size_t) };
let mut buf_size = max_compressed_len(input.len());
let mut output = vec![0; buf_size as usize];
buf_size = compress_into(input, &mut output).expect("snappy compression failed with large enough buffer.");
@@ -79,7 +84,7 @@ pub fn compress(input: &[u8]) -> Vec<u8> {
}
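// (Illustration only, not part of this commit.) Exporting `max_compressed_len`
// lets callers size a reusable buffer themselves and pair it with
// `compress_into`, rather than letting `compress` allocate a fresh Vec per call.
// `compress_reusing` is a hypothetical example of that pattern.
#[allow(dead_code)]
pub fn compress_reusing(input: &[u8], buffer: &mut Vec<u8>) -> usize {
    let needed = max_compressed_len(input.len());
    if buffer.len() < needed {
        buffer.resize(needed, 0);
    }
    compress_into(input, buffer).expect("buffer sized to max_compressed_len; qed")
}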
/// Compress a buffer using snappy, writing the result into
/// the given output buffer. Will error if the buffer is too small.
/// the given output buffer. Will error iff the buffer is too small.
/// Otherwise, returns the length of the compressed data.
pub fn compress_into(input: &[u8], output: &mut [u8]) -> Result<usize, Error> {
let mut len = output.len() as size_t;