Merge pull request #2329 from ethcore/journaldb_commit

Split journaldb commit into two functions: journal_under and mark_canonical
This commit is contained in:
Robert Habermeier 2016-10-14 13:57:17 +02:00 committed by GitHub
commit 19e6cbe0b2
7 changed files with 302 additions and 268 deletions

View File

@ -173,7 +173,7 @@ impl Client {
let mut state_db = StateDB::new(journal_db, config.state_cache_size);
if state_db.journal_db().is_empty() && try!(spec.ensure_db_good(&mut state_db)) {
let mut batch = DBTransaction::new(&db);
try!(state_db.commit(&mut batch, 0, &spec.genesis_header().hash(), None));
try!(state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash()));
try!(db.write(batch).map_err(ClientError::Database));
}
@ -414,13 +414,6 @@ impl Client {
let number = block.header().number();
let parent = block.header().parent_hash().clone();
let chain = self.chain.read();
// Are we committing an era?
let ancient = if number >= HISTORY {
let n = number - HISTORY;
Some((n, chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed")))
} else {
None
};
// Commit results
let receipts = block.receipts().to_owned();
@ -436,7 +429,13 @@ impl Client {
// already-imported block of the same number.
// TODO: Prove it with a test.
let mut state = block.drain();
state.commit(&mut batch, number, hash, ancient).expect("DB commit failed.");
state.journal_under(&mut batch, number, hash).expect("DB commit failed");
if number >= HISTORY {
let n = number - HISTORY;
state.mark_canonical(&mut batch, n, &chain.block_hash(n).unwrap()).expect("DB commit failed");
}
let route = chain.insert_block(&mut batch, block_data, receipts);
self.tracedb.read().import(&mut batch, TraceImportRequest {
@ -446,6 +445,7 @@ impl Client {
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
let is_canon = route.enacted.last().map_or(false, |h| h == hash);
state.sync_cache(&route.enacted, &route.retracted, is_canon);
// Final commit to the DB

View File

@ -182,19 +182,24 @@ impl StateDB {
Ok(())
}
/// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
pub fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
/// Journal all recent operations under the given era and ID.
pub fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
{
let mut bloom_lock = self.account_bloom.lock();
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
}
let records = try!(self.db.commit(batch, now, id, end));
let records = try!(self.db.journal_under(batch, now, id));
self.commit_hash = Some(id.clone());
self.commit_number = Some(now);
Ok(records)
}
/// Mark a given candidate from an ancient era as canonical, enacting its removals from the
/// backing database and reverting any non-canonical historical commit's insertions.
pub fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
self.db.mark_canonical(batch, end_era, canon_id)
}
/// Propagate local cache into the global cache and synchonize
/// the global cache with the best block state.
/// This function updates the global cache by removing entries
@ -448,30 +453,30 @@ mod tests {
// balance [ 5 5 4 3 2 2 ]
let mut s = state_db.boxed_clone_canon(&root_parent);
s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false);
s.commit(&mut batch, 0, &h0, None).unwrap();
s.journal_under(&mut batch, 0, &h0).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.commit(&mut batch, 1, &h1a, None).unwrap();
s.journal_under(&mut batch, 1, &h1a).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true);
s.commit(&mut batch, 1, &h1b, None).unwrap();
s.journal_under(&mut batch, 1, &h1b).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1b);
s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true);
s.commit(&mut batch, 2, &h2b, None).unwrap();
s.journal_under(&mut batch, 2, &h2b).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1a);
s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true);
s.commit(&mut batch, 2, &h2a, None).unwrap();
s.journal_under(&mut batch, 2, &h2a).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h2a);
s.commit(&mut batch, 3, &h3a, None).unwrap();
s.journal_under(&mut batch, 3, &h3a).unwrap();
s.sync_cache(&[], &[], true);
let s = state_db.boxed_clone_canon(&h3a);
@ -489,7 +494,7 @@ mod tests {
// reorg to 3b
// blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ]
let mut s = state_db.boxed_clone_canon(&h2b);
s.commit(&mut batch, 3, &h3b, None).unwrap();
s.journal_under(&mut batch, 3, &h3b).unwrap();
s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true);
let s = state_db.boxed_clone_canon(&h3a);
assert!(s.get_cached_account(&address).is_none());

View File

