From b18407b9e383ff04b889f15d8fad8fc36033f401 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 25 Aug 2016 14:28:45 +0200 Subject: [PATCH] Snapshot optimizations (#1991) * apply RLP compression to abridged blocks * add memorydb consolidate * code hash optimization * add warning to snapshot restoration CLI --- ethcore/src/snapshot/account.rs | 147 +++++++++++++++++++++++--- ethcore/src/snapshot/block.rs | 12 ++- ethcore/src/snapshot/error.rs | 6 ++ ethcore/src/snapshot/mod.rs | 91 +++++++++++++--- ethcore/src/snapshot/service.rs | 2 + ethcore/src/snapshot/tests/state.rs | 1 + parity/snapshot.rs | 1 + util/src/journaldb/archivedb.rs | 4 + util/src/journaldb/earlymergedb.rs | 4 + util/src/journaldb/overlayrecentdb.rs | 4 + util/src/journaldb/refcounteddb.rs | 13 +++ util/src/journaldb/traits.rs | 3 + util/src/memorydb.rs | 36 +++++++ 13 files changed, 287 insertions(+), 37 deletions(-) diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index 3c31bab0d..fcd7b6abc 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -22,6 +22,34 @@ use util::rlp::{Rlp, RlpStream, Stream, UntrustedRlp, View}; use util::trie::{TrieDB, Trie}; use snapshot::Error; +use std::collections::{HashMap, HashSet}; + +// whether an encoded account has code and how it is referred to. +#[repr(u8)] +enum CodeState { + // the account has no code. + Empty = 0, + // raw code is encoded. + Inline = 1, + // the code is referred to by hash. + Hash = 2, +} + +impl CodeState { + fn from(x: u8) -> Result { + match x { + 0 => Ok(CodeState::Empty), + 1 => Ok(CodeState::Inline), + 2 => Ok(CodeState::Hash), + _ => Err(Error::UnrecognizedCodeState(x)) + } + } + + fn raw(self) -> u8 { + self as u8 + } +} + // An alternate account structure from ::account::Account. #[derive(PartialEq, Clone, Debug)] pub struct Account { @@ -58,7 +86,7 @@ impl Account { // walk the account's storage trie, returning an RLP item containing the // account properties and the storage. - pub fn to_fat_rlp(&self, acct_db: &AccountDB) -> Result { + pub fn to_fat_rlp(&self, acct_db: &AccountDB, used_code: &mut HashSet) -> Result { let db = try!(TrieDB::new(acct_db, &self.storage_root)); let mut pairs = Vec::new(); @@ -81,11 +109,14 @@ impl Account { // [has_code, code_hash]. if self.code_hash == SHA3_EMPTY { - account_stream.append(&false).append_empty_data(); + account_stream.append(&CodeState::Empty.raw()).append_empty_data(); + } else if used_code.contains(&self.code_hash) { + account_stream.append(&CodeState::Hash.raw()).append(&self.code_hash); } else { match acct_db.get(&self.code_hash) { Some(c) => { - account_stream.append(&true).append(&c); + used_code.insert(self.code_hash.clone()); + account_stream.append(&CodeState::Inline.raw()).append(&c); } None => { warn!("code lookup failed during snapshot"); @@ -100,16 +131,39 @@ impl Account { } // decode a fat rlp, and rebuild the storage trie as we go. - pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: UntrustedRlp) -> Result { + // returns the account structure along with its newly recovered code, + // if it exists. + pub fn from_fat_rlp( + acct_db: &mut AccountDBMut, + rlp: UntrustedRlp, + code_map: &HashMap, + ) -> Result<(Self, Option), Error> { use util::{TrieDBMut, TrieMut}; let nonce = try!(rlp.val_at(0)); let balance = try!(rlp.val_at(1)); - let code_hash = if try!(rlp.val_at(2)) { - let code: Bytes = try!(rlp.val_at(3)); - acct_db.insert(&code) - } else { - SHA3_EMPTY + let code_state: CodeState = { + let raw: u8 = try!(rlp.val_at(2)); + try!(CodeState::from(raw)) + }; + + // load the code if it exists. + let (code_hash, new_code) = match code_state { + CodeState::Empty => (SHA3_EMPTY, None), + CodeState::Inline => { + let code: Bytes = try!(rlp.val_at(3)); + let code_hash = acct_db.insert(&code); + + (code_hash, Some(code)) + } + CodeState::Hash => { + let code_hash = try!(rlp.val_at(3)); + if let Some(code) = code_map.get(&code_hash) { + acct_db.emplace(code_hash.clone(), code.clone()); + } + + (code_hash, None) + } }; let mut storage_root = H256::zero(); @@ -124,12 +178,20 @@ impl Account { try!(storage_trie.insert(&k, &v)); } } - Ok(Account { + + let acc = Account { nonce: nonce, balance: balance, storage_root: storage_root, code_hash: code_hash, - }) + }; + + Ok((acc, new_code)) + } + + /// Get the account's code hash. + pub fn code_hash(&self) -> &H256 { + &self.code_hash } #[cfg(test)] @@ -145,9 +207,11 @@ mod tests { use snapshot::tests::helpers::fill_storage; use util::{SHA3_NULL_RLP, SHA3_EMPTY}; - use util::{Address, FixedHash, H256}; + use util::{Address, FixedHash, H256, HashDB}; use util::rlp::{UntrustedRlp, View}; + use std::collections::{HashSet, HashMap}; + use super::Account; #[test] @@ -166,9 +230,9 @@ mod tests { let thin_rlp = account.to_thin_rlp(); assert_eq!(Account::from_thin_rlp(&thin_rlp), account); - let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap(); + let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap(); let fat_rlp = UntrustedRlp::new(&fat_rlp); - assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account); + assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, &Default::default()).unwrap().0, account); } #[test] @@ -192,8 +256,59 @@ mod tests { let thin_rlp = account.to_thin_rlp(); assert_eq!(Account::from_thin_rlp(&thin_rlp), account); - let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap(); + let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap(); let fat_rlp = UntrustedRlp::new(&fat_rlp); - assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account); + assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, &Default::default()).unwrap().0, account); + } + + #[test] + fn encoding_code() { + let mut db = get_temp_journal_db(); + let mut db = &mut **db; + + let addr1 = Address::random(); + let addr2 = Address::random(); + + let code_hash = { + let mut acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr1); + acct_db.insert(b"this is definitely code") + }; + + { + let mut acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr2); + acct_db.emplace(code_hash.clone(), b"this is definitely code".to_vec()); + } + + let account1 = Account { + nonce: 50.into(), + balance: 123456789.into(), + storage_root: SHA3_NULL_RLP, + code_hash: code_hash, + }; + + let account2 = Account { + nonce: 400.into(), + balance: 98765432123456789usize.into(), + storage_root: SHA3_NULL_RLP, + code_hash: code_hash, + }; + + let mut used_code = HashSet::new(); + + let fat_rlp1 = account1.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr1), &mut used_code).unwrap(); + let fat_rlp2 = account2.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr2), &mut used_code).unwrap(); + assert_eq!(used_code.len(), 1); + + let fat_rlp1 = UntrustedRlp::new(&fat_rlp1); + let fat_rlp2 = UntrustedRlp::new(&fat_rlp2); + + let code_map = HashMap::new(); + let (acc, maybe_code) = Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr2), fat_rlp2, &code_map).unwrap(); + assert!(maybe_code.is_none()); + assert_eq!(acc, account2); + + let (acc, maybe_code) = Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr1), fat_rlp1, &code_map).unwrap(); + assert_eq!(maybe_code, Some(b"this is definitely code".to_vec())); + assert_eq!(acc, account1); } } diff --git a/ethcore/src/snapshot/block.rs b/ethcore/src/snapshot/block.rs index 5cb1ed640..f317cf54e 100644 --- a/ethcore/src/snapshot/block.rs +++ b/ethcore/src/snapshot/block.rs @@ -21,6 +21,7 @@ use header::Header; use views::BlockView; use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; +use util::rlp::{Compressible, RlpType}; use util::{Bytes, Hashable, H256}; const HEADER_FIELDS: usize = 10; @@ -31,10 +32,10 @@ pub struct AbridgedBlock { } impl AbridgedBlock { - /// Create from a vector of bytes. Does no verification. - pub fn from_raw(rlp: Bytes) -> Self { + /// Create from rlp-compressed bytes. Does no verification. + pub fn from_raw(compressed: Bytes) -> Self { AbridgedBlock { - rlp: rlp, + rlp: compressed, } } @@ -78,7 +79,7 @@ impl AbridgedBlock { } AbridgedBlock { - rlp: stream.out(), + rlp: UntrustedRlp::new(stream.as_raw()).compress(RlpType::Blocks).to_vec(), } } @@ -86,7 +87,8 @@ impl AbridgedBlock { /// /// Will fail if contains invalid rlp. pub fn to_block(&self, parent_hash: H256, number: u64) -> Result { - let rlp = UntrustedRlp::new(&self.rlp); + let rlp = UntrustedRlp::new(&self.rlp).decompress(RlpType::Blocks); + let rlp = UntrustedRlp::new(&rlp); let mut header = Header { parent_hash: parent_hash, diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index d41d7cd2f..d4587fdba 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -35,6 +35,10 @@ pub enum Error { IncompleteChain, /// Old starting block in a pruned database. OldBlockPrunedDB, + /// Missing code. + MissingCode(Vec), + /// Unrecognized code encoding. + UnrecognizedCodeState(u8), /// Trie error. Trie(TrieError), /// Decoder error. @@ -51,6 +55,8 @@ impl fmt::Display for Error { Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."), Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \ a pruned database. Please re-run with the --pruning archive flag."), + Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()), + Error::UnrecognizedCodeState(state) => write!(f, "Unrecognized code encoding ({})", state), Error::Io(ref err) => err.fmt(f), Error::Decoder(ref err) => err.fmt(f), Error::Trie(ref err) => err.fmt(f), diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 4e33c9ebc..84afb7cc1 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -16,7 +16,7 @@ //! Snapshot creation, restoration, and network service. -use std::collections::VecDeque; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -27,6 +27,7 @@ use ids::BlockID; use views::BlockView; use util::{Bytes, Hashable, HashDB, snappy}; +use util::memorydb::MemoryDB; use util::Mutex; use util::hash::{FixedHash, H256}; use util::journaldb::{self, Algorithm, JournalDB}; @@ -332,6 +333,8 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex(db: &HashDB, root: &H256, writer: &Mutex, state_root: H256, + code_map: HashMap, // maps code hashes to code itself. + missing_code: HashMap>, // maps code hashes to lists of accounts missing that code. } impl StateRebuilder { @@ -411,6 +416,8 @@ impl StateRebuilder { StateRebuilder { db: journaldb::new(db.clone(), pruning, ::db::COL_STATE), state_root: SHA3_NULL_RLP, + code_map: HashMap::new(), + missing_code: HashMap::new(), } } @@ -419,41 +426,57 @@ impl StateRebuilder { let rlp = UntrustedRlp::new(chunk); let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); let mut pairs = Vec::with_capacity(rlp.item_count()); - let backing = self.db.backing().clone(); // initialize the pairs vector with empty values so we have slots to write into. pairs.resize(rlp.item_count(), (H256::new(), Vec::new())); let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1; + // new code contained within this chunk. + let mut chunk_code = HashMap::new(); + // build account tries in parallel. // Todo [rob] keep a thread pool around so we don't do this per-chunk. try!(scope(|scope| { let mut handles = Vec::new(); for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) { - let mut db = self.db.boxed_clone(); - let handle: ScopedJoinHandle, ::error::Error>> = scope.spawn(move || { - try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk)); + let code_map = &self.code_map; + let handle: ScopedJoinHandle> = scope.spawn(move || { + let mut db = MemoryDB::new(); + let status = try!(rebuild_accounts(&mut db, account_chunk, out_pairs_chunk, code_map)); trace!(target: "snapshot", "thread rebuilt {} account tries", account_chunk.len()); - Ok(db) + Ok((db, status)) }); handles.push(handle); } - // commit all account tries to the db, but only in this thread. - let batch = backing.transaction(); + // consolidate all edits into the main overlay. for handle in handles { - let mut thread_db = try!(handle.join()); - try!(thread_db.inject(&batch)); - } - try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); + let (thread_db, status): (MemoryDB, _) = try!(handle.join()); + self.db.consolidate(thread_db); + chunk_code.extend(status.new_code); + + for (addr_hash, code_hash) in status.missing_code { + self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash); + } + } Ok::<_, ::error::Error>(()) })); + // patch up all missing code. must be done after collecting all new missing code entries. + for (code_hash, code) in chunk_code { + for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) { + let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash); + db.emplace(code_hash, code.clone()); + } + + self.code_map.insert(code_hash, code); + } + // batch trie writes { @@ -468,6 +491,7 @@ impl StateRebuilder { } } + let backing = self.db.backing().clone(); let batch = backing.transaction(); try!(self.db.inject(&batch)); try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); @@ -475,11 +499,36 @@ impl StateRebuilder { Ok(()) } + /// Check for accounts missing code. Once all chunks have been fed, there should + /// be none. + pub fn check_missing(&self) -> Result<(), Error> { + let missing = self.missing_code.keys().cloned().collect::>(); + match missing.is_empty() { + true => Ok(()), + false => Err(Error::MissingCode(missing)), + } + } + /// Get the state root of the rebuilder. pub fn state_root(&self) -> H256 { self.state_root } } -fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), ::error::Error> { +#[derive(Default)] +struct RebuiltStatus { + new_code: Vec<(H256, Bytes)>, // new code that's become available. + missing_code: Vec<(H256, H256)>, // accounts that are missing code. +} + +// rebuild a set of accounts and their storage. +// returns +fn rebuild_accounts( + db: &mut HashDB, + account_chunk: &[&[u8]], + out_chunk: &mut [(H256, Bytes)], + code_map: &HashMap +) -> Result +{ + let mut status = RebuiltStatus::default(); for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) { let account_rlp = UntrustedRlp::new(account_pair); @@ -491,14 +540,24 @@ fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mu let mut acct_db = AccountDBMut::from_hash(db, hash); // fill out the storage trie and code while decoding. - let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); + let (acc, maybe_code) = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp, code_map)); + + let code_hash = acc.code_hash().clone(); + match maybe_code { + Some(code) => status.new_code.push((code_hash, code)), + None => { + if code_hash != ::util::SHA3_EMPTY && !code_map.contains_key(&code_hash) { + status.missing_code.push((hash, code_hash)); + } + } + } acc.to_thin_rlp() }; *out = (hash, thin_rlp); } - Ok(()) + Ok(status) } /// Proportion of blocks which we will verify `PoW` for. diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 576e32c67..45e1184b4 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -125,6 +125,8 @@ impl Restoration { try!(self.state.feed(&self.snappy_buffer[..len])); if self.state_chunks_left.is_empty() { + try!(self.state.check_missing()); + let root = self.state.state_root(); if root != self.final_state_root { warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root); diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index 96cb88106..a293cdb44 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -72,6 +72,7 @@ fn snap_and_restore() { rebuilder.feed(&chunk).unwrap(); } + rebuilder.check_missing().unwrap(); assert_eq!(rebuilder.state_root(), state_root); new_db }; diff --git a/parity/snapshot.rs b/parity/snapshot.rs index c3e43e89f..650123d73 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -108,6 +108,7 @@ impl SnapshotCommand { let (service, _panic_handler) = try!(self.start_service()); warn!("Snapshot restoration is experimental and the format may be subject to change."); + warn!("On encountering an unexpected error, please ensure that you have a recent snapshot."); let snapshot = service.snapshot_service(); let reader = PackedReader::new(Path::new(&file)) diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs index 620728cd6..417d5b865 100644 --- a/util/src/journaldb/archivedb.rs +++ b/util/src/journaldb/archivedb.rs @@ -228,6 +228,10 @@ impl JournalDB for ArchiveDB { fn backing(&self) -> &Arc { &self.backing } + + fn consolidate(&mut self, with: MemoryDB) { + self.overlay.consolidate(with); + } } #[cfg(test)] diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs index 4f52abcce..e2543d11c 100644 --- a/util/src/journaldb/earlymergedb.rs +++ b/util/src/journaldb/earlymergedb.rs @@ -539,6 +539,10 @@ impl JournalDB for EarlyMergeDB { Ok(ops) } + + fn consolidate(&mut self, with: MemoryDB) { + self.overlay.consolidate(with); + } } #[cfg(test)] diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 6e1068fb0..3d1d7e143 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -339,6 +339,10 @@ impl JournalDB for OverlayRecentDB { Ok(ops) } + + fn consolidate(&mut self, with: MemoryDB) { + self.transaction_overlay.consolidate(with); + } } impl HashDB for OverlayRecentDB { diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs index 5a2d85c1c..6b37e451f 100644 --- a/util/src/journaldb/refcounteddb.rs +++ b/util/src/journaldb/refcounteddb.rs @@ -20,6 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use overlaydb::OverlayDB; +use memorydb::MemoryDB; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; use super::traits::JournalDB; use kvdb::{Database, DBTransaction}; @@ -192,6 +193,18 @@ impl JournalDB for RefCountedDB { } self.forward.commit_to_batch(batch) } + + fn consolidate(&mut self, mut with: MemoryDB) { + for (key, (value, rc)) in with.drain() { + for _ in 0..rc { + self.emplace(key.clone(), value.clone()); + } + + for _ in rc..0 { + self.remove(&key); + } + } + } } #[cfg(test)] diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs index 96715604e..1a00da1e4 100644 --- a/util/src/journaldb/traits.rs +++ b/util/src/journaldb/traits.rs @@ -61,6 +61,9 @@ pub trait JournalDB: HashDB { /// to the backing strage fn flush(&self) {} + /// Consolidate all the insertions and deletions in the given memory overlay. + fn consolidate(&mut self, overlay: ::memorydb::MemoryDB); + /// Commit all changes in a single batch #[cfg(test)] fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs index 4376d173c..468ad2ec3 100644 --- a/util/src/memorydb.rs +++ b/util/src/memorydb.rs @@ -174,6 +174,24 @@ impl MemoryDB { } } } + + /// Consolidate all the entries of `other` into `self`. + pub fn consolidate(&mut self, mut other: Self) { + for (key, (value, rc)) in other.drain() { + match self.data.entry(key) { + Entry::Occupied(mut entry) => { + if entry.get().1 < 0 { + entry.get_mut().0 = value; + } + + entry.get_mut().1 += rc; + } + Entry::Vacant(entry) => { + entry.insert((value, rc)); + } + } + } + } } static NULL_RLP_STATIC: [u8; 1] = [0x80; 1]; @@ -310,3 +328,21 @@ fn memorydb_remove_and_purge() { m.remove_and_purge(&hello_key); assert_eq!(m.raw(&hello_key), None); } + +#[test] +fn consolidate() { + let mut main = MemoryDB::new(); + let mut other = MemoryDB::new(); + let remove_key = other.insert(b"doggo"); + main.remove(&remove_key); + + let insert_key = other.insert(b"arf"); + main.emplace(insert_key, b"arf".to_vec()); + + main.consolidate(other); + + let overlay = main.drain(); + + assert_eq!(overlay.get(&remove_key).unwrap(), &(b"doggo".to_vec(), 0)); + assert_eq!(overlay.get(&insert_key).unwrap(), &(b"arf".to_vec(), 2)); +} \ No newline at end of file