Snapshot optimizations (#1991)
* apply RLP compression to abridged blocks * add memorydb consolidate * code hash optimization * add warning to snapshot restoration CLI
This commit is contained in:
parent
09e0842f56
commit
b18407b9e3
@ -22,6 +22,34 @@ use util::rlp::{Rlp, RlpStream, Stream, UntrustedRlp, View};
|
||||
use util::trie::{TrieDB, Trie};
|
||||
use snapshot::Error;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
// whether an encoded account has code and how it is referred to.
|
||||
#[repr(u8)]
|
||||
enum CodeState {
|
||||
// the account has no code.
|
||||
Empty = 0,
|
||||
// raw code is encoded.
|
||||
Inline = 1,
|
||||
// the code is referred to by hash.
|
||||
Hash = 2,
|
||||
}
|
||||
|
||||
impl CodeState {
|
||||
fn from(x: u8) -> Result<Self, Error> {
|
||||
match x {
|
||||
0 => Ok(CodeState::Empty),
|
||||
1 => Ok(CodeState::Inline),
|
||||
2 => Ok(CodeState::Hash),
|
||||
_ => Err(Error::UnrecognizedCodeState(x))
|
||||
}
|
||||
}
|
||||
|
||||
fn raw(self) -> u8 {
|
||||
self as u8
|
||||
}
|
||||
}
|
||||
|
||||
// An alternate account structure from ::account::Account.
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub struct Account {
|
||||
@ -58,7 +86,7 @@ impl Account {
|
||||
|
||||
// walk the account's storage trie, returning an RLP item containing the
|
||||
// account properties and the storage.
|
||||
pub fn to_fat_rlp(&self, acct_db: &AccountDB) -> Result<Bytes, Error> {
|
||||
pub fn to_fat_rlp(&self, acct_db: &AccountDB, used_code: &mut HashSet<H256>) -> Result<Bytes, Error> {
|
||||
let db = try!(TrieDB::new(acct_db, &self.storage_root));
|
||||
|
||||
let mut pairs = Vec::new();
|
||||
@ -81,11 +109,14 @@ impl Account {
|
||||
|
||||
// [has_code, code_hash].
|
||||
if self.code_hash == SHA3_EMPTY {
|
||||
account_stream.append(&false).append_empty_data();
|
||||
account_stream.append(&CodeState::Empty.raw()).append_empty_data();
|
||||
} else if used_code.contains(&self.code_hash) {
|
||||
account_stream.append(&CodeState::Hash.raw()).append(&self.code_hash);
|
||||
} else {
|
||||
match acct_db.get(&self.code_hash) {
|
||||
Some(c) => {
|
||||
account_stream.append(&true).append(&c);
|
||||
used_code.insert(self.code_hash.clone());
|
||||
account_stream.append(&CodeState::Inline.raw()).append(&c);
|
||||
}
|
||||
None => {
|
||||
warn!("code lookup failed during snapshot");
|
||||
@ -100,16 +131,39 @@ impl Account {
|
||||
}
|
||||
|
||||
// decode a fat rlp, and rebuild the storage trie as we go.
|
||||
pub fn from_fat_rlp(acct_db: &mut AccountDBMut, rlp: UntrustedRlp) -> Result<Self, Error> {
|
||||
// returns the account structure along with its newly recovered code,
|
||||
// if it exists.
|
||||
pub fn from_fat_rlp(
|
||||
acct_db: &mut AccountDBMut,
|
||||
rlp: UntrustedRlp,
|
||||
code_map: &HashMap<H256, Bytes>,
|
||||
) -> Result<(Self, Option<Bytes>), Error> {
|
||||
use util::{TrieDBMut, TrieMut};
|
||||
|
||||
let nonce = try!(rlp.val_at(0));
|
||||
let balance = try!(rlp.val_at(1));
|
||||
let code_hash = if try!(rlp.val_at(2)) {
|
||||
let code_state: CodeState = {
|
||||
let raw: u8 = try!(rlp.val_at(2));
|
||||
try!(CodeState::from(raw))
|
||||
};
|
||||
|
||||
// load the code if it exists.
|
||||
let (code_hash, new_code) = match code_state {
|
||||
CodeState::Empty => (SHA3_EMPTY, None),
|
||||
CodeState::Inline => {
|
||||
let code: Bytes = try!(rlp.val_at(3));
|
||||
acct_db.insert(&code)
|
||||
} else {
|
||||
SHA3_EMPTY
|
||||
let code_hash = acct_db.insert(&code);
|
||||
|
||||
(code_hash, Some(code))
|
||||
}
|
||||
CodeState::Hash => {
|
||||
let code_hash = try!(rlp.val_at(3));
|
||||
if let Some(code) = code_map.get(&code_hash) {
|
||||
acct_db.emplace(code_hash.clone(), code.clone());
|
||||
}
|
||||
|
||||
(code_hash, None)
|
||||
}
|
||||
};
|
||||
|
||||
let mut storage_root = H256::zero();
|
||||
@ -124,12 +178,20 @@ impl Account {
|
||||
try!(storage_trie.insert(&k, &v));
|
||||
}
|
||||
}
|
||||
Ok(Account {
|
||||
|
||||
let acc = Account {
|
||||
nonce: nonce,
|
||||
balance: balance,
|
||||
storage_root: storage_root,
|
||||
code_hash: code_hash,
|
||||
})
|
||||
};
|
||||
|
||||
Ok((acc, new_code))
|
||||
}
|
||||
|
||||
/// Get the account's code hash.
|
||||
pub fn code_hash(&self) -> &H256 {
|
||||
&self.code_hash
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -145,9 +207,11 @@ mod tests {
|
||||
use snapshot::tests::helpers::fill_storage;
|
||||
|
||||
use util::{SHA3_NULL_RLP, SHA3_EMPTY};
|
||||
use util::{Address, FixedHash, H256};
|
||||
use util::{Address, FixedHash, H256, HashDB};
|
||||
use util::rlp::{UntrustedRlp, View};
|
||||
|
||||
use std::collections::{HashSet, HashMap};
|
||||
|
||||
use super::Account;
|
||||
|
||||
#[test]
|
||||
@ -166,9 +230,9 @@ mod tests {
|
||||
let thin_rlp = account.to_thin_rlp();
|
||||
assert_eq!(Account::from_thin_rlp(&thin_rlp), account);
|
||||
|
||||
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap();
|
||||
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap();
|
||||
let fat_rlp = UntrustedRlp::new(&fat_rlp);
|
||||
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account);
|
||||
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, &Default::default()).unwrap().0, account);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -192,8 +256,59 @@ mod tests {
|
||||
let thin_rlp = account.to_thin_rlp();
|
||||
assert_eq!(Account::from_thin_rlp(&thin_rlp), account);
|
||||
|
||||
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr)).unwrap();
|
||||
let fat_rlp = account.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr), &mut Default::default()).unwrap();
|
||||
let fat_rlp = UntrustedRlp::new(&fat_rlp);
|
||||
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp).unwrap(), account);
|
||||
assert_eq!(Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr), fat_rlp, &Default::default()).unwrap().0, account);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encoding_code() {
|
||||
let mut db = get_temp_journal_db();
|
||||
let mut db = &mut **db;
|
||||
|
||||
let addr1 = Address::random();
|
||||
let addr2 = Address::random();
|
||||
|
||||
let code_hash = {
|
||||
let mut acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr1);
|
||||
acct_db.insert(b"this is definitely code")
|
||||
};
|
||||
|
||||
{
|
||||
let mut acct_db = AccountDBMut::new(db.as_hashdb_mut(), &addr2);
|
||||
acct_db.emplace(code_hash.clone(), b"this is definitely code".to_vec());
|
||||
}
|
||||
|
||||
let account1 = Account {
|
||||
nonce: 50.into(),
|
||||
balance: 123456789.into(),
|
||||
storage_root: SHA3_NULL_RLP,
|
||||
code_hash: code_hash,
|
||||
};
|
||||
|
||||
let account2 = Account {
|
||||
nonce: 400.into(),
|
||||
balance: 98765432123456789usize.into(),
|
||||
storage_root: SHA3_NULL_RLP,
|
||||
code_hash: code_hash,
|
||||
};
|
||||
|
||||
let mut used_code = HashSet::new();
|
||||
|
||||
let fat_rlp1 = account1.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr1), &mut used_code).unwrap();
|
||||
let fat_rlp2 = account2.to_fat_rlp(&AccountDB::new(db.as_hashdb(), &addr2), &mut used_code).unwrap();
|
||||
assert_eq!(used_code.len(), 1);
|
||||
|
||||
let fat_rlp1 = UntrustedRlp::new(&fat_rlp1);
|
||||
let fat_rlp2 = UntrustedRlp::new(&fat_rlp2);
|
||||
|
||||
let code_map = HashMap::new();
|
||||
let (acc, maybe_code) = Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr2), fat_rlp2, &code_map).unwrap();
|
||||
assert!(maybe_code.is_none());
|
||||
assert_eq!(acc, account2);
|
||||
|
||||
let (acc, maybe_code) = Account::from_fat_rlp(&mut AccountDBMut::new(db.as_hashdb_mut(), &addr1), fat_rlp1, &code_map).unwrap();
|
||||
assert_eq!(maybe_code, Some(b"this is definitely code".to_vec()));
|
||||
assert_eq!(acc, account1);
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ use header::Header;
|
||||
|
||||
use views::BlockView;
|
||||
use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View};
|
||||
use util::rlp::{Compressible, RlpType};
|
||||
use util::{Bytes, Hashable, H256};
|
||||
|
||||
const HEADER_FIELDS: usize = 10;
|
||||
@ -31,10 +32,10 @@ pub struct AbridgedBlock {
|
||||
}
|
||||
|
||||
impl AbridgedBlock {
|
||||
/// Create from a vector of bytes. Does no verification.
|
||||
pub fn from_raw(rlp: Bytes) -> Self {
|
||||
/// Create from rlp-compressed bytes. Does no verification.
|
||||
pub fn from_raw(compressed: Bytes) -> Self {
|
||||
AbridgedBlock {
|
||||
rlp: rlp,
|
||||
rlp: compressed,
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,7 +79,7 @@ impl AbridgedBlock {
|
||||
}
|
||||
|
||||
AbridgedBlock {
|
||||
rlp: stream.out(),
|
||||
rlp: UntrustedRlp::new(stream.as_raw()).compress(RlpType::Blocks).to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,7 +87,8 @@ impl AbridgedBlock {
|
||||
///
|
||||
/// Will fail if contains invalid rlp.
|
||||
pub fn to_block(&self, parent_hash: H256, number: u64) -> Result<Block, DecoderError> {
|
||||
let rlp = UntrustedRlp::new(&self.rlp);
|
||||
let rlp = UntrustedRlp::new(&self.rlp).decompress(RlpType::Blocks);
|
||||
let rlp = UntrustedRlp::new(&rlp);
|
||||
|
||||
let mut header = Header {
|
||||
parent_hash: parent_hash,
|
||||
|
@ -35,6 +35,10 @@ pub enum Error {
|
||||
IncompleteChain,
|
||||
/// Old starting block in a pruned database.
|
||||
OldBlockPrunedDB,
|
||||
/// Missing code.
|
||||
MissingCode(Vec<H256>),
|
||||
/// Unrecognized code encoding.
|
||||
UnrecognizedCodeState(u8),
|
||||
/// Trie error.
|
||||
Trie(TrieError),
|
||||
/// Decoder error.
|
||||
@ -51,6 +55,8 @@ impl fmt::Display for Error {
|
||||
Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."),
|
||||
Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \
|
||||
a pruned database. Please re-run with the --pruning archive flag."),
|
||||
Error::MissingCode(ref missing) => write!(f, "Incomplete snapshot: {} contract codes not found.", missing.len()),
|
||||
Error::UnrecognizedCodeState(state) => write!(f, "Unrecognized code encoding ({})", state),
|
||||
Error::Io(ref err) => err.fmt(f),
|
||||
Error::Decoder(ref err) => err.fmt(f),
|
||||
Error::Trie(ref err) => err.fmt(f),
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
//! Snapshot creation, restoration, and network service.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
|
||||
@ -27,6 +27,7 @@ use ids::BlockID;
|
||||
use views::BlockView;
|
||||
|
||||
use util::{Bytes, Hashable, HashDB, snappy};
|
||||
use util::memorydb::MemoryDB;
|
||||
use util::Mutex;
|
||||
use util::hash::{FixedHash, H256};
|
||||
use util::journaldb::{self, Algorithm, JournalDB};
|
||||
@ -332,6 +333,8 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter +
|
||||
progress: progress,
|
||||
};
|
||||
|
||||
let mut used_code = HashSet::new();
|
||||
|
||||
// account_key here is the address' hash.
|
||||
for (account_key, account_data) in account_trie.iter() {
|
||||
let account = Account::from_thin_rlp(account_data);
|
||||
@ -339,7 +342,7 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter +
|
||||
|
||||
let account_db = AccountDB::from_hash(db, account_key_hash);
|
||||
|
||||
let fat_rlp = try!(account.to_fat_rlp(&account_db));
|
||||
let fat_rlp = try!(account.to_fat_rlp(&account_db, &mut used_code));
|
||||
let compressed_rlp = UntrustedRlp::new(&fat_rlp).compress(RlpType::Snapshot).to_vec();
|
||||
try!(chunker.push(account_key, compressed_rlp));
|
||||
}
|
||||
@ -403,6 +406,8 @@ impl ManifestData {
|
||||
pub struct StateRebuilder {
|
||||
db: Box<JournalDB>,
|
||||
state_root: H256,
|
||||
code_map: HashMap<H256, Bytes>, // maps code hashes to code itself.
|
||||
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
|
||||
}
|
||||
|
||||
impl StateRebuilder {
|
||||
@ -411,6 +416,8 @@ impl StateRebuilder {
|
||||
StateRebuilder {
|
||||
db: journaldb::new(db.clone(), pruning, ::db::COL_STATE),
|
||||
state_root: SHA3_NULL_RLP,
|
||||
code_map: HashMap::new(),
|
||||
missing_code: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -419,41 +426,57 @@ impl StateRebuilder {
|
||||
let rlp = UntrustedRlp::new(chunk);
|
||||
let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect();
|
||||
let mut pairs = Vec::with_capacity(rlp.item_count());
|
||||
let backing = self.db.backing().clone();
|
||||
|
||||
// initialize the pairs vector with empty values so we have slots to write into.
|
||||
pairs.resize(rlp.item_count(), (H256::new(), Vec::new()));
|
||||
|
||||
let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1;
|
||||
|
||||
// new code contained within this chunk.
|
||||
let mut chunk_code = HashMap::new();
|
||||
|
||||
// build account tries in parallel.
|
||||
// Todo [rob] keep a thread pool around so we don't do this per-chunk.
|
||||
try!(scope(|scope| {
|
||||
let mut handles = Vec::new();
|
||||
for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
|
||||
let mut db = self.db.boxed_clone();
|
||||
let handle: ScopedJoinHandle<Result<Box<JournalDB>, ::error::Error>> = scope.spawn(move || {
|
||||
try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk));
|
||||
let code_map = &self.code_map;
|
||||
let handle: ScopedJoinHandle<Result<_, ::error::Error>> = scope.spawn(move || {
|
||||
let mut db = MemoryDB::new();
|
||||
let status = try!(rebuild_accounts(&mut db, account_chunk, out_pairs_chunk, code_map));
|
||||
|
||||
trace!(target: "snapshot", "thread rebuilt {} account tries", account_chunk.len());
|
||||
Ok(db)
|
||||
Ok((db, status))
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
// commit all account tries to the db, but only in this thread.
|
||||
let batch = backing.transaction();
|
||||
// consolidate all edits into the main overlay.
|
||||
for handle in handles {
|
||||
let mut thread_db = try!(handle.join());
|
||||
try!(thread_db.inject(&batch));
|
||||
}
|
||||
try!(backing.write(batch).map_err(::util::UtilError::SimpleString));
|
||||
let (thread_db, status): (MemoryDB, _) = try!(handle.join());
|
||||
self.db.consolidate(thread_db);
|
||||
|
||||
chunk_code.extend(status.new_code);
|
||||
|
||||
for (addr_hash, code_hash) in status.missing_code {
|
||||
self.missing_code.entry(code_hash).or_insert_with(Vec::new).push(addr_hash);
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, ::error::Error>(())
|
||||
}));
|
||||
|
||||
// patch up all missing code. must be done after collecting all new missing code entries.
|
||||
for (code_hash, code) in chunk_code {
|
||||
for addr_hash in self.missing_code.remove(&code_hash).unwrap_or_else(Vec::new) {
|
||||
let mut db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), addr_hash);
|
||||
db.emplace(code_hash, code.clone());
|
||||
}
|
||||
|
||||
self.code_map.insert(code_hash, code);
|
||||
}
|
||||
|
||||
|
||||
// batch trie writes
|
||||
{
|
||||
@ -468,6 +491,7 @@ impl StateRebuilder {
|
||||
}
|
||||
}
|
||||
|
||||
let backing = self.db.backing().clone();
|
||||
let batch = backing.transaction();
|
||||
try!(self.db.inject(&batch));
|
||||
try!(backing.write(batch).map_err(::util::UtilError::SimpleString));
|
||||
@ -475,11 +499,36 @@ impl StateRebuilder {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check for accounts missing code. Once all chunks have been fed, there should
|
||||
/// be none.
|
||||
pub fn check_missing(&self) -> Result<(), Error> {
|
||||
let missing = self.missing_code.keys().cloned().collect::<Vec<_>>();
|
||||
match missing.is_empty() {
|
||||
true => Ok(()),
|
||||
false => Err(Error::MissingCode(missing)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the state root of the rebuilder.
|
||||
pub fn state_root(&self) -> H256 { self.state_root }
|
||||
}
|
||||
|
||||
fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), ::error::Error> {
|
||||
#[derive(Default)]
|
||||
struct RebuiltStatus {
|
||||
new_code: Vec<(H256, Bytes)>, // new code that's become available.
|
||||
missing_code: Vec<(H256, H256)>, // accounts that are missing code.
|
||||
}
|
||||
|
||||
// rebuild a set of accounts and their storage.
|
||||
// returns
|
||||
fn rebuild_accounts(
|
||||
db: &mut HashDB,
|
||||
account_chunk: &[&[u8]],
|
||||
out_chunk: &mut [(H256, Bytes)],
|
||||
code_map: &HashMap<H256, Bytes>
|
||||
) -> Result<RebuiltStatus, ::error::Error>
|
||||
{
|
||||
let mut status = RebuiltStatus::default();
|
||||
for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) {
|
||||
let account_rlp = UntrustedRlp::new(account_pair);
|
||||
|
||||
@ -491,14 +540,24 @@ fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mu
|
||||
let mut acct_db = AccountDBMut::from_hash(db, hash);
|
||||
|
||||
// fill out the storage trie and code while decoding.
|
||||
let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp));
|
||||
let (acc, maybe_code) = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp, code_map));
|
||||
|
||||
let code_hash = acc.code_hash().clone();
|
||||
match maybe_code {
|
||||
Some(code) => status.new_code.push((code_hash, code)),
|
||||
None => {
|
||||
if code_hash != ::util::SHA3_EMPTY && !code_map.contains_key(&code_hash) {
|
||||
status.missing_code.push((hash, code_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
acc.to_thin_rlp()
|
||||
};
|
||||
|
||||
*out = (hash, thin_rlp);
|
||||
}
|
||||
Ok(())
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Proportion of blocks which we will verify `PoW` for.
|
||||
|
@ -125,6 +125,8 @@ impl Restoration {
|
||||
try!(self.state.feed(&self.snappy_buffer[..len]));
|
||||
|
||||
if self.state_chunks_left.is_empty() {
|
||||
try!(self.state.check_missing());
|
||||
|
||||
let root = self.state.state_root();
|
||||
if root != self.final_state_root {
|
||||
warn!("Final restored state has wrong state root: expected {:?}, got {:?}", root, self.final_state_root);
|
||||
|
@ -72,6 +72,7 @@ fn snap_and_restore() {
|
||||
rebuilder.feed(&chunk).unwrap();
|
||||
}
|
||||
|
||||
rebuilder.check_missing().unwrap();
|
||||
assert_eq!(rebuilder.state_root(), state_root);
|
||||
new_db
|
||||
};
|
||||
|
@ -108,6 +108,7 @@ impl SnapshotCommand {
|
||||
let (service, _panic_handler) = try!(self.start_service());
|
||||
|
||||
warn!("Snapshot restoration is experimental and the format may be subject to change.");
|
||||
warn!("On encountering an unexpected error, please ensure that you have a recent snapshot.");
|
||||
|
||||
let snapshot = service.snapshot_service();
|
||||
let reader = PackedReader::new(Path::new(&file))
|
||||
|
@ -228,6 +228,10 @@ impl JournalDB for ArchiveDB {
|
||||
fn backing(&self) -> &Arc<Database> {
|
||||
&self.backing
|
||||
}
|
||||
|
||||
fn consolidate(&mut self, with: MemoryDB) {
|
||||
self.overlay.consolidate(with);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -539,6 +539,10 @@ impl JournalDB for EarlyMergeDB {
|
||||
|
||||
Ok(ops)
|
||||
}
|
||||
|
||||
fn consolidate(&mut self, with: MemoryDB) {
|
||||
self.overlay.consolidate(with);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -339,6 +339,10 @@ impl JournalDB for OverlayRecentDB {
|
||||
|
||||
Ok(ops)
|
||||
}
|
||||
|
||||
fn consolidate(&mut self, with: MemoryDB) {
|
||||
self.transaction_overlay.consolidate(with);
|
||||
}
|
||||
}
|
||||
|
||||
impl HashDB for OverlayRecentDB {
|
||||
|
@ -20,6 +20,7 @@ use common::*;
|
||||
use rlp::*;
|
||||
use hashdb::*;
|
||||
use overlaydb::OverlayDB;
|
||||
use memorydb::MemoryDB;
|
||||
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
|
||||
use super::traits::JournalDB;
|
||||
use kvdb::{Database, DBTransaction};
|
||||
@ -192,6 +193,18 @@ impl JournalDB for RefCountedDB {
|
||||
}
|
||||
self.forward.commit_to_batch(batch)
|
||||
}
|
||||
|
||||
fn consolidate(&mut self, mut with: MemoryDB) {
|
||||
for (key, (value, rc)) in with.drain() {
|
||||
for _ in 0..rc {
|
||||
self.emplace(key.clone(), value.clone());
|
||||
}
|
||||
|
||||
for _ in rc..0 {
|
||||
self.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -61,6 +61,9 @@ pub trait JournalDB: HashDB {
|
||||
/// to the backing strage
|
||||
fn flush(&self) {}
|
||||
|
||||
/// Consolidate all the insertions and deletions in the given memory overlay.
|
||||
fn consolidate(&mut self, overlay: ::memorydb::MemoryDB);
|
||||
|
||||
/// Commit all changes in a single batch
|
||||
#[cfg(test)]
|
||||
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||
|
@ -174,6 +174,24 @@ impl MemoryDB {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Consolidate all the entries of `other` into `self`.
|
||||
pub fn consolidate(&mut self, mut other: Self) {
|
||||
for (key, (value, rc)) in other.drain() {
|
||||
match self.data.entry(key) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if entry.get().1 < 0 {
|
||||
entry.get_mut().0 = value;
|
||||
}
|
||||
|
||||
entry.get_mut().1 += rc;
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert((value, rc));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static NULL_RLP_STATIC: [u8; 1] = [0x80; 1];
|
||||
@ -310,3 +328,21 @@ fn memorydb_remove_and_purge() {
|
||||
m.remove_and_purge(&hello_key);
|
||||
assert_eq!(m.raw(&hello_key), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn consolidate() {
|
||||
let mut main = MemoryDB::new();
|
||||
let mut other = MemoryDB::new();
|
||||
let remove_key = other.insert(b"doggo");
|
||||
main.remove(&remove_key);
|
||||
|
||||
let insert_key = other.insert(b"arf");
|
||||
main.emplace(insert_key, b"arf".to_vec());
|
||||
|
||||
main.consolidate(other);
|
||||
|
||||
let overlay = main.drain();
|
||||
|
||||
assert_eq!(overlay.get(&remove_key).unwrap(), &(b"doggo".to_vec(), 0));
|
||||
assert_eq!(overlay.get(&insert_key).unwrap(), &(b"arf".to_vec(), 2));
|
||||
}
|
Loading…
Reference in New Issue
Block a user