@ -35,8 +35,8 @@ const AUX_FLAG: u8 = 255;
///
/// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect.
/// immediately. As this is an "archive" database, nothing is ever removed. This means
/// that the states of any block the node has ever processed will be accessible.
pub struct ArchiveDB {
overlay: MemoryDB,
backing: Arc<Database>,
@ -156,7 +156,7 @@ impl JournalDB for ArchiveDB {
self.latest_era.is_none()
}
fn commit(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;
@ -184,6 +184,11 @@ impl JournalDB for ArchiveDB {
Ok((inserts + deletes) as u32)
}
fn mark_canonical(&mut self, _batch: &mut DBTransaction, _end_era: u64, _canon_id: &H256) -> Result<u32, UtilError> {
// keep everything! it's an archive, after all.
Ok(0)
}
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;

View File

@ -61,6 +61,49 @@ enum RemoveFrom {
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect.
///
/// journal format:
/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, n] => [ ... ]
///
/// When we make a new commit, we make a journal of all blocks in the recent history and record
/// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
/// share the same era. This forms a data structure similar to a queue but whose items are tuples.
/// By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history
/// into ancient history) then only one commit from the tuple is considered canonical. This commit
/// is kept in the main backing database, whereas any others from the same era are reverted.
///
/// It is possible that a key, properly available in the backing database be deleted and re-inserted
/// in the recent history queue, yet have both operations in commits that are eventually non-canonical.
/// To avoid the original, and still required, key from being deleted, we maintain a reference count
/// which includes an original key, if any.
///
/// The semantics of the `counter` are:
/// insert key k:
/// counter already contains k: count += 1
/// counter doesn't contain k:
/// backing db contains k: count = 1
/// backing db doesn't contain k: insert into backing db, count = 0
/// delete key k:
/// counter contains k (count is asserted to be non-zero):
/// count > 1: counter -= 1
/// count == 1: remove counter
/// count == 0: remove key from backing db
/// counter doesn't contain k: remove key from backing db
///
/// Practically, this means that for each commit block turning from recent to ancient we do the
/// following:
/// is_canonical:
/// inserts: Ignored (left alone in the backing database).
/// deletes: Enacted; however, recent history queue is checked for ongoing references. This is
/// reduced as a preference to deletion from the backing database.
/// !is_canonical:
/// inserts: Reverted; however, recent history queue is checked for ongoing references. This is
/// reduced as a preference to deletion from the backing database.
/// deletes: Ignored (they were never inserted).
///
/// TODO: store_reclaim_period
pub struct EarlyMergeDB {
overlay: MemoryDB,
backing: Arc<Database>,
@ -336,55 +379,12 @@ impl JournalDB for EarlyMergeDB {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, n] => [ ... ]
// TODO: store reclaim_period.
// When we make a new commit, we make a journal of all blocks in the recent history and record
// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
// share the same era. This forms a data structure similar to a queue but whose items are tuples.
// By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history
// into ancient history) then only one commit from the tuple is considered canonical. This commit
// is kept in the main backing database, whereas any others from the same era are reverted.
//
// It is possible that a key, properly available in the backing database be deleted and re-inserted
// in the recent history queue, yet have both operations in commits that are eventually non-canonical.
// To avoid the original, and still required, key from being deleted, we maintain a reference count
// which includes an original key, if any.
//
// The semantics of the `counter` are:
// insert key k:
// counter already contains k: count += 1
// counter doesn't contain k:
// backing db contains k: count = 1
// backing db doesn't contain k: insert into backing db, count = 0
// delete key k:
// counter contains k (count is asserted to be non-zero):
// count > 1: counter -= 1
// count == 1: remove counter
// count == 0: remove key from backing db
// counter doesn't contain k: remove key from backing db
//
// Practically, this means that for each commit block turning from recent to ancient we do the
// following:
// is_canonical:
// inserts: Ignored (left alone in the backing database).
// deletes: Enacted; however, recent history queue is checked for ongoing references. This is
// reduced as a preference to deletion from the backing database.
// !is_canonical:
// inserts: Reverted; however, recent history queue is checked for ongoing references. This is
// reduced as a preference to deletion from the backing database.
// deletes: Ignored (they were never inserted).
//
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
let trace = false;
// record new commit's details.
let mut refs = self.refs.as_ref().unwrap().write();
let trace = false;
{
let mut index = 0usize;
let mut last;
@ -403,7 +403,7 @@ impl JournalDB for EarlyMergeDB {
let drained = self.overlay.drain();
if trace {
trace!(target: "jdb", "commit: #{} ({}), end era: {:?}", now, id, end);
trace!(target: "jdb", "commit: #{} ({})", now, id);
}
let removes: Vec<H256> = drained
@ -431,85 +431,86 @@ impl JournalDB for EarlyMergeDB {
inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes);
Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace);
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
if trace {
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
trace!(target: "jdb.ops", " Deletes: {:?}", removes);
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
}
batch.put(self.column, &last, r.as_raw());
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
Ok((ins.len() + removes.len()) as u32)
}
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
let trace = false;
let mut refs = self.refs.as_ref().unwrap().write();
// apply old commits' details
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})) {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
let mut index = 0usize;
let mut last;
if canon_id == rlp.val_at(0) {
// Collect keys to be removed. Canon block - remove the (enacted) deletes.
let deletes: Vec<H256> = rlp.val_at(2);
if trace {
trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
}
Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace);
while let Some(rlp_data) = try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})) {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
if trace {
trace!(target: "jdb.ops", " Finalising: {:?}", inserts);
}
for k in &inserts {
match refs.get(k).cloned() {
None => {
// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
// already expunged from the queue (which is allowed since the key is in the archive).
// leave well alone.
}
Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
// just delete the refs entry.
refs.remove(k);
}
Some( RefInfo{queue_refs: x, in_archive: false} ) => {
// must set already in; ,
Self::set_already_in(batch, self.column, k);
refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
}
Some( RefInfo{in_archive: true, ..} ) => {
// Invalid! Reinserted the same key twice.
warn!("Key {} inserted twice into same fork.", k);
}
if canon_id == &rlp.val_at::<H256>(0) {
// Collect keys to be removed. Canon block - remove the (enacted) deletes.
let deletes: Vec<H256> = rlp.val_at(2);
trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace);
trace!(target: "jdb.ops", " Finalising: {:?}", inserts);
for k in &inserts {
match refs.get(k).cloned() {
None => {
// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
// already expunged from the queue (which is allowed since the key is in the archive).
// leave well alone.
}
Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
// just delete the refs entry.
refs.remove(k);
}
Some( RefInfo{queue_refs: x, in_archive: false} ) => {
// must set already in; ,
Self::set_already_in(batch, self.column, k);
refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
}
Some( RefInfo{in_archive: true, ..} ) => {
// Invalid! Reinserted the same key twice.
warn!("Key {} inserted twice into same fork.", k);
}
}
} else {
// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
if trace {
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
}
Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
}
} else {
// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
}
batch.delete(self.column, &last);
index += 1;
}
if trace {
trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
}
batch.delete(self.column, &last);
index += 1;
}
if trace {
trace!(target: "jdb", "OK: {:?}", refs.clone());
}
trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
trace!(target: "jdb", "OK: {:?}", refs.clone());
Ok(0)
}

