From 4af85b488b659d63d54f3d34cd5eb1af675feaf1 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 5 Feb 2016 22:54:33 +0100 Subject: [PATCH] Fixed an issue with forked counters --- util/src/journaldb.rs | 90 ++++++++++++++++++++++++++++++------------- 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 810b06727..2173fdeb6 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector}; #[cfg(test)] use std::env; @@ -105,6 +105,11 @@ impl JournalDB { // for each end_era that we journaled that we are no passing by, // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. + // + // we also keep track of the counters for each key inserted in the journal to handle the following cases: + // key K is removed in block A(N) and re-inserted in block B(N + C) (where C < H). K must not be deleted from the DB. + // key K is added in block A(N) and reverted in block B(N + C) (where C < H). K must be deleted + // key K is added in blocks A(N) and A'(N) and is reverted in block B(N + C ) (where C < H). K must not be deleted // record new commit's details. let batch = WriteBatch::new(); @@ -125,6 +130,7 @@ impl JournalDB { let mut r = RlpStream::new_list(3); let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); + // Increase counter for each insrted key no matter if the block is canonical or not. for i in &inserts { *counters.entry(i.clone()).or_insert(0) += 1; } @@ -139,6 +145,7 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; + let mut canon_data: Option = None; while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); @@ -146,35 +153,26 @@ impl JournalDB { last = r.drain(); &last })) { - let to_add; - let rlp = Rlp::new(&rlp_data); - { - to_add = rlp.val_at(1); - for i in &to_add { - let delete_counter = { - if let Some(mut cnt) = counters.get_mut(i) { - *cnt -= 1; - *cnt == 0 - } - else { false } - - }; - if delete_counter { - counters.remove(i); - } - } + let canon = { + let rlp = Rlp::new(&rlp_data); + if canon_id != rlp.val_at(0) { + let to_add: Vec = rlp.val_at(1); + JournalDB::apply_removes(&to_add, &to_add, &mut counters, &batch); + false + } else { true } + }; + if canon { + canon_data = Some(rlp_data) } - let to_remove: Vec = if canon_id == rlp.val_at(0) {rlp.val_at(2)} else {to_add}; - for i in &to_remove { - if !counters.contains_key(i) { - batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); - } - } - try!(batch.delete(&last)); - trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len()); index += 1; } + // Canon must be commited last to handle a case when counter reaches 0 in a sibling block + if let Some(ref c) = canon_data { + let rlp = Rlp::new(&c); + let deleted = JournalDB::apply_removes(&rlp.val_at::>(1), &rlp.val_at::>(2), &mut counters, &batch); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deleted); + } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); } @@ -200,6 +198,29 @@ impl JournalDB { Ok(ret) } + fn apply_removes(added: &[H256], removed: &[H256], counters: &mut HashMap, batch: &WriteBatch) -> usize { + let mut deleted = 0usize; + // Decrease the counters first + for i in added.iter() { + let delete_counter = { + if let Some(mut cnt) = counters.get_mut(i) { + *cnt -= 1; + *cnt == 0 + } + else { false } + }; + if delete_counter { + counters.remove(i); + } + } + // Remove only if counter reached zero + for i in removed.iter().filter(|i| !counters.contains_key(i)) { + batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); + deleted += 1; + } + deleted + } + fn payload(&self, key: &H256) -> Option { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } @@ -387,4 +408,21 @@ mod tests { jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + #[test] + fn fork_same_key() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + + let foo = jdb.insert(b"foo"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.insert(b"foo"); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } }