From 238b4962f0597f9ee98db503924fb2022dd15c85 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Mon, 26 Sep 2016 17:14:44 +0200
Subject: [PATCH] split journaldb commit into two functions: journal_under and
 mark_canonical

---
 util/src/journaldb/archivedb.rs       |  11 +-
 util/src/journaldb/earlymergedb.rs    | 215 +++++++++++++-------------
 util/src/journaldb/overlayrecentdb.rs | 155 ++++++++++---------
 util/src/journaldb/refcounteddb.rs    | 126 +++++++--------
 util/src/journaldb/traits.rs          |  18 ++-
 5 files changed, 277 insertions(+), 248 deletions(-)

diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs
index 5f1eb71d4..efedfb766 100644
--- a/util/src/journaldb/archivedb.rs
+++ b/util/src/journaldb/archivedb.rs
@@ -35,8 +35,8 @@ const AUX_FLAG: u8 = 255;
 ///
 /// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to
 /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
-/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
-/// the removals actually take effect.
+/// immediately. As this is an "archive" database, nothing is ever removed. This means
+/// that the state of any block the node has ever processed will be accessible.
 pub struct ArchiveDB {
 	overlay: MemoryDB,
 	backing: Arc<Database>,
@@ -156,7 +156,7 @@ impl JournalDB for ArchiveDB {
 		self.latest_era.is_none()
 	}

-	fn commit(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> {
+	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256) -> Result<u32, UtilError> {
 		let mut inserts = 0usize;
 		let mut deletes = 0usize;
@@ -184,6 +184,11 @@ impl JournalDB for ArchiveDB {
 		Ok((inserts + deletes) as u32)
 	}

+	fn mark_canonical(&mut self, _batch: &mut DBTransaction, _end_era: u64, _canon_id: &H256) -> Result<u32, UtilError> {
+		// keep everything! it's an archive, after all.
+		Ok(0)
+	}
+
 	fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
 		let mut inserts = 0usize;
 		let mut deletes = 0usize;
diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs
index bbb4ed827..ef9868d41 100644
--- a/util/src/journaldb/earlymergedb.rs
+++ b/util/src/journaldb/earlymergedb.rs
@@ -61,6 +61,49 @@ enum RemoveFrom {
 /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
 /// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
 /// the removals actually take effect.
+///
+/// journal format:
+/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
+/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
+/// [era, n] => [ ... ]
+///
+/// When we make a new commit, we make a journal of all blocks in the recent history and record
+/// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
+/// share the same era. This forms a data structure similar to a queue but whose items are tuples.
+/// By the time it comes to remove a tuple from the queue (i.e. when the era passes from recent
+/// history into ancient history), only one commit from the tuple is considered canonical. This
+/// commit is kept in the main backing database, whereas any others from the same era are reverted.
+///
+/// It is possible for a key, properly available in the backing database, to be deleted and
+/// re-inserted in the recent history queue, yet have both operations in commits that are
+/// eventually non-canonical.
+/// To prevent the original, and still required, key from being deleted, we maintain a
+/// reference count which includes an original key, if any.
+///
+/// The semantics of the `counter` are:
+/// insert key k:
+///   counter already contains k: count += 1
+///   counter doesn't contain k:
+///     backing db contains k: count = 1
+///     backing db doesn't contain k: insert into backing db, count = 0
+/// delete key k:
+///   counter contains k (count is asserted to be non-zero):
+///     count > 1: counter -= 1
+///     count == 1: remove counter
+///     count == 0: remove key from backing db
+///   counter doesn't contain k: remove key from backing db
+///
+/// Practically, this means that for each commit block turning from recent to ancient we do the
+/// following:
+/// is_canonical:
+///   inserts: Ignored (left alone in the backing database).
+///   deletes: Enacted; however, recent history queue is checked for ongoing references. This is
+///            reduced as a preference to deletion from the backing database.
+/// !is_canonical:
+///   inserts: Reverted; however, recent history queue is checked for ongoing references. This is
+///            reduced as a preference to deletion from the backing database.
+///   deletes: Ignored (they were never inserted).
+///
+/// TODO: store reclaim_period.
 pub struct EarlyMergeDB {
 	overlay: MemoryDB,
 	backing: Arc<Database>,
@@ -336,55 +379,12 @@ impl JournalDB for EarlyMergeDB {
 		self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
 	}

-	#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
-	fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
-		// journal format:
-		// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, n] => [ ... ]
-
-		// TODO: store reclaim_period.
-
-		// When we make a new commit, we make a journal of all blocks in the recent history and record
-		// all keys that were inserted and deleted. The journal is ordered by era; multiple commits can
-		// share the same era. This forms a data structure similar to a queue but whose items are tuples.
-		// By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history
-		// into ancient history) then only one commit from the tuple is considered canonical. This commit
-		// is kept in the main backing database, whereas any others from the same era are reverted.
-		//
-		// It is possible that a key, properly available in the backing database be deleted and re-inserted
-		// in the recent history queue, yet have both operations in commits that are eventually non-canonical.
-		// To avoid the original, and still required, key from being deleted, we maintain a reference count
-		// which includes an original key, if any.
-		//
-		// The semantics of the `counter` are:
-		// insert key k:
-		//   counter already contains k: count += 1
-		//   counter doesn't contain k:
-		//     backing db contains k: count = 1
-		//     backing db doesn't contain k: insert into backing db, count = 0
-		// delete key k:
-		//   counter contains k (count is asserted to be non-zero):
-		//     count > 1: counter -= 1
-		//     count == 1: remove counter
-		//     count == 0: remove key from backing db
-		//   counter doesn't contain k: remove key from backing db
-		//
-		// Practically, this means that for each commit block turning from recent to ancient we do the
-		// following:
-		// is_canonical:
-		//   inserts: Ignored (left alone in the backing database).
-		//   deletes: Enacted; however, recent history queue is checked for ongoing references. This is
-		//            reduced as a preference to deletion from the backing database.
-		// !is_canonical:
-		//   inserts: Reverted; however, recent history queue is checked for ongoing references. This is
-		//            reduced as a preference to deletion from the backing database.
-		//   deletes: Ignored (they were never inserted).
-		//
+	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
+		let trace = false;
 		// record new commit's details.
 		let mut refs = self.refs.as_ref().unwrap().write();
-		let trace = false;
+
 		{
 			let mut index = 0usize;
 			let mut last;
@@ -403,7 +403,7 @@ impl JournalDB for EarlyMergeDB {
 			let drained = self.overlay.drain();

 			if trace {
-				trace!(target: "jdb", "commit: #{} ({}), end era: {:?}", now, id, end);
+				trace!(target: "jdb", "commit: #{} ({})", now, id);
 			}

 			let removes: Vec<H256> = drained
@@ -431,85 +431,86 @@ impl JournalDB for EarlyMergeDB {
 			inserts.iter().foreach(|&(k, _)| {r.append(&k);});
 			r.append(&removes);
 			Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace);
+
+			let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<H256>>();
+
 			if trace {
-				let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<H256>>();
-				trace!(target: "jdb.ops", "  Inserts: {:?}", ins);
 				trace!(target: "jdb.ops", "  Deletes: {:?}", removes);
+				trace!(target: "jdb.ops", "  Inserts: {:?}", ins);
 			}
+
 			batch.put(self.column, &last, r.as_raw());
 			if self.latest_era.map_or(true, |e| now > e) {
 				batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
 				self.latest_era = Some(now);
 			}
+
+			Ok((ins.len() + removes.len()) as u32)
 		}
+	}
+
+	#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
+	fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
+		let trace = false;
+
+		let mut refs = self.refs.as_ref().unwrap().write();

 		// apply old commits' details
-		if let Some((end_era, canon_id)) = end {
-			let mut index = 0usize;
-			let mut last;
-			while let Some(rlp_data) = try!(self.backing.get(self.column, {
-				let mut r = RlpStream::new_list(3);
-				r.append(&end_era);
-				r.append(&index);
-				r.append(&&PADDING[..]);
-				last = r.drain();
-				&last
-			})) {
-				let rlp = Rlp::new(&rlp_data);
-				let inserts: Vec<H256> = rlp.val_at(1);
+		let mut index = 0usize;
+		let mut last;

-				if canon_id == rlp.val_at(0) {
-					// Collect keys to be removed. Canon block - remove the (enacted) deletes.
-					let deletes: Vec<H256> = rlp.val_at(2);
-					if trace {
-						trace!(target: "jdb.ops", "  Expunging: {:?}", deletes);
-					}
-					Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace);
+		while let Some(rlp_data) = try!(self.backing.get(self.column, {
+			let mut r = RlpStream::new_list(3);
+			r.append(&end_era);
+			r.append(&index);
+			r.append(&&PADDING[..]);
+			last = r.drain();
+			&last
+		})) {
+			let rlp = Rlp::new(&rlp_data);
+			let inserts: Vec<H256> = rlp.val_at(1);

-					if trace {
-						trace!(target: "jdb.ops", "  Finalising: {:?}", inserts);
-					}
-					for k in &inserts {
-						match refs.get(k).cloned() {
-							None => {
-								// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
-								// already expunged from the queue (which is allowed since the key is in the archive).
-								// leave well alone.
-							}
-							Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
-								// just delete the refs entry.
-								refs.remove(k);
-							}
-							Some( RefInfo{queue_refs: x, in_archive: false} ) => {
-								// must set already in; ,
-								Self::set_already_in(batch, self.column, k);
-								refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
-							}
-							Some( RefInfo{in_archive: true, ..} ) => {
-								// Invalid! Reinserted the same key twice.
-								warn!("Key {} inserted twice into same fork.", k);
-							}
+			if canon_id == &rlp.val_at::<H256>(0) {
+				// Collect keys to be removed. Canon block - remove the (enacted) deletes.
+				let deletes: Vec<H256> = rlp.val_at(2);
+				trace!(target: "jdb.ops", "  Expunging: {:?}", deletes);
+				Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive, trace);
+
+				trace!(target: "jdb.ops", "  Finalising: {:?}", inserts);
+				for k in &inserts {
+					match refs.get(k).cloned() {
+						None => {
+							// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
+							// already expunged from the queue (which is allowed since the key is in the archive).
+							// leave well alone.
+						}
+						Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
+							// just delete the refs entry.
+							refs.remove(k);
+						}
+						Some( RefInfo{queue_refs: x, in_archive: false} ) => {
+							// must set already in; ,
+							Self::set_already_in(batch, self.column, k);
+							refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
+						}
+						Some( RefInfo{in_archive: true, ..} ) => {
+							// Invalid! Reinserted the same key twice.
+							warn!("Key {} inserted twice into same fork.", k);
						}
 					}
 				}
-				} else {
-					// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
-					if trace {
-						trace!(target: "jdb.ops", "  Reverting: {:?}", inserts);
-					}
-					Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
 				}
+			} else {
+				// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
+				trace!(target: "jdb.ops", "  Reverting: {:?}", inserts);
+				Self::remove_keys(&inserts, &mut refs, batch, self.column, RemoveFrom::Queue, trace);
 			}
-				batch.delete(self.column, &last);
-				index += 1;
-			}
-			if trace {
-				trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
-			}
+			batch.delete(self.column, &last);
+			index += 1;
 		}
-		if trace {
-			trace!(target: "jdb", "OK: {:?}", refs.clone());
-		}
+		trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
+		trace!(target: "jdb", "OK: {:?}", refs.clone());

 		Ok(0)
 	}
diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs
index bd14eb161..0d1149a85 100644
--- a/util/src/journaldb/overlayrecentdb.rs
+++ b/util/src/journaldb/overlayrecentdb.rs
@@ -222,92 +222,107 @@ impl JournalDB for OverlayRecentDB {
 			.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
 	}

-	fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
-		// record new commit's details.
- trace!("commit: #{} ({}), end era: {:?}", now, id, end); + fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result { + trace!(target: "journaldb", "entry: #{} ({})", now, id); + let mut journal_overlay = self.journal_overlay.write(); + // flush previous changes journal_overlay.pending_overlay.clear(); - { - let mut r = RlpStream::new_list(3); - let mut tx = self.transaction_overlay.drain(); - let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect(); - let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect(); - // Increase counter for each inserted key no matter if the block is canonical or not. - let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None }); - r.append(id); - r.begin_list(inserted_keys.len()); - for (k, v) in insertions { - r.begin_list(2); - r.append(&k); - r.append(&v); - journal_overlay.backing_overlay.emplace(to_short_key(&k), v); - } - r.append(&removed_keys); - let mut k = RlpStream::new_list(3); - let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len()); - k.append(&now); - k.append(&index); - k.append(&&PADDING[..]); - batch.put_vec(self.column, &k.drain(), r.out()); - if journal_overlay.latest_era.map_or(true, |e| now > e) { - batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec()); - journal_overlay.latest_era = Some(now); - } - journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); + let mut r = RlpStream::new_list(3); + let mut tx = self.transaction_overlay.drain(); + let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect(); + let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect(); + let ops = inserted_keys.len() + removed_keys.len(); + + // Increase counter for each inserted key no matter if the block is canonical or not. 
+		let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
+
+		r.append(id);
+		r.begin_list(inserted_keys.len());
+		for (k, v) in insertions {
+			r.begin_list(2);
+			r.append(&k);
+			r.append(&v);
+			journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
+		}
+		r.append(&removed_keys);
+
+		let mut k = RlpStream::new_list(3);
+		let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
+		k.append(&now);
+		k.append(&index);
+		k.append(&&PADDING[..]);
+		batch.put_vec(self.column, &k.drain(), r.out());
+		if journal_overlay.latest_era.map_or(true, |e| now > e) {
+			batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
+			journal_overlay.latest_era = Some(now);
 		}
+		journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
+
+		Ok(ops as u32)
+	}
+
+	fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
+		trace!(target: "journaldb", "canonical: #{} ({})", end_era, canon_id);
+
+		let mut journal_overlay = self.journal_overlay.write();
 		let journal_overlay = &mut *journal_overlay;
+
+		let mut ops = 0;
 		// apply old commits' details
-		if let Some((end_era, canon_id)) = end {
-			if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
-				let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
-				let mut canon_deletions: Vec<H256> = Vec::new();
-				let mut overlay_deletions: Vec<H256> = Vec::new();
-				let mut index = 0usize;
-				for mut journal in records.drain(..) {
-					//delete the record from the db
-					let mut r = RlpStream::new_list(3);
-					r.append(&end_era);
-					r.append(&index);
-					r.append(&&PADDING[..]);
-					batch.delete(self.column, &r.drain());
-					trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
-					{
-						if canon_id == journal.id {
-							for h in &journal.insertions {
-								if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
-									if rc > 0 {
-										canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
-									}
+		if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
+			let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
+			let mut canon_deletions: Vec<H256> = Vec::new();
+			let mut overlay_deletions: Vec<H256> = Vec::new();
+			let mut index = 0usize;
+			for mut journal in records.drain(..) {
+				//delete the record from the db
+				let mut r = RlpStream::new_list(3);
+				r.append(&end_era);
+				r.append(&index);
+				r.append(&&PADDING[..]);
+				batch.delete(self.column, &r.drain());
+				trace!(target: "journaldb", "Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
+				{
+					if *canon_id == journal.id {
+						for h in &journal.insertions {
+							if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
+								if rc > 0 {
+									canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
								}
 							}
-						canon_deletions = journal.deletions;
 						}
-					overlay_deletions.append(&mut journal.insertions);
+						canon_deletions = journal.deletions;
 					}
-				index += 1;
+					overlay_deletions.append(&mut journal.insertions);
 				}
-			// apply canon inserts first
-			for (k, v) in canon_insertions {
-				batch.put(self.column, &k, &v);
-				journal_overlay.pending_overlay.insert(to_short_key(&k), v);
-			}
-			// update the overlay
-			for k in overlay_deletions {
-				journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
-			}
-			// apply canon deletions
-			for k in canon_deletions {
-				if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
-					batch.delete(self.column, &k);
-				}
+				index += 1;
+			}
+
+			ops += canon_insertions.len();
+			ops += canon_deletions.len();
+
+			// apply canon inserts first
+			for (k, v) in canon_insertions {
+				batch.put(self.column, &k, &v);
+				journal_overlay.pending_overlay.insert(to_short_key(&k), v);
+			}
+			// update the overlay
+			for k in overlay_deletions {
+				journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
+			}
+			// apply canon deletions
+			for k in canon_deletions {
+				if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
+					batch.delete(self.column, &k);
 				}
 			}
-			journal_overlay.journal.remove(&end_era);
 		}
-		Ok(0)
+		journal_overlay.journal.remove(&end_era);
+
+		Ok(ops as u32)
 	}

 	fn flush(&self) {
diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs
index 5e3f09606..e6a0f5dcc 100644
--- a/util/src/journaldb/refcounteddb.rs
+++ b/util/src/journaldb/refcounteddb.rs
@@ -34,6 +34,17 @@ use std::env;
 /// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
 /// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
 /// the removals actually take effect.
+///
+/// journal format:
+/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
+/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
+/// [era, n] => [ ... ]
+///
+/// when we make a new commit, we journal the inserts and removes.
+/// for each end_era that we journaled that we are now passing by,
+/// we remove all of its removes, assuming it is canonical, and all
+/// of its inserts otherwise.
+// TODO: store last_era, reclaim_period.
 pub struct RefCountedDB {
 	forward: OverlayDB,
 	backing: Arc<Database>,
@@ -109,77 +120,66 @@ impl JournalDB for RefCountedDB {
 		self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
 	}

-	fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
-		// journal format:
-		// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, n] => [ ... ]
-
-		// TODO: store last_era, reclaim_period.
-
-		// when we make a new commit, we journal the inserts and removes.
-		// for each end_era that we journaled that we are no passing by,
-		// we remove all of its removes assuming it is canonical and all
-		// of its inserts otherwise.
-
+	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
 		// record new commit's details.
-		{
-			let mut index = 0usize;
-			let mut last;
+		let mut index = 0usize;
+		let mut last;

-			while try!(self.backing.get(self.column, {
+		while try!(self.backing.get(self.column, {
+			let mut r = RlpStream::new_list(3);
+			r.append(&now);
+			r.append(&index);
+			r.append(&&PADDING[..]);
+			last = r.drain();
+			&last
+		})).is_some() {
+			index += 1;
+		}
+
+		let mut r = RlpStream::new_list(3);
+		r.append(id);
+		r.append(&self.inserts);
+		r.append(&self.removes);
+		batch.put(self.column, &last, r.as_raw());
+
+		let ops = self.inserts.len() + self.removes.len();
+
+		trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
+
+		self.inserts.clear();
+		self.removes.clear();
+
+		if self.latest_era.map_or(true, |e| now > e) {
+			batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
+			self.latest_era = Some(now);
+		}
+
+		Ok(ops as u32)
+	}
+
+	fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
+		// apply old commits' details
+		let mut index = 0usize;
+		let mut last;
+		while let Some(rlp_data) = {
+			try!(self.backing.get(self.column, {
 				let mut r = RlpStream::new_list(3);
-				r.append(&now);
+				r.append(&end_era);
 				r.append(&index);
 				r.append(&&PADDING[..]);
 				last = r.drain();
 				&last
-			})).is_some() {
-				index += 1;
-			}
-
-			let mut r = RlpStream::new_list(3);
-			r.append(id);
-			r.append(&self.inserts);
-			r.append(&self.removes);
-			batch.put(self.column, &last, r.as_raw());
-
-			trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
-
-			self.inserts.clear();
-			self.removes.clear();
-
-			if self.latest_era.map_or(true, |e| now > e) {
-				batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
-				self.latest_era = Some(now);
-			}
-		}
-
-		// apply old commits' details
-		if let Some((end_era, canon_id)) = end {
-			let mut index = 0usize;
-			let mut last;
-			while let Some(rlp_data) = {
-//				trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
-				try!(self.backing.get(self.column, {
-					let mut r = RlpStream::new_list(3);
-					r.append(&end_era);
-					r.append(&index);
-					r.append(&&PADDING[..]);
-					last = r.drain();
-					&last
-				}))
-			} {
-				let rlp = Rlp::new(&rlp_data);
-				let our_id: H256 = rlp.val_at(0);
-				let to_remove: Vec<H256> = rlp.val_at(if canon_id == our_id {2} else {1});
-				trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
-				for i in &to_remove {
-					self.forward.remove(i);
-				}
-				batch.delete(self.column, &last);
-				index += 1;
+			}))
+		} {
+			let rlp = Rlp::new(&rlp_data);
+			let our_id: H256 = rlp.val_at(0);
+			let to_remove: Vec<H256> = rlp.val_at(if *canon_id == our_id {2} else {1});
+			trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
+			for i in &to_remove {
+				self.forward.remove(i);
 			}
+			batch.delete(self.column, &last);
+			index += 1;
 		}

 		let r = try!(self.forward.commit_to_batch(batch));
diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs
index 85cc7fe58..afa2bb9f4 100644
--- a/util/src/journaldb/traits.rs
+++ b/util/src/journaldb/traits.rs
@@ -35,9 +35,12 @@
 pub trait JournalDB: HashDB {
 	/// Get the latest era in the DB. None if there isn't yet any data in there.
 	fn latest_era(&self) -> Option<u64>;

-	/// Commit all recent insert operations and canonical historical commits' removals from the
-	/// old era to the backing database, reverting any non-canonical historical commit's inserts.
-	fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
+	/// Journal recent database operations as being associated with a given era and id.
+	// TODO: give the overlay to this function so journaldbs don't manage the overlays themselves.
+	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError>;
+
+	/// Mark a given block as canonical, indicating that competing blocks' states may be pruned out.
+	fn mark_canonical(&mut self, batch: &mut DBTransaction, era: u64, id: &H256) -> Result<u32, UtilError>;

 	/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
 	/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
@@ -68,8 +71,13 @@ pub trait JournalDB: HashDB {
 	#[cfg(test)]
 	fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 		let mut batch = self.backing().transaction();
-		let res = try!(self.commit(&mut batch, now, id, end));
-		let result = self.backing().write(batch).map(|_| res).map_err(Into::into);
+		let mut ops = try!(self.journal_under(&mut batch, now, id));
+
+		if let Some((end_era, canon_id)) = end {
+			ops += try!(self.mark_canonical(&mut batch, end_era, &canon_id));
+		}
+
+		let result = self.backing().write(batch).map(|_| ops).map_err(Into::into);
 		self.flush();
 		result
 	}
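
Usage note: outside of the test-only `commit_batch` helper above, callers now drive the two phases themselves. A minimal sketch, directly mirroring `commit_batch` — the `db`, `now`, `id`, and `end` bindings here are hypothetical stand-ins, not names introduced by this patch:

    // Phase 1: journal this block's pending operations under (now, id).
    // This happens for every block, canonical or not.
    let mut batch = db.backing().transaction();
    let mut ops = try!(db.journal_under(&mut batch, now, id));

    // Phase 2: only once an era leaves the recent-history window, mark its
    // canonical block so the competing siblings' state can be pruned.
    if let Some((end_era, canon_id)) = end {
        ops += try!(db.mark_canonical(&mut batch, end_era, &canon_id));
    }

    try!(db.backing().write(batch));
    db.flush();

Both phases still share one `DBTransaction`, so the journal entry and any pruning land atomically, exactly as the old single `commit` did.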
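The reference-counting rules in the new `EarlyMergeDB` doc comment are easier to check against code. The following is an illustrative, self-contained sketch of those exact rules — it is not code from this patch; plain `HashMap`s stand in for the `RefInfo` bookkeeping and the backing database:

    use std::collections::HashMap;

    struct Counter {
        refs: HashMap<String, u32>,       // queue reference counts
        backing: HashMap<String, String>, // stand-in for the backing db
    }

    impl Counter {
        fn insert(&mut self, k: &str, v: &str) {
            if let Some(count) = self.refs.get_mut(k) {
                *count += 1;                        // counter already contains k
            } else if self.backing.contains_key(k) {
                self.refs.insert(k.to_string(), 1); // backing db contains k: count = 1
            } else {
                // backing db doesn't contain k: insert into backing db, count = 0
                self.backing.insert(k.to_string(), v.to_string());
            }
        }

        fn delete(&mut self, k: &str) {
            match self.refs.get(k).cloned() {
                Some(count) if count > 1 => { self.refs.insert(k.to_string(), count - 1); }
                Some(_) => { self.refs.remove(k); } // count == 1: drop the counter only
                None => { self.backing.remove(k); } // count == 0: remove from backing db
            }
        }
    }

Note how `delete` never touches the backing database while a queue reference exists — that is the "reduced as a preference to deletion from the backing database" behaviour the doc comment describes.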
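All three journaled implementations key their records identically: the backing-database key for journal slot `index` of `era` is the RLP list `[era, index, PADDING]`, visible in every hunk above. Factored out as a hypothetical helper (not introduced by this patch) it would read:

    // PADDING is the zero-filler constant the journaldb modules already define.
    fn journal_key(era: u64, index: usize) -> Vec<u8> {
        let mut r = RlpStream::new_list(3);
        r.append(&era);
        r.append(&index);
        r.append(&&PADDING[..]);
        r.out()
    }

Records for an era are probed by incrementing `index` until a lookup misses, which is how `journal_under` finds the next free slot and `mark_canonical` walks every commit of the ending era.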