View File

@ -222,92 +222,107 @@ impl JournalDB for OverlayRecentDB {
.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
}
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
trace!(target: "journaldb", "entry: #{} ({})", now, id);
let mut journal_overlay = self.journal_overlay.write();
// flush previous changes
journal_overlay.pending_overlay.clear();
{
let mut r = RlpStream::new_list(3);
let mut tx = self.transaction_overlay.drain();
let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
// Increase counter for each inserted key no matter if the block is canonical or not.
let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
r.append(id);
r.begin_list(inserted_keys.len());
for (k, v) in insertions {
r.begin_list(2);
r.append(&k);
r.append(&v);
journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
}
r.append(&removed_keys);
let mut k = RlpStream::new_list(3);
let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
k.append(&now);
k.append(&index);
k.append(&&PADDING[..]);
batch.put_vec(self.column, &k.drain(), r.out());
if journal_overlay.latest_era.map_or(true, |e| now > e) {
batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
journal_overlay.latest_era = Some(now);
}
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
let mut r = RlpStream::new_list(3);
let mut tx = self.transaction_overlay.drain();
let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
let ops = inserted_keys.len() + removed_keys.len();
// Increase counter for each inserted key no matter if the block is canonical or not.
let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
r.append(id);
r.begin_list(inserted_keys.len());
for (k, v) in insertions {
r.begin_list(2);
r.append(&k);
r.append(&v);
journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
}
r.append(&removed_keys);
let mut k = RlpStream::new_list(3);
let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
k.append(&now);
k.append(&index);
k.append(&&PADDING[..]);
batch.put_vec(self.column, &k.drain(), r.out());
if journal_overlay.latest_era.map_or(true, |e| now > e) {
batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
journal_overlay.latest_era = Some(now);
}
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
Ok(ops as u32)
}
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
trace!(target: "journaldb", "canonical: #{} ({})", end_era, canon_id);
let mut journal_overlay = self.journal_overlay.write();
let journal_overlay = &mut *journal_overlay;
let mut ops = 0;
// apply old commits' details
if let Some((end_era, canon_id)) = end {
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
let mut canon_deletions: Vec<H256> = Vec::new();
let mut overlay_deletions: Vec<H256> = Vec::new();
let mut index = 0usize;
for mut journal in records.drain(..) {
//delete the record from the db
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
batch.delete(self.column, &r.drain());
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
{
if canon_id == journal.id {
for h in &journal.insertions {
if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 {
canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
}
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
let mut canon_deletions: Vec<H256> = Vec::new();
let mut overlay_deletions: Vec<H256> = Vec::new();
let mut index = 0usize;
for mut journal in records.drain(..) {
//delete the record from the db
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
batch.delete(self.column, &r.drain());
trace!(target: "journaldb", "Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
{
if *canon_id == journal.id {
for h in &journal.insertions {
if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 {
canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
}
}
canon_deletions = journal.deletions;
}
overlay_deletions.append(&mut journal.insertions);
canon_deletions = journal.deletions;
}
index += 1;
overlay_deletions.append(&mut journal.insertions);
}
// apply canon inserts first
for (k, v) in canon_insertions {
batch.put(self.column, &k, &v);
journal_overlay.pending_overlay.insert(to_short_key(&k), v);
}
// update the overlay
for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
batch.delete(self.column, &k);
}
index += 1;
}
ops += canon_insertions.len();
ops += canon_deletions.len();
// apply canon inserts first
for (k, v) in canon_insertions {
batch.put(self.column, &k, &v);
journal_overlay.pending_overlay.insert(to_short_key(&k), v);
}
// update the overlay
for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
batch.delete(self.column, &k);
}
}
journal_overlay.journal.remove(&end_era);
}
Ok(0)
journal_overlay.journal.remove(&end_era);
Ok(ops as u32)
}
fn flush(&self) {

View File

@ -34,6 +34,17 @@ use std::env;
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect.
///
/// journal format:
/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, n] => [ ... ]
///
/// when we make a new commit, we journal the inserts and removes.
/// for each end_era that we journaled that we are no passing by,
/// we remove all of its removes assuming it is canonical and all
/// of its inserts otherwise.
// TODO: store last_era, reclaim_period.
pub struct RefCountedDB {
forward: OverlayDB,
backing: Arc<Database>,
@ -109,77 +120,66 @@ impl JournalDB for RefCountedDB {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
}
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, n] => [ ... ]
// TODO: store last_era, reclaim_period.
// when we make a new commit, we journal the inserts and removes.
// for each end_era that we journaled that we are no passing by,
// we remove all of its removes assuming it is canonical and all
// of its inserts otherwise.
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
// record new commit's details.
{
let mut index = 0usize;
let mut last;
let mut index = 0usize;
let mut last;
while try!(self.backing.get(self.column, {
while try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})).is_some() {
index += 1;
}
let mut r = RlpStream::new_list(3);
r.append(id);
r.append(&self.inserts);
r.append(&self.removes);
batch.put(self.column, &last, r.as_raw());
let ops = self.inserts.len() + self.removes.len();
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear();
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
Ok(ops as u32)
}
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
// apply old commits' details
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = {
try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})).is_some() {
index += 1;
}
let mut r = RlpStream::new_list(3);
r.append(id);
r.append(&self.inserts);
r.append(&self.removes);
batch.put(self.column, &last, r.as_raw());
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear();
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
}
// apply old commits' details
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = {
// trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
}))
} {
let rlp = Rlp::new(&rlp_data);
let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove {
self.forward.remove(i);
}
batch.delete(self.column, &last);
index += 1;
}))
} {
let rlp = Rlp::new(&rlp_data);
let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.val_at(if *canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove {
self.forward.remove(i);
}
batch.delete(self.column, &last);
index += 1;
}
let r = try!(self.forward.commit_to_batch(batch));

View File

@ -35,9 +35,12 @@ pub trait JournalDB: HashDB {
/// Get the latest era in the DB. None if there isn't yet any data in there.
fn latest_era(&self) -> Option<u64>;
/// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
/// Journal recent database operations as being associated with a given era and id.
// TODO: give the overlay to this function so journaldbs don't manage the overlays themeselves.
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError>;
/// Mark a given block as canonical, indicating that competing blocks' states may be pruned out.
fn mark_canonical(&mut self, batch: &mut DBTransaction, era: u64, id: &H256) -> Result<u32, UtilError>;
/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
@ -68,8 +71,13 @@ pub trait JournalDB: HashDB {
#[cfg(test)]
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let mut batch = self.backing().transaction();
let res = try!(self.commit(&mut batch, now, id, end));
let result = self.backing().write(batch).map(|_| res).map_err(Into::into);
let mut ops = try!(self.journal_under(&mut batch, now, id));
if let Some((end_era, canon_id)) = end {
ops += try!(self.mark_canonical(&mut batch, end_era, &canon_id));
}
let result = self.backing().write(batch).map(|_| ops).map_err(Into::into);
self.flush();
result
}