diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index dacd9ba52..cef32bc93 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -23,11 +23,10 @@ use snapshot::Error; use util::{U256, H256, Bytes, HashDB, SHA3_EMPTY, SHA3_NULL_RLP}; use util::trie::{TrieDB, Trie}; use rlp::{RlpStream, UntrustedRlp}; -use itertools::Itertools; use std::collections::HashSet; -// An empty account -- these are replaced with RLP null data for a space optimization. +// An empty account -- these were replaced with RLP null data for a space optimization in v1. const ACC_EMPTY: BasicAccount = BasicAccount { nonce: U256([0, 0, 0, 0]), balance: U256([0, 0, 0, 0]), @@ -62,28 +61,19 @@ impl CodeState { } // walk the account's storage trie, returning a vector of RLP items containing the -// account properties and the storage. Each item contains at most `max_storage_items` +// account address hash, account properties and the storage. Each item contains at most `max_storage_items` // storage records split according to snapshot format definition. -pub fn to_fat_rlps(acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet, max_storage_items: usize) -> Result, Error> { - if acc == &ACC_EMPTY { - return Ok(vec![::rlp::NULL_RLP.to_vec()]); - } - +pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet, first_chunk_size: usize, max_chunk_size: usize) -> Result, Error> { let db = TrieDB::new(acct_db, &acc.storage_root)?; + let mut chunks = Vec::new(); + let mut db_iter = db.iter()?; + let mut target_chunk_size = first_chunk_size; + let mut account_stream = RlpStream::new_list(2); + let mut leftover: Option> = None; + loop { + account_stream.append(account_hash); + account_stream.begin_list(5); - let chunks = db.iter()?.chunks(max_storage_items); - let pair_chunks = chunks.into_iter().map(|chunk| chunk.collect()); - pair_chunks.pad_using(1, |_| Vec::new(), ).map(|pairs| { - let mut stream = RlpStream::new_list(pairs.len()); - - for r in pairs { - let (k, v) = r?; - stream.begin_list(2).append(&k).append(&&*v); - } - - let pairs_rlp = stream.out(); - - let mut account_stream = RlpStream::new_list(5); account_stream.append(&acc.nonce) .append(&acc.balance); @@ -105,9 +95,49 @@ pub fn to_fat_rlps(acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut Hash } } - account_stream.append_raw(&pairs_rlp, 1); - Ok(account_stream.out()) - }).collect() + account_stream.begin_unbounded_list(); + if account_stream.len() > target_chunk_size { + // account does not fit, push an empty record to mark a new chunk + target_chunk_size = max_chunk_size; + chunks.push(Vec::new()); + } + + if let Some(pair) = leftover.take() { + if !account_stream.append_raw_checked(&pair, 1, target_chunk_size) { + return Err(Error::ChunkTooSmall); + } + } + + loop { + match db_iter.next() { + Some(Ok((k, v))) => { + let pair = { + let mut stream = RlpStream::new_list(2); + stream.append(&k).append(&&*v); + stream.drain() + }; + if !account_stream.append_raw_checked(&pair, 1, target_chunk_size) { + account_stream.complete_unbounded_list(); + let stream = ::std::mem::replace(&mut account_stream, RlpStream::new_list(2)); + chunks.push(stream.out()); + target_chunk_size = max_chunk_size; + leftover = Some(pair.to_vec()); + break; + } + }, + Some(Err(e)) => { + return Err(e.into()); + }, + None => { + account_stream.complete_unbounded_list(); + let stream = ::std::mem::replace(&mut account_stream, RlpStream::new_list(2)); + chunks.push(stream.out()); + return Ok(chunks); + } + } + + } + } } // decode a fat rlp, and rebuild the storage trie as we go. @@ -181,7 +211,7 @@ mod tests { use snapshot::tests::helpers::fill_storage; use util::sha3::{SHA3_EMPTY, SHA3_NULL_RLP}; - use util::{Address, H256, HashDB, DBValue}; + use util::{Address, H256, HashDB, DBValue, Hashable}; use rlp::UntrustedRlp; use std::collections::HashSet; @@ -203,8 +233,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp), account); - let fat_rlps = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value()).unwrap(); - let fat_rlp = UntrustedRlp::new(&fat_rlps[0]); + let fat_rlps = to_fat_rlps(&addr.sha3(), &account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let fat_rlp = UntrustedRlp::new(&fat_rlps[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -228,8 +258,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp), account); - let fat_rlp = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value()).unwrap(); - let fat_rlp = UntrustedRlp::new(&fat_rlp[0]); + let fat_rlp = to_fat_rlps(&addr.sha3(), &account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let fat_rlp = UntrustedRlp::new(&fat_rlp[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -253,11 +283,11 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp), account); - let fat_rlps = to_fat_rlps(&account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), 100).unwrap(); + let fat_rlps = to_fat_rlps(&addr.sha3(), &account, &AccountDB::new(db.as_hashdb(), &addr), &mut Default::default(), 500, 1000).unwrap(); let mut root = SHA3_NULL_RLP; let mut restored_account = None; for rlp in fat_rlps { - let fat_rlp = UntrustedRlp::new(&rlp); + let fat_rlp = UntrustedRlp::new(&rlp).at(1).unwrap(); restored_account = Some(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, root).unwrap().0); root = restored_account.as_ref().unwrap().storage_root.clone(); } @@ -297,12 +327,12 @@ mod tests { let mut used_code = HashSet::new(); - let fat_rlp1 = to_fat_rlps(&account1, &AccountDB::new(db.as_hashdb(), &addr1), &mut used_code, usize::max_value()).unwrap(); - let fat_rlp2 = to_fat_rlps(&account2, &AccountDB::new(db.as_hashdb(), &addr2), &mut used_code, usize::max_value()).unwrap(); + let fat_rlp1 = to_fat_rlps(&addr1.sha3(), &account1, &AccountDB::new(db.as_hashdb(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); + let fat_rlp2 = to_fat_rlps(&addr2.sha3(), &account2, &AccountDB::new(db.as_hashdb(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); assert_eq!(used_code.len(), 1); - let fat_rlp1 = UntrustedRlp::new(&fat_rlp1[0]); - let fat_rlp2 = UntrustedRlp::new(&fat_rlp2[0]); + let fat_rlp1 = UntrustedRlp::new(&fat_rlp1[0]).at(1).unwrap(); + let fat_rlp2 = UntrustedRlp::new(&fat_rlp2[0]).at(1).unwrap(); let (acc, maybe_code) = from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr2), fat_rlp2, H256::zero()).unwrap(); assert!(maybe_code.is_none()); @@ -316,9 +346,6 @@ mod tests { #[test] fn encoding_empty_acc() { let mut db = get_temp_state_db(); - let mut used_code = HashSet::new(); - - assert_eq!(to_fat_rlps(&ACC_EMPTY, &AccountDB::new(db.as_hashdb(), &Address::default()), &mut used_code, usize::max_value()).unwrap(), vec![::rlp::NULL_RLP.to_vec()]); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &Address::default()), UntrustedRlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None)); } } diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index da8a9816b..c1391b300 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -55,6 +55,8 @@ pub enum Error { Io(::std::io::Error), /// Snapshot version is not supported. VersionNotSupported(u64), + /// Max chunk size is to small to fit basic account data. + ChunkTooSmall, } impl fmt::Display for Error { @@ -76,6 +78,7 @@ impl fmt::Display for Error { Error::Decoder(ref err) => err.fmt(f), Error::Trie(ref err) => err.fmt(f), Error::VersionNotSupported(ref ver) => write!(f, "Snapshot version {} is not supprted.", ver), + Error::ChunkTooSmall => write!(f, "Chunk size is too small."), } } } diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 97eceeab3..69dbc943d 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -83,9 +83,6 @@ mod traits { // Try to have chunks be around 4MB (before compression) const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; -// Try to have chunks be around 4MB (before compression) -const MAX_STORAGE_ENTRIES_PER_ACCOUNT_RECORD: usize = 80_000; - // How many blocks to include in a snapshot, starting from the head of the chain. const SNAPSHOT_BLOCKS: u64 = 30000; @@ -305,20 +302,9 @@ impl<'a> StateChunker<'a> { // // If the buffer is greater than the desired chunk size, // this will write out the data to disk. - fn push(&mut self, account_hash: Bytes, data: Bytes, force_chunk: bool) -> Result<(), Error> { - let pair = { - let mut stream = RlpStream::new_list(2); - stream.append(&account_hash).append_raw(&data, 1); - stream.out() - }; - - if force_chunk || self.cur_size + pair.len() >= PREFERRED_CHUNK_SIZE { - self.write_chunk()?; - } - - self.cur_size += pair.len(); - self.rlps.push(pair); - + fn push(&mut self, data: Bytes) -> Result<(), Error> { + self.cur_size += data.len(); + self.rlps.push(data); Ok(()) } @@ -348,6 +334,11 @@ impl<'a> StateChunker<'a> { Ok(()) } + + // Get current chunk size. + fn chunk_size(&self) -> usize { + self.cur_size + } } /// Walk the given state database starting from the given root, @@ -377,9 +368,12 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex 0)?; + if i > 0 { + chunker.write_chunk()?; + } + chunker.push(fat_rlp)?; } } diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index cbe7e172f..744b86577 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -122,10 +122,9 @@ fn get_code_from_prev_chunk() { let mut db = MemoryDB::new(); AccountDBMut::from_hash(&mut db, hash).insert(&code[..]); - let fat_rlp = account::to_fat_rlps(&acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value()).unwrap(); - + let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); let mut stream = RlpStream::new_list(1); - stream.begin_list(2).append(&hash).append_raw(&fat_rlp[0], 1); + stream.append_raw(&fat_rlp[0], 1); stream.out() }; diff --git a/util/rlp/src/stream.rs b/util/rlp/src/stream.rs index 318e019fc..11a16b859 100644 --- a/util/rlp/src/stream.rs +++ b/util/rlp/src/stream.rs @@ -23,11 +23,11 @@ use traits::Encodable; struct ListInfo { position: usize, current: usize, - max: usize, + max: Option, } impl ListInfo { - fn new(position: usize, max: usize) -> ListInfo { + fn new(position: usize, max: Option) -> ListInfo { ListInfo { position: position, current: 0, @@ -133,7 +133,7 @@ impl RlpStream { self.buffer.push(0); let position = self.buffer.len(); - self.unfinished_lists.push(ListInfo::new(position, len)); + self.unfinished_lists.push(ListInfo::new(position, Some(len))); }, } @@ -141,6 +141,19 @@ impl RlpStream { self } + + /// Declare appending the list of unknown size, chainable. + pub fn begin_unbounded_list(&mut self) -> &mut RlpStream { + self.finished_list = false; + // payload is longer than 1 byte only for lists > 55 bytes + // by pushing always this 1 byte we may avoid unnecessary shift of data + self.buffer.push(0); + let position = self.buffer.len(); + self.unfinished_lists.push(ListInfo::new(position, None)); + // return chainable self + self + } + /// Apends null to the end of stream, chainable. /// /// ```rust @@ -177,6 +190,36 @@ impl RlpStream { self } + /// Appends raw (pre-serialised) RLP data. Checks for size oveflow. + pub fn append_raw_checked<'a>(&'a mut self, bytes: &[u8], item_count: usize, max_size: usize) -> bool { + if self.estimate_size(bytes.len()) > max_size { + return false; + } + self.append_raw(bytes, item_count); + true + } + + /// Calculate total RLP size for appended payload. + pub fn estimate_size<'a>(&'a self, add: usize) -> usize { + let total_size = self.buffer.len() + add; + let mut base_size = total_size; + for list in &self.unfinished_lists[..] { + let len = total_size - list.position; + if len > 55 { + let leading_empty_bytes = (len as u64).leading_zeros() as usize / 8; + let size_bytes = 8 - leading_empty_bytes; + base_size += size_bytes; + } + } + base_size + } + + + /// Returns current RLP size in bytes for the data pushed into the list. + pub fn len<'a>(&'a self) -> usize { + self.estimate_size(0) + } + /// Clear the output stream so far. /// /// ```rust @@ -246,10 +289,11 @@ impl RlpStream { None => false, Some(ref mut x) => { x.current += inserted_items; - if x.current > x.max { - panic!("You cannot append more items then you expect!"); + match x.max { + Some(ref max) if x.current > *max => panic!("You cannot append more items then you expect!"), + Some(ref max) => x.current == *max, + _ => false, } - x.current == x.max } }; @@ -273,6 +317,17 @@ impl RlpStream { false => panic!() } } + + /// Finalize current ubnbound list. Panics if no unbounded list has been opened. + pub fn complete_unbounded_list(&mut self) { + let list = self.unfinished_lists.pop().expect("No open list."); + if list.max.is_some() { + panic!("List type mismatch."); + } + let len = self.buffer.len() - list.position; + self.encoder().insert_list_payload(len, list.position); + self.note_appended(1); + } } pub struct BasicEncoder<'a> { diff --git a/util/rlp/tests/tests.rs b/util/rlp/tests/tests.rs index 1c996caac..d207034ce 100644 --- a/util/rlp/tests/tests.rs +++ b/util/rlp/tests/tests.rs @@ -412,3 +412,25 @@ fn test_rlp_list_length_overflow() { let as_val: Result = rlp.val_at(0); assert_eq!(Err(DecoderError::RlpIsTooShort), as_val); } + +#[test] +fn test_rlp_stream_size_limit() { + for limit in 40 .. 270 { + let item = [0u8; 1]; + let mut stream = RlpStream::new(); + while stream.append_raw_checked(&item, 1, limit) {} + assert_eq!(stream.drain().len(), limit); + } +} + +#[test] +fn test_rlp_stream_unbounded_list() { + let mut stream = RlpStream::new(); + stream.begin_unbounded_list(); + stream.append(&40u32); + stream.append(&41u32); + assert!(!stream.is_finished()); + stream.complete_unbounded_list(); + assert!(stream.is_finished()); +} +