From 51c95d4d67643f03fc0fd11cf241308acb01eb5a Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 21:57:55 +0100 Subject: [PATCH 1/8] Implement option 1. --- util/src/journaldb.rs | 393 +++++++++++++++++++++--------------------- 1 file changed, 194 insertions(+), 199 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 01e53f819..5f94dcbeb 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,15 +20,12 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use kvdb::{Database, DBTransaction, DatabaseConfig}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; #[cfg(test)] use std::env; /// Implementation of the HashDB trait for a disk-backed database with a memory overlay -/// and, possibly, latent-removal semantics. -/// -/// If `counters` is `None`, then it behaves exactly like OverlayDB. If not it behaves -/// differently: +/// and latent-removal semantics. /// /// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to /// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect @@ -36,8 +33,8 @@ use std::env; /// the removals actually take effect. pub struct JournalDB { overlay: MemoryDB, - backing: Arc, - counters: Option>>>, + backing: Arc, + counters: Arc>>, } impl Clone for JournalDB { @@ -50,51 +47,33 @@ impl Clone for JournalDB { } } -// all keys must be at least 12 bytes -const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const LAST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; +const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; -const DB_VERSION : u32 = 3; -const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; - -const PADDING : [u8; 10] = [ 0u8; 10 ]; +const DB_VERSION: u32 = 1; impl JournalDB { - - /// Create a new instance from file - pub fn new(path: &str) -> JournalDB { - Self::from_prefs(path, true) + /// Create a new instance given a `backing` database. + pub fn new(backing: DB) -> JournalDB { + let db = Arc::new(backing); + JournalDB::new_with_arc(db) } - /// Create a new instance from file - pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { - let opts = DatabaseConfig { - prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix - }; - let backing = Database::open(&opts, path).unwrap_or_else(|e| { - panic!("Error opening state db: {}", e); - }); - let with_journal; - if !backing.is_empty() { + /// Create a new instance given a shared `backing` database. + pub fn new_with_arc(backing: Arc) -> JournalDB { + if backing.iterator(IteratorMode::Start).next().is_some() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => { with_journal = true; }, - Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, + Ok(Some(DB_VERSION)) => {}, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); - with_journal = prefer_journal; + backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); } - - let counters = if with_journal { - Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) - } else { - None - }; + let counters = JournalDB::read_counters(&backing); JournalDB { overlay: MemoryDB::new(), - backing: Arc::new(backing), - counters: counters, + backing: backing, + counters: Arc::new(RwLock::new(counters)), } } @@ -103,55 +82,93 @@ impl JournalDB { pub fn new_temp() -> JournalDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(dir.to_str().unwrap()) + Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) } /// Check if this database has any commits pub fn is_empty(&self) -> bool { - self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + self.backing.get(&LAST_ERA_KEY).expect("Low level database error").is_none() } - /// Commit all recent insert operations. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { - let have_counters = self.counters.is_some(); - if have_counters { - self.commit_with_counters(now, id, end) - } else { - self.commit_without_counters() + fn morph_key(key: &H256, index: u8) -> Bytes { + let mut ret = key.bytes().to_owned(); + ret.push(index); + ret + } + + // The next three are valid only as long as there is an insert operation of `key` in the journal. + fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]); } + fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)); } + fn is_already_in(backing: &DB, key: &H256) -> bool { + backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() + } + + fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &DB, counters: &mut HashMap, batch: &WriteBatch) { + for &(ref h, ref d) in inserts { + if let Some(c) = counters.get_mut(h) { + // already counting. increment. + *c += 1; + continue; + } + + // this is the first entry for this node in the journal. + if backing.get(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?").is_some() { + // already in the backing DB. start counting, and remember it was already in. + Self::set_already_in(batch, &h); + counters.insert(h.clone(), 1); + continue; + } + + // Gets removed when a key leaves the journal, so should never be set when we're placing a new key. + //Self::reset_already_in(&h); + assert!(!Self::is_already_in(backing, &h)); + batch.put(&h.bytes(), d); } } - /// Drain the overlay and place it into a batch for the DB. - fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { - let mut inserts = 0usize; - let mut deletes = 0usize; - for i in overlay.drain().into_iter() { - let (key, (value, rc)) = i; - if rc > 0 { - assert!(rc == 1); - batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); - inserts += 1; - } - if rc < 0 { - assert!(rc == -1); - deletes += 1; + fn replay_keys(inserts: &Vec, backing: &DB, counters: &mut HashMap) { + for h in inserts { + if let Some(c) = counters.get_mut(h) { + // already counting. increment. + *c += 1; + continue; } + + // this is the first entry for this node in the journal. + // it is initialised to 1 if it was already in. + counters.insert(h.clone(), if Self::is_already_in(backing, h) {1} else {0}); } - trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); - inserts + deletes } - /// Just commit the overlay into the backing DB. - fn commit_without_counters(&mut self) -> Result { - let batch = DBTransaction::new(); - let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); - try!(self.backing.write(batch)); - Ok(ret as u32) + fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &WriteBatch) { + for h in deletes.into_iter() { + let mut n: Option = None; + if let Some(c) = counters.get_mut(&h) { + if *c > 1 { + *c -= 1; + continue; + } else { + n = Some(*c); + } + } + match &n { + &Some(i) if i == 1 => { + counters.remove(&h); + Self::reset_already_in(batch, &h); + } + &None => { + // Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs. + //assert!(!Self::is_already_in(db, &h)); + batch.delete(&h.bytes()); + } + _ => panic!("Invalid value in counters: {:?}", n), + } + } } /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -159,62 +176,84 @@ impl JournalDB { // TODO: store reclaim_period. - // when we make a new commit, we journal the inserts and removes. - // for each end_era that we journaled that we are no passing by, - // we remove all of its removes assuming it is canonical and all - // of its inserts otherwise. + // When we make a new commit, we make a journal of all blocks in the recent history and record + // all keys that were inserted and deleted. The journal is ordered by era; multiple commits can + // share the same era. This forms a data structure similar to a queue but whose items are tuples. + // By the time comes to remove a tuple from the queue (i.e. then the era passes from recent history + // into ancient history) then only one commit from the tuple is considered canonical. This commit + // is kept in the main backing database, whereas any others from the same era are reverted. + // + // It is possible that a key, properly available in the backing database be deleted and re-inserted + // in the recent history queue, yet have both operations in commits that are eventually non-canonical. + // To avoid the original, and still required, key from being deleted, we maintain a reference count + // which includes an original key, if any. + // + // The semantics of the `counter` are: + // insert key k: + // counter already contains k: count += 1 + // counter doesn't contain k: + // backing db contains k: count = 1 + // backing db doesn't contain k: insert into backing db, count = 0 + // delete key k: + // counter contains k (count is asserted to be non-zero): + // count > 1: counter -= 1 + // count == 1: remove counter + // count == 0: remove key from backing db + // counter doesn't contain k: remove key from backing db // - // We also keep reference counters for each key inserted in the journal to handle - // the following cases where key K must not be deleted from the DB when processing removals : - // Given H is the journal size in eras, 0 <= C <= H. - // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). - // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). - // Key K is added in non-canonical era A'(N) canonical B(N + C). + // Practically, this means that for each commit block turning from recent to ancient we do the + // following: + // is_canonical: + // inserts: Ignored (left alone in the backing database). + // deletes: Enacted; however, recent history queue is checked for ongoing references. This is + // reduced as a preference to deletion from the backing database. + // !is_canonical: + // inserts: Reverted; however, recent history queue is checked for ongoing references. This is + // reduced as a preference to deletion from the backing database. + // deletes: Ignored (they were never inserted). // - // The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions - // is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased - // and the key is safe to delete. // record new commit's details. - trace!("commit: #{} ({}), end era: {:?}", now, id, end); - let mut counters = self.counters.as_ref().unwrap().write().unwrap(); - let batch = DBTransaction::new(); + let batch = WriteBatch::new(); + let mut counters = self.counters.write().unwrap(); { let mut index = 0usize; let mut last; - while { - let record = try!(self.backing.get({ - let mut r = RlpStream::new_list(3); - r.append(&now); - r.append(&index); - r.append(&&PADDING[..]); - last = r.drain(); - &last - })); - match record { - Some(r) => { - assert!(&Rlp::new(&r).val_at::(0) != id); - true - }, - None => false, - } - } { + while try!(self.backing.get({ + let mut r = RlpStream::new_list(2); + r.append(&now); + r.append(&index); + last = r.drain(); + &last + })).is_some() { index += 1; } + let drained = self.overlay.drain(); + let removes: Vec = drained + .iter() + .filter_map(|(ref k, &(_, ref c))| if *c < 0 {Some(k.clone())} else {None}).cloned() + .collect(); + let inserts: Vec<(H256, Bytes)> = drained + .into_iter() + .filter_map(|(k, (v, r))| if r > 0 { assert!(r == 1); Some((k, v)) } else { assert!(r >= -1); None }) + .collect(); + let mut r = RlpStream::new_list(3); - let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); - // Increase counter for each inserted key no matter if the block is canonical or not. - for i in &inserts { - *counters.entry(i.clone()).or_insert(0) += 1; - } - let removes: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c < 0).map(|(key, _)| key.clone()).collect(); r.append(id); - r.append(&inserts); + + // Process the new inserts. + // We use the inserts for three things. For each: + // - we place into the backing DB or increment the counter if already in; + // - we note in the backing db that it was already in; + // - we write the key into our journal for this block; + + r.begin_list(inserts.len()); + inserts.iter().foreach(|&(k, _)| {r.append(&k);}); r.append(&removes); + Self::insert_keys(&inserts, &self.backing, &mut counters, &batch); try!(batch.put(&last, r.as_raw())); - try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); } // apply old commits' details @@ -222,105 +261,66 @@ impl JournalDB { let mut index = 0usize; let mut last; let mut to_remove: Vec = Vec::new(); - let mut canon_inserts: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(3); + let mut r = RlpStream::new_list(2); r.append(&end_era); r.append(&index); - r.append(&&PADDING[..]); last = r.drain(); &last })) { let rlp = Rlp::new(&rlp_data); - let mut inserts: Vec = rlp.val_at(1); - JournalDB::decrease_counters(&inserts, &mut counters); + let inserts: Vec = rlp.val_at(1); + let deletes: Vec = rlp.val_at(2); // Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical - if canon_id == rlp.val_at(0) { - let mut canon_deletes: Vec = rlp.val_at(2); - trace!("Purging nodes deleted from canon: {:?}", canon_deletes); - to_remove.append(&mut canon_deletes); - canon_inserts = inserts; - } - else { - trace!("Purging nodes inserted in non-canon: {:?}", inserts); - to_remove.append(&mut inserts); - } - trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): {} entries", end_era, index, rlp.val_at::(0), canon_id, to_remove.len()); + Self::kill_keys(if canon_id == rlp.val_at(0) {deletes} else {inserts}, &mut counters, &batch); try!(batch.delete(&last)); index += 1; } - - let canon_inserts = canon_inserts.drain(..).collect::>(); - // Purge removed keys if they are not referenced and not re-inserted in the canon commit - let mut deletes = 0; - trace!("Purging filtered nodes: {:?}", to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)).collect::>()); - for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { - try!(batch.delete(&h)); - deletes += 1; - } - trace!("Total nodes purged: {}", deletes); + try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } - // Commit overlay insertions - let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); try!(self.backing.write(batch)); - Ok(ret as u32) - } - - - // Decrease counters for given keys. Deletes obsolete counters - fn decrease_counters(keys: &[H256], counters: &mut HashMap) { - for i in keys.iter() { - let delete_counter = { - let cnt = counters.get_mut(i).expect("Missing key counter"); - *cnt -= 1; - *cnt == 0 - }; - if delete_counter { - counters.remove(i); - } - } +// trace!("JournalDB::commit() deleted {} nodes", deletes); + Ok(0) } fn payload(&self, key: &H256) -> Option { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &Database) -> HashMap { - let mut res = HashMap::new(); - if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { - let mut era = decode::(&val); + fn read_counters(db: &DB) -> HashMap { + let mut counters = HashMap::new(); + if let Some(val) = db.get(&LAST_ERA_KEY).expect("Low-level database error.") { + let mut era = decode::(&val) + 1; loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(3); + let mut r = RlpStream::new_list(2); r.append(&era); r.append(&index); - r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); - let to_add: Vec = rlp.val_at(1); - for h in to_add { - *res.entry(h).or_insert(0) += 1; - } + let inserts: Vec = rlp.val_at(1); + Self::replay_keys(&inserts, db, &mut counters); index += 1; }; - if index == 0 || era == 0 { + if index == 0 { break; } - era -= 1; + era += 1; } } - trace!("Recovered {} counters", res.len()); - res + trace!("Recovered {} counters", counters.len()); + counters } } impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iter() { + for (key, _) in self.backing.iterator(IteratorMode::Start) { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -368,6 +368,28 @@ mod tests { use super::*; use hashdb::*; + #[test] + fn insert_same_in_fork() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + + let x = jdb.insert(b"X"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap(); + jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.remove(&x); + jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap(); + let x = jdb.insert(b"X"); + jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap(); + jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap(); + + assert!(jdb.exists(&x)); + } + #[test] fn long_history() { // history is 3 @@ -487,31 +509,4 @@ mod tests { jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } - - #[test] - fn reopen() { - let mut dir = ::std::env::temp_dir(); - dir.push(H32::random().hex()); - - let foo = { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - // history is 1 - let foo = jdb.insert(b"foo"); - jdb.commit(0, &b"0".sha3(), None).unwrap(); - foo - }; - - { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - jdb.remove(&foo); - jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); - } - - { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - assert!(jdb.exists(&foo)); - jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); - assert!(!jdb.exists(&foo)); - } - } } From bfd882c7e03dee68d39c8722e65c9b395ed8b438 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:05:12 +0100 Subject: [PATCH 2/8] Fix warnings. --- util/src/journaldb.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 5f94dcbeb..b7e495503 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -97,8 +97,8 @@ impl JournalDB { } // The next three are valid only as long as there is an insert operation of `key` in the journal. - fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]); } - fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)); } + fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } + fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } fn is_already_in(backing: &DB, key: &H256) -> bool { backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } @@ -122,7 +122,7 @@ impl JournalDB { // Gets removed when a key leaves the journal, so should never be set when we're placing a new key. //Self::reset_already_in(&h); assert!(!Self::is_already_in(backing, &h)); - batch.put(&h.bytes(), d); + batch.put(&h.bytes(), d).expect("Low-level database error. Some issue with your hard disk?"); } } @@ -159,7 +159,7 @@ impl JournalDB { &None => { // Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs. //assert!(!Self::is_already_in(db, &h)); - batch.delete(&h.bytes()); + batch.delete(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?"); } _ => panic!("Invalid value in counters: {:?}", n), } @@ -260,7 +260,6 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; - let mut to_remove: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); From bc2fb14b5d6fbfac97a9e1d03e8e3d93d2da0283 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:39:04 +0100 Subject: [PATCH 3/8] Add memory usage reports. Update to be similar to master. --- ethcore/src/client.rs | 8 +- parity/main.rs | 3 +- util/src/journaldb.rs | 213 +++++++++++++++++++++++++++++++++++------- util/src/memorydb.rs | 6 ++ 4 files changed, 194 insertions(+), 36 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 858185873..9688cc527 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -190,6 +190,8 @@ pub struct ClientReport { pub transactions_applied: usize, /// How much gas has been processed so far. pub gas_processed: U256, + /// Memory used by state DB + pub state_db_mem: usize, } impl ClientReport { @@ -222,7 +224,7 @@ pub struct Client where V: Verifier { } const HISTORY: u64 = 1000; -const CLIENT_DB_VER_STR: &'static str = "4.0"; +const CLIENT_DB_VER_STR: &'static str = "5.1"; impl Client { /// Create a new client with given spec and DB path. @@ -432,7 +434,9 @@ impl Client where V: Verifier { /// Get the report. pub fn report(&self) -> ClientReport { - self.report.read().unwrap().clone() + let mut report = self.report.read().unwrap().clone(); + report.state_db_mem = self.state_db.lock().unwrap().mem_used(); + report } /// Tick the client. diff --git a/parity/main.rs b/parity/main.rs index 3f4243a0a..605fb315d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -395,7 +395,7 @@ impl Informant { let sync_info = sync.status(); if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { - println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} chain, {} queue, {} sync ]", + println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]", chain_info.best_block_number, chain_info.best_block_hash, (report.blocks_imported - last_report.blocks_imported) / dur, @@ -408,6 +408,7 @@ impl Informant { queue_info.unverified_queue_size, queue_info.verified_queue_size, + Informant::format_bytes(report.state_db_mem), Informant::format_bytes(cache_info.total()), Informant::format_bytes(queue_info.mem_used), Informant::format_bytes(sync_info.mem_used), diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index b7e495503..f04affb7b 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] use std::env; @@ -33,8 +33,8 @@ use std::env; /// the removals actually take effect. pub struct JournalDB { overlay: MemoryDB, - backing: Arc, - counters: Arc>>, + backing: Arc, + counters: Option>>>, } impl Clone for JournalDB { @@ -47,33 +47,50 @@ impl Clone for JournalDB { } } -const LAST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; -const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; +// all keys must be at least 12 bytes +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const DB_VERSION: u32 = 1; +const DB_VERSION : u32 = 3; +const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; + +const PADDING : [u8; 10] = [ 0u8; 10 ]; impl JournalDB { - /// Create a new instance given a `backing` database. - pub fn new(backing: DB) -> JournalDB { - let db = Arc::new(backing); - JournalDB::new_with_arc(db) + /// Create a new instance from file + pub fn new(path: &str) -> JournalDB { + Self::from_prefs(path, true) } - /// Create a new instance given a shared `backing` database. - pub fn new_with_arc(backing: Arc) -> JournalDB { - if backing.iterator(IteratorMode::Start).next().is_some() { + /// Create a new instance from file + pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { + let opts = DatabaseConfig { + prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix + }; + let backing = Database::open(&opts, path).unwrap_or_else(|e| { + panic!("Error opening state db: {}", e); + }); + let with_journal; + if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => {}, + Ok(Some(DB_VERSION)) => { with_journal = true; }, + Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); + backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); + with_journal = prefer_journal; } - let counters = JournalDB::read_counters(&backing); + + let counters = if with_journal { + Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) + } else { + None + }; JournalDB { overlay: MemoryDB::new(), - backing: backing, - counters: Arc::new(RwLock::new(counters)), + backing: Arc::new(backing), + counters: counters, } } @@ -87,7 +104,45 @@ impl JournalDB { /// Check if this database has any commits pub fn is_empty(&self) -> bool { - self.backing.get(&LAST_ERA_KEY).expect("Low level database error").is_none() + self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + } + + /// Commit all recent insert operations. + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + let have_counters = self.counters.is_some(); + if have_counters { + self.commit_with_counters(now, id, end) + } else { + self.commit_without_counters() + } + } + + /// Drain the overlay and place it into a batch for the DB. + fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { + let mut inserts = 0usize; + let mut deletes = 0usize; + for i in overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + deletes += 1; + } + } + trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); + inserts + deletes + } + + /// Just commit the overlay into the backing DB. + fn commit_without_counters(&mut self) -> Result { + let batch = DBTransaction::new(); + let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); + try!(self.backing.write(batch)); + Ok(ret as u32) } fn morph_key(key: &H256, index: u8) -> Bytes { @@ -97,13 +152,13 @@ impl JournalDB { } // The next three are valid only as long as there is an insert operation of `key` in the journal. - fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } - fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } - fn is_already_in(backing: &DB, key: &H256) -> bool { + fn set_already_in(batch: &DBTransaction, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } + fn reset_already_in(batch: &DBTransaction, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } + fn is_already_in(backing: &Database, key: &H256) -> bool { backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } - fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &DB, counters: &mut HashMap, batch: &WriteBatch) { + fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &Database, counters: &mut HashMap, batch: &DBTransaction) { for &(ref h, ref d) in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -126,7 +181,7 @@ impl JournalDB { } } - fn replay_keys(inserts: &Vec, backing: &DB, counters: &mut HashMap) { + fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -140,7 +195,7 @@ impl JournalDB { } } - fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &WriteBatch) { + fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { for h in deletes.into_iter() { let mut n: Option = None; if let Some(c) = counters.get_mut(&h) { @@ -168,7 +223,7 @@ impl JournalDB { /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -214,8 +269,8 @@ impl JournalDB { // // record new commit's details. - let batch = WriteBatch::new(); - let mut counters = self.counters.write().unwrap(); + let batch = DBTransaction::new(); + let mut counters = self.counters.as_ref().unwrap().write().unwrap(); { let mut index = 0usize; let mut last; @@ -224,6 +279,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { @@ -264,6 +320,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&end_era); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })) { @@ -275,7 +332,7 @@ impl JournalDB { try!(batch.delete(&last)); index += 1; } - try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + try!(batch.put(&LATEST_ERA_KEY, &encode(&end_era))); trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } @@ -288,9 +345,9 @@ impl JournalDB { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &DB) -> HashMap { + fn read_counters(db: &Database) -> HashMap { let mut counters = HashMap::new(); - if let Some(val) = db.get(&LAST_ERA_KEY).expect("Low-level database error.") { + if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { let mut era = decode::(&val) + 1; loop { let mut index = 0usize; @@ -298,6 +355,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&era); r.append(&index); + r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); @@ -314,12 +372,17 @@ impl JournalDB { trace!("Recovered {} counters", counters.len()); counters } -} + + /// Returns heap memory size used + pub fn mem_used(&self) -> usize { + self.overlay.mem_used() + match &self.counters { &Some(ref c) => c.read().unwrap().heap_size_of_children(), &None => 0 } + } + } impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iterator(IteratorMode::Start) { + for (key, _) in self.backing.iter() { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -508,4 +571,88 @@ mod tests { jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + + #[test] + fn reopen() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.emplace(bar.clone(), b"bar".to_vec()); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + } + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + + #[test] + fn reopen_remove() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.insert(b"foo"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + #[test] + fn reopen_fork() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let (foo, bar, baz) = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + (foo, bar, baz) + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&baz)); + assert!(!jdb.exists(&bar)); + } + } } diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs index 680a6e1d0..9cd018935 100644 --- a/util/src/memorydb.rs +++ b/util/src/memorydb.rs @@ -21,6 +21,7 @@ use bytes::*; use rlp::*; use sha3::*; use hashdb::*; +use heapsize::*; use std::mem; use std::collections::HashMap; @@ -143,6 +144,11 @@ impl MemoryDB { } self.raw(key).unwrap() } + + /// Returns the size of allocated heap memory + pub fn mem_used(&self) -> usize { + self.data.heap_size_of_children() + } } static NULL_RLP_STATIC: [u8; 1] = [0x80; 1]; From 4230fdfffea3c897402eaa095a46d959928e4692 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:43:21 +0100 Subject: [PATCH 4/8] More veriosning fixups. --- util/src/journaldb.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index f04affb7b..925c1c065 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -99,7 +99,7 @@ impl JournalDB { pub fn new_temp() -> JournalDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) + Self::new(dir.to_str().unwrap()) } /// Check if this database has any commits @@ -269,14 +269,15 @@ impl JournalDB { // // record new commit's details. - let batch = DBTransaction::new(); + trace!("commit: #{} ({}), end era: {:?}", now, id, end); let mut counters = self.counters.as_ref().unwrap().write().unwrap(); + let batch = DBTransaction::new(); { let mut index = 0usize; let mut last; while try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&now); r.append(&index); r.append(&&PADDING[..]); @@ -310,6 +311,7 @@ impl JournalDB { r.append(&removes); Self::insert_keys(&inserts, &self.backing, &mut counters, &batch); try!(batch.put(&last, r.as_raw())); + try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); } // apply old commits' details @@ -317,7 +319,7 @@ impl JournalDB { let mut index = 0usize; let mut last; while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&end_era); r.append(&index); r.append(&&PADDING[..]); @@ -352,7 +354,7 @@ impl JournalDB { loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&era); r.append(&index); r.append(&&PADDING[..]); From 0980c7130a5f299846876ec000b2fdbf8bc82c15 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 06:58:43 +0100 Subject: [PATCH 5/8] Fix replay_keys Counters should never have an entry with zero value. --- util/src/journaldb.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 925c1c065..a008fa47e 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -191,7 +191,9 @@ impl JournalDB { // this is the first entry for this node in the journal. // it is initialised to 1 if it was already in. - counters.insert(h.clone(), if Self::is_already_in(backing, h) {1} else {0}); + if Self::is_already_in(backing, h) { + counters.insert(h.clone(), 1); + } } } From fd87633db662126ea95b29c1027d2d88d7565639 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 07:57:50 +0100 Subject: [PATCH 6/8] Remove superfluous LATEST_KEY write. --- util/src/journaldb.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index a008fa47e..bb79a447c 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -336,7 +336,6 @@ impl JournalDB { try!(batch.delete(&last)); index += 1; } - try!(batch.put(&LATEST_ERA_KEY, &encode(&end_era))); trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } From 73207c23557e958761ea72b91ccdaee4ce5dd457 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 08:01:14 +0100 Subject: [PATCH 7/8] Revert accidental beta regressions. --- util/src/journaldb.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index bb79a447c..2e78f1cce 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -351,7 +351,7 @@ impl JournalDB { fn read_counters(db: &Database) -> HashMap { let mut counters = HashMap::new(); if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { - let mut era = decode::(&val) + 1; + let mut era = decode::(&val); loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ @@ -366,10 +366,10 @@ impl JournalDB { Self::replay_keys(&inserts, db, &mut counters); index += 1; }; - if index == 0 { + if index == 0 || era == 0 { break; } - era += 1; + era -= 1; } } trace!("Recovered {} counters", counters.len()); From 4d1effb008303ef0a4ce98134a7c8658031ed2f2 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 09:10:02 +0100 Subject: [PATCH 8/8] Fix tests. --- util/src/journaldb.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 2e78f1cce..57af857a9 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -182,6 +182,7 @@ impl JournalDB { } fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { + println!("replay_keys: inserts={:?}, counters={:?}", inserts, counters); for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -192,9 +193,11 @@ impl JournalDB { // this is the first entry for this node in the journal. // it is initialised to 1 if it was already in. if Self::is_already_in(backing, h) { + println!("replace_keys: Key {} was already in!", h); counters.insert(h.clone(), 1); } } + println!("replay_keys: (end) counters={:?}", counters); } fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { @@ -361,6 +364,7 @@ impl JournalDB { r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { + println!("read_counters: era={}, index={}", era, index); let rlp = Rlp::new(&rlp_data); let inserts: Vec = rlp.val_at(1); Self::replay_keys(&inserts, db, &mut counters); @@ -617,17 +621,23 @@ mod tests { // history is 1 let foo = jdb.insert(b"foo"); jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + + // foo is ancient history. + jdb.insert(b"foo"); - jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); foo }; { let mut jdb = JournalDB::new(dir.to_str().unwrap()); jdb.remove(&foo); - jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); - assert!(jdb.exists(&foo)); jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.remove(&foo); + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap(); assert!(!jdb.exists(&foo)); } }