split journaldb commit into two functions: journal_under and mark_canonical

This commit is contained in:
Robert Habermeier 2016-09-26 17:14:44 +02:00
parent 92451ef268
commit 238b4962f0
5 changed files with 277 additions and 248 deletions

View File

@ -35,8 +35,8 @@ const AUX_FLAG: u8 = 255;
/// ///
/// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to /// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before /// immediately. As this is an "archive" database, nothing is ever removed. This means
/// the removals actually take effect. /// that the states of any block the node has ever processed will be accessible.
pub struct ArchiveDB { pub struct ArchiveDB {
overlay: MemoryDB, overlay: MemoryDB,
backing: Arc<Database>, backing: Arc<Database>,
@ -156,7 +156,7 @@ impl JournalDB for ArchiveDB {
self.latest_era.is_none() self.latest_era.is_none()
} }
fn commit(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> { fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256) -> Result<u32, UtilError> {
let mut inserts = 0usize; let mut inserts = 0usize;
let mut deletes = 0usize; let mut deletes = 0usize;
@ -184,6 +184,11 @@ impl JournalDB for ArchiveDB {
Ok((inserts + deletes) as u32) Ok((inserts + deletes) as u32)
} }
fn mark_canonical(&mut self, _batch: &mut DBTransaction, _end_era: u64, _canon_id: &H256) -> Result<u32, UtilError> {
// keep everything! it's an archive, after all.
Ok(0)
}
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> { fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
let mut inserts = 0usize; let mut inserts = 0usize;
let mut deletes = 0usize; let mut deletes = 0usize;

View File

@ -61,6 +61,49 @@ enum RemoveFrom {
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before /// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect. /// the removals actually take effect.
///
/// journal format:
/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, n] => [ ... ]
///
/// When we make a new commit, we make a journal of all blocks in the recent history and record
/// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
/// share the same era. This forms a data structure similar to a queue but whose items are tuples.
/// By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history
/// into ancient history) then only one commit from the tuple is considered canonical. This commit
/// is kept in the main backing database, whereas any others from the same era are reverted.
///
/// It is possible that a key, properly available in the backing database be deleted and re-inserted
/// in the recent history queue, yet have both operations in commits that are eventually non-canonical.
/// To avoid the original, and still required, key from being deleted, we maintain a reference count
/// which includes an original key, if any.
///
/// The semantics of the `counter` are:
/// insert key k:
/// counter already contains k: count += 1
/// counter doesn't contain k:
/// backing db contains k: count = 1
/// backing db doesn't contain k: insert into backing db, count = 0
/// delete key k:
/// counter contains k (count is asserted to be non-zero):
/// count > 1: counter -= 1
/// count == 1: remove counter
/// count == 0: remove key from backing db
/// counter doesn't contain k: remove key from backing db
///
/// Practically, this means that for each commit block turning from recent to ancient we do the
/// following:
/// is_canonical:
/// inserts: Ignored (left alone in the backing database).
/// deletes: Enacted; however, recent history queue is checked for ongoing references. This is
/// reduced as a preference to deletion from the backing database.
/// !is_canonical:
/// inserts: Reverted; however, recent history queue is checked for ongoing references. This is
/// reduced as a preference to deletion from the backing database.
/// deletes: Ignored (they were never inserted).
///
/// TODO: store_reclaim_period
pub struct EarlyMergeDB { pub struct EarlyMergeDB {
overlay: MemoryDB, overlay: MemoryDB,
backing: Arc<Database>, backing: Arc<Database>,
@ -336,55 +379,12 @@ impl JournalDB for EarlyMergeDB {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec()) self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
} }
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))] fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { let trace = false;
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, n] => [ ... ]
// TODO: store reclaim_period.
// When we make a new commit, we make a journal of all blocks in the recent history and record
// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
// share the same era. This forms a data structure similar to a queue but whose items are tuples.
// By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history
// into ancient history) then only one commit from the tuple is considered canonical. This commit
// is kept in the main backing database, whereas any others from the same era are reverted.
//
// It is possible that a key, properly available in the backing database be deleted and re-inserted
// in the recent history queue, yet have both operations in commits that are eventually non-canonical.
// To avoid the original, and still required, key from being deleted, we maintain a reference count
// which includes an original key, if any.
//
// The semantics of the `counter` are:
// insert key k:
// counter already contains k: count += 1
// counter doesn't contain k:
// backing db contains k: count = 1
// backing db doesn't contain k: insert into backing db, count = 0
// delete key k:
// counter contains k (count is asserted to be non-zero):
// count > 1: counter -= 1
// count == 1: remove counter
// count == 0: remove key from backing db
// counter doesn't contain k: remove key from backing db
//
// Practically, this means that for each commit block turning from recent to ancient we do the
// following:
// is_canonical:
// inserts: Ignored (left alone in the backing database).
// deletes: Enacted; however, recent history queue is checked for ongoing references. This is
// reduced as a preference to deletion from the backing database.
// !is_canonical:
// inserts: Reverted; however, recent history queue is checked for ongoing references. This is
// reduced as a preference to deletion from the backing database.
// deletes: Ignored (they were never inserted).
//
// record new commit's details. // record new commit's details.
let mut refs = self.refs.as_ref().unwrap().write(); let mut refs = self.refs.as_ref().unwrap().write();
let trace = false;
{ {
let mut index = 0usize; let mut index = 0usize;
let mut last; let mut last;
@ -403,7 +403,7 @@ impl JournalDB for EarlyMergeDB {
let drained = self.overlay.drain(); let drained = self.overlay.drain();
if trace { if trace {
trace!(target: "jdb", "commit: #{} ({}), end era: {:?}", now, id, end); trace!(target: "jdb", "commit: #{} ({})", now, id);
} }
let removes: Vec<H256> = drained let removes: Vec<H256> = drained
@ -431,85 +431,86 @@ impl JournalDB for EarlyMergeDB {
inserts.iter().foreach(|&(k, _)| {r.append(&k);}); inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes); r.append(&removes);
Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace); Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace);
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
if trace { if trace {
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
trace!(target: "jdb.ops", " Deletes: {:?}", removes); trace!(target: "jdb.ops", " Deletes: {:?}", removes);
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
} }
batch.put(self.column, &last, r.as_raw()); batch.put(self.column, &last, r.as_raw());
if self.latest_era.map_or(true, |e| now > e) { if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)); batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now); self.latest_era = Some(now);
} }
Ok((ins.len() + removes.len()) as u32)
} }
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
let trace = false;
let mut refs = self.refs.as_ref().unwrap().write();
// apply old commits' details // apply old commits' details
if let Some((end_era, canon_id)) = end { let mut index = 0usize;
let mut index = 0usize; let mut last;
let mut last;
while let Some(rlp_data) = try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})) {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
if canon_id == rlp.val_at(0) { while let Some(rlp_data) = try!(self.backing.get(self.column, {
// Collect keys to be removed. Canon block - remove the (enacted) deletes. let mut r = RlpStream::new_list(3);
let deletes: Vec<H256> = rlp.val_at(2); r.append(&end_era);
if trace { r.append(&index);
trace!(target: "jdb.ops", " Expunging: {:?}", deletes); r.append(&&PADDING[..]);
} last = r.drain();
Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace); &last
})) {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
if trace { if canon_id == &rlp.val_at::<H256>(0) {
trace!(target: "jdb.ops", " Finalising: {:?}", inserts); // Collect keys to be removed. Canon block - remove the (enacted) deletes.
} let deletes: Vec<H256> = rlp.val_at(2);
for k in &inserts { trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
match refs.get(k).cloned() { Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace);
None => {
// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert trace!(target: "jdb.ops", " Finalising: {:?}", inserts);
// already expunged from the queue (which is allowed since the key is in the archive). for k in &inserts {
// leave well alone. match refs.get(k).cloned() {
} None => {
Some( RefInfo{queue_refs: 1, in_archive: false} ) => { // [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
// just delete the refs entry. // already expunged from the queue (which is allowed since the key is in the archive).
refs.remove(k); // leave well alone.
} }
Some( RefInfo{queue_refs: x, in_archive: false} ) => { Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
// must set already in; , // just delete the refs entry.
Self::set_already_in(batch, self.column, k); refs.remove(k);
refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true }); }
} Some( RefInfo{queue_refs: x, in_archive: false} ) => {
Some( RefInfo{in_archive: true, ..} ) => { // must set already in; ,
// Invalid! Reinserted the same key twice. Self::set_already_in(batch, self.column, k);
warn!("Key {} inserted twice into same fork.", k); refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
} }
Some( RefInfo{in_archive: true, ..} ) => {
// Invalid! Reinserted the same key twice.
warn!("Key {} inserted twice into same fork.", k);
} }
} }
} else {
// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
if trace {
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
}
Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
} }
} else {
// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
}
batch.delete(self.column, &last); batch.delete(self.column, &last);
index += 1; index += 1;
}
if trace {
trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
}
} }
if trace { trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
trace!(target: "jdb", "OK: {:?}", refs.clone()); trace!(target: "jdb", "OK: {:?}", refs.clone());
}
Ok(0) Ok(0)
} }

View File

@ -222,92 +222,107 @@ impl JournalDB for OverlayRecentDB {
.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) .or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
} }
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
// record new commit's details. trace!(target: "journaldb", "entry: #{} ({})", now, id);
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut journal_overlay = self.journal_overlay.write(); let mut journal_overlay = self.journal_overlay.write();
// flush previous changes // flush previous changes
journal_overlay.pending_overlay.clear(); journal_overlay.pending_overlay.clear();
{
let mut r = RlpStream::new_list(3);
let mut tx = self.transaction_overlay.drain();
let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
// Increase counter for each inserted key no matter if the block is canonical or not.
let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
r.append(id);
r.begin_list(inserted_keys.len());
for (k, v) in insertions {
r.begin_list(2);
r.append(&k);
r.append(&v);
journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
}
r.append(&removed_keys);
let mut k = RlpStream::new_list(3); let mut r = RlpStream::new_list(3);
let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len()); let mut tx = self.transaction_overlay.drain();
k.append(&now); let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
k.append(&index); let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
k.append(&&PADDING[..]); let ops = inserted_keys.len() + removed_keys.len();
batch.put_vec(self.column, &k.drain(), r.out());
if journal_overlay.latest_era.map_or(true, |e| now > e) { // Increase counter for each inserted key no matter if the block is canonical or not.
batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec()); let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
journal_overlay.latest_era = Some(now);
} r.append(id);
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); r.begin_list(inserted_keys.len());
for (k, v) in insertions {
r.begin_list(2);
r.append(&k);
r.append(&v);
journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
}
r.append(&removed_keys);
let mut k = RlpStream::new_list(3);
let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
k.append(&now);
k.append(&index);
k.append(&&PADDING[..]);
batch.put_vec(self.column, &k.drain(), r.out());
if journal_overlay.latest_era.map_or(true, |e| now > e) {
batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
journal_overlay.latest_era = Some(now);
} }
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
Ok(ops as u32)
}
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
trace!(target: "journaldb", "canonical: #{} ({})", end_era, canon_id);
let mut journal_overlay = self.journal_overlay.write();
let journal_overlay = &mut *journal_overlay; let journal_overlay = &mut *journal_overlay;
let mut ops = 0;
// apply old commits' details // apply old commits' details
if let Some((end_era, canon_id)) = end { if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) { let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new(); let mut canon_deletions: Vec<H256> = Vec::new();
let mut canon_deletions: Vec<H256> = Vec::new(); let mut overlay_deletions: Vec<H256> = Vec::new();
let mut overlay_deletions: Vec<H256> = Vec::new(); let mut index = 0usize;
let mut index = 0usize; for mut journal in records.drain(..) {
for mut journal in records.drain(..) { //delete the record from the db
//delete the record from the db let mut r = RlpStream::new_list(3);
let mut r = RlpStream::new_list(3); r.append(&end_era);
r.append(&end_era); r.append(&index);
r.append(&index); r.append(&&PADDING[..]);
r.append(&&PADDING[..]); batch.delete(self.column, &r.drain());
batch.delete(self.column, &r.drain()); trace!(target: "journaldb", "Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len()); {
{ if *canon_id == journal.id {
if canon_id == journal.id { for h in &journal.insertions {
for h in &journal.insertions { if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) { if rc > 0 {
if rc > 0 { canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
}
} }
} }
canon_deletions = journal.deletions;
} }
overlay_deletions.append(&mut journal.insertions); canon_deletions = journal.deletions;
} }
index += 1; overlay_deletions.append(&mut journal.insertions);
} }
// apply canon inserts first index += 1;
for (k, v) in canon_insertions { }
batch.put(self.column, &k, &v);
journal_overlay.pending_overlay.insert(to_short_key(&k), v); ops += canon_insertions.len();
} ops += canon_deletions.len();
// update the overlay
for k in overlay_deletions { // apply canon inserts first
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k)); for (k, v) in canon_insertions {
} batch.put(self.column, &k, &v);
// apply canon deletions journal_overlay.pending_overlay.insert(to_short_key(&k), v);
for k in canon_deletions { }
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) { // update the overlay
batch.delete(self.column, &k); for k in overlay_deletions {
} journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
batch.delete(self.column, &k);
} }
} }
journal_overlay.journal.remove(&end_era);
} }
Ok(0) journal_overlay.journal.remove(&end_era);
Ok(ops as u32)
} }
fn flush(&self) { fn flush(&self) {

View File

@ -34,6 +34,17 @@ use std::env;
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before /// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect. /// the removals actually take effect.
///
/// journal format:
/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, n] => [ ... ]
///
/// when we make a new commit, we journal the inserts and removes.
/// for each end_era that we journaled that we are no passing by,
/// we remove all of its removes assuming it is canonical and all
/// of its inserts otherwise.
// TODO: store last_era, reclaim_period.
pub struct RefCountedDB { pub struct RefCountedDB {
forward: OverlayDB, forward: OverlayDB,
backing: Arc<Database>, backing: Arc<Database>,
@ -109,77 +120,66 @@ impl JournalDB for RefCountedDB {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec()) self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
} }
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, n] => [ ... ]
// TODO: store last_era, reclaim_period.
// when we make a new commit, we journal the inserts and removes.
// for each end_era that we journaled that we are no passing by,
// we remove all of its removes assuming it is canonical and all
// of its inserts otherwise.
// record new commit's details. // record new commit's details.
{ let mut index = 0usize;
let mut index = 0usize; let mut last;
let mut last;
while try!(self.backing.get(self.column, { while try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})).is_some() {
index += 1;
}
let mut r = RlpStream::new_list(3);
r.append(id);
r.append(&self.inserts);
r.append(&self.removes);
batch.put(self.column, &last, r.as_raw());
let ops = self.inserts.len() + self.removes.len();
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear();
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
Ok(ops as u32)
}
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
// apply old commits' details
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = {
try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3); let mut r = RlpStream::new_list(3);
r.append(&now); r.append(&end_era);
r.append(&index); r.append(&index);
r.append(&&PADDING[..]); r.append(&&PADDING[..]);
last = r.drain(); last = r.drain();
&last &last
})).is_some() { }))
index += 1; } {
} let rlp = Rlp::new(&rlp_data);
let our_id: H256 = rlp.val_at(0);
let mut r = RlpStream::new_list(3); let to_remove: Vec<H256> = rlp.val_at(if *canon_id == our_id {2} else {1});
r.append(id); trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
r.append(&self.inserts); for i in &to_remove {
r.append(&self.removes); self.forward.remove(i);
batch.put(self.column, &last, r.as_raw());
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear();
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
}
// apply old commits' details
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = {
// trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
}))
} {
let rlp = Rlp::new(&rlp_data);
let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove {
self.forward.remove(i);
}
batch.delete(self.column, &last);
index += 1;
} }
batch.delete(self.column, &last);
index += 1;
} }
let r = try!(self.forward.commit_to_batch(batch)); let r = try!(self.forward.commit_to_batch(batch));

View File

@ -35,9 +35,12 @@ pub trait JournalDB: HashDB {
/// Get the latest era in the DB. None if there isn't yet any data in there. /// Get the latest era in the DB. None if there isn't yet any data in there.
fn latest_era(&self) -> Option<u64>; fn latest_era(&self) -> Option<u64>;
/// Commit all recent insert operations and canonical historical commits' removals from the /// Journal recent database operations as being associated with a given era and id.
/// old era to the backing database, reverting any non-canonical historical commit's inserts. // TODO: give the overlay to this function so journaldbs don't manage the overlays themeselves.
fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>; fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError>;
/// Mark a given block as canonical, indicating that competing blocks' states may be pruned out.
fn mark_canonical(&mut self, batch: &mut DBTransaction, era: u64, id: &H256) -> Result<u32, UtilError>;
/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions /// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated. /// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
@ -68,8 +71,13 @@ pub trait JournalDB: HashDB {
#[cfg(test)] #[cfg(test)]
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let mut batch = self.backing().transaction(); let mut batch = self.backing().transaction();
let res = try!(self.commit(&mut batch, now, id, end)); let mut ops = try!(self.journal_under(&mut batch, now, id));
let result = self.backing().write(batch).map(|_| res).map_err(Into::into);
if let Some((end_era, canon_id)) = end {
ops += try!(self.mark_canonical(&mut batch, end_era, &canon_id));
}
let result = self.backing().write(batch).map(|_| ops).map_err(Into::into);
self.flush(); self.flush();
result result
} }