diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 2173fdeb6..e805f0a60 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; #[cfg(test)] use std::env; @@ -92,7 +92,6 @@ impl JournalDB { /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - #[allow(cyclomatic_complexity)] pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -106,10 +105,16 @@ impl JournalDB { // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. // - // we also keep track of the counters for each key inserted in the journal to handle the following cases: - // key K is removed in block A(N) and re-inserted in block B(N + C) (where C < H). K must not be deleted from the DB. - // key K is added in block A(N) and reverted in block B(N + C) (where C < H). K must be deleted - // key K is added in blocks A(N) and A'(N) and is reverted in block B(N + C ) (where C < H). K must not be deleted + // We also keep reference counters for each key inserted in the journal to handle + // the following cases where key K must not be deleted from the DB when processing removals : + // Given H is the journal size in eras, 0 <= C <= H. + // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). + // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). + // Key K is added in non-canonical era A'(N) canonical B(N + C). + // + // The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions + // is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased + // and the key is safe to delete. // record new commit's details. let batch = WriteBatch::new(); @@ -130,7 +135,7 @@ impl JournalDB { let mut r = RlpStream::new_list(3); let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); - // Increase counter for each insrted key no matter if the block is canonical or not. + // Increase counter for each inserted key no matter if the block is canonical or not. for i in &inserts { *counters.entry(i.clone()).or_insert(0) += 1; } @@ -145,7 +150,8 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; - let mut canon_data: Option = None; + let mut to_remove: Vec = Vec::new(); + let mut canon_inserts: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); @@ -153,30 +159,33 @@ impl JournalDB { last = r.drain(); &last })) { - let canon = { - let rlp = Rlp::new(&rlp_data); - if canon_id != rlp.val_at(0) { - let to_add: Vec = rlp.val_at(1); - JournalDB::apply_removes(&to_add, &to_add, &mut counters, &batch); - false - } else { true } - }; - if canon { - canon_data = Some(rlp_data) + let rlp = Rlp::new(&rlp_data); + let inserts: Vec = rlp.val_at(1); + JournalDB::decrease_counters(&inserts, &mut counters); + // Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical + if canon_id == rlp.val_at(0) { + to_remove.extend(rlp.at(2).iter().map(|r| r.as_val::())); + canon_inserts = inserts; + } + else { + to_remove.extend(inserts); } try!(batch.delete(&last)); index += 1; } - // Canon must be commited last to handle a case when counter reaches 0 in a sibling block - if let Some(ref c) = canon_data { - let rlp = Rlp::new(&c); - let deleted = JournalDB::apply_removes(&rlp.val_at::>(1), &rlp.val_at::>(2), &mut counters, &batch); - trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deleted); - } + let canon_inserts = canon_inserts.drain(..).collect::>(); + // Purge removed keys if they are not referenced and not re-inserted in the canon commit + let mut deletes = 0; + for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { + try!(batch.delete(&h)); + deletes += 1; + } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes); } + // Commit overlay insertions let mut ret = 0u32; let mut deletes = 0usize; for i in self.overlay.drain().into_iter() { @@ -198,10 +207,10 @@ impl JournalDB { Ok(ret) } - fn apply_removes(added: &[H256], removed: &[H256], counters: &mut HashMap, batch: &WriteBatch) -> usize { - let mut deleted = 0usize; - // Decrease the counters first - for i in added.iter() { + + // Decrease counters for given keys. Deletes obsolete counters + fn decrease_counters(keys: &[H256], counters: &mut HashMap) { + for i in keys.iter() { let delete_counter = { if let Some(mut cnt) = counters.get_mut(i) { *cnt -= 1; @@ -213,12 +222,6 @@ impl JournalDB { counters.remove(i); } } - // Remove only if counter reached zero - for i in removed.iter().filter(|i| !counters.contains_key(i)) { - batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); - deleted += 1; - } - deleted } fn payload(&self, key: &H256) -> Option {