diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 858185873..9688cc527 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -190,6 +190,8 @@ pub struct ClientReport { pub transactions_applied: usize, /// How much gas has been processed so far. pub gas_processed: U256, + /// Memory used by state DB + pub state_db_mem: usize, } impl ClientReport { @@ -222,7 +224,7 @@ pub struct Client where V: Verifier { } const HISTORY: u64 = 1000; -const CLIENT_DB_VER_STR: &'static str = "4.0"; +const CLIENT_DB_VER_STR: &'static str = "5.1"; impl Client { /// Create a new client with given spec and DB path. @@ -432,7 +434,9 @@ impl Client where V: Verifier { /// Get the report. pub fn report(&self) -> ClientReport { - self.report.read().unwrap().clone() + let mut report = self.report.read().unwrap().clone(); + report.state_db_mem = self.state_db.lock().unwrap().mem_used(); + report } /// Tick the client. diff --git a/parity/main.rs b/parity/main.rs index 3f4243a0a..605fb315d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -395,7 +395,7 @@ impl Informant { let sync_info = sync.status(); if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { - println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} chain, {} queue, {} sync ]", + println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]", chain_info.best_block_number, chain_info.best_block_hash, (report.blocks_imported - last_report.blocks_imported) / dur, @@ -408,6 +408,7 @@ impl Informant { queue_info.unverified_queue_size, queue_info.verified_queue_size, + Informant::format_bytes(report.state_db_mem), Informant::format_bytes(cache_info.total()), Informant::format_bytes(queue_info.mem_used), Informant::format_bytes(sync_info.mem_used), diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index b7e495503..f04affb7b 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] use std::env; @@ -33,8 +33,8 @@ use std::env; /// the removals actually take effect. pub struct JournalDB { overlay: MemoryDB, - backing: Arc, - counters: Arc>>, + backing: Arc, + counters: Option>>>, } impl Clone for JournalDB { @@ -47,33 +47,50 @@ impl Clone for JournalDB { } } -const LAST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; -const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; +// all keys must be at least 12 bytes +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const DB_VERSION: u32 = 1; +const DB_VERSION : u32 = 3; +const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; + +const PADDING : [u8; 10] = [ 0u8; 10 ]; impl JournalDB { - /// Create a new instance given a `backing` database. - pub fn new(backing: DB) -> JournalDB { - let db = Arc::new(backing); - JournalDB::new_with_arc(db) + /// Create a new instance from file + pub fn new(path: &str) -> JournalDB { + Self::from_prefs(path, true) } - /// Create a new instance given a shared `backing` database. - pub fn new_with_arc(backing: Arc) -> JournalDB { - if backing.iterator(IteratorMode::Start).next().is_some() { + /// Create a new instance from file + pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { + let opts = DatabaseConfig { + prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix + }; + let backing = Database::open(&opts, path).unwrap_or_else(|e| { + panic!("Error opening state db: {}", e); + }); + let with_journal; + if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => {}, + Ok(Some(DB_VERSION)) => { with_journal = true; }, + Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); + backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); + with_journal = prefer_journal; } - let counters = JournalDB::read_counters(&backing); + + let counters = if with_journal { + Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) + } else { + None + }; JournalDB { overlay: MemoryDB::new(), - backing: backing, - counters: Arc::new(RwLock::new(counters)), + backing: Arc::new(backing), + counters: counters, } } @@ -87,7 +104,45 @@ impl JournalDB { /// Check if this database has any commits pub fn is_empty(&self) -> bool { - self.backing.get(&LAST_ERA_KEY).expect("Low level database error").is_none() + self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + } + + /// Commit all recent insert operations. + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + let have_counters = self.counters.is_some(); + if have_counters { + self.commit_with_counters(now, id, end) + } else { + self.commit_without_counters() + } + } + + /// Drain the overlay and place it into a batch for the DB. + fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { + let mut inserts = 0usize; + let mut deletes = 0usize; + for i in overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + deletes += 1; + } + } + trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); + inserts + deletes + } + + /// Just commit the overlay into the backing DB. + fn commit_without_counters(&mut self) -> Result { + let batch = DBTransaction::new(); + let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); + try!(self.backing.write(batch)); + Ok(ret as u32) } fn morph_key(key: &H256, index: u8) -> Bytes { @@ -97,13 +152,13 @@ impl JournalDB { } // The next three are valid only as long as there is an insert operation of `key` in the journal. - fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } - fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } - fn is_already_in(backing: &DB, key: &H256) -> bool { + fn set_already_in(batch: &DBTransaction, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } + fn reset_already_in(batch: &DBTransaction, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } + fn is_already_in(backing: &Database, key: &H256) -> bool { backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } - fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &DB, counters: &mut HashMap, batch: &WriteBatch) { + fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &Database, counters: &mut HashMap, batch: &DBTransaction) { for &(ref h, ref d) in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -126,7 +181,7 @@ impl JournalDB { } } - fn replay_keys(inserts: &Vec, backing: &DB, counters: &mut HashMap) { + fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -140,7 +195,7 @@ impl JournalDB { } } - fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &WriteBatch) { + fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { for h in deletes.into_iter() { let mut n: Option = None; if let Some(c) = counters.get_mut(&h) { @@ -168,7 +223,7 @@ impl JournalDB { /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -214,8 +269,8 @@ impl JournalDB { // // record new commit's details. - let batch = WriteBatch::new(); - let mut counters = self.counters.write().unwrap(); + let batch = DBTransaction::new(); + let mut counters = self.counters.as_ref().unwrap().write().unwrap(); { let mut index = 0usize; let mut last; @@ -224,6 +279,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { @@ -264,6 +320,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&end_era); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })) { @@ -275,7 +332,7 @@ impl JournalDB { try!(batch.delete(&last)); index += 1; } - try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + try!(batch.put(&LATEST_ERA_KEY, &encode(&end_era))); trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } @@ -288,9 +345,9 @@ impl JournalDB { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &DB) -> HashMap { + fn read_counters(db: &Database) -> HashMap { let mut counters = HashMap::new(); - if let Some(val) = db.get(&LAST_ERA_KEY).expect("Low-level database error.") { + if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { let mut era = decode::(&val) + 1; loop { let mut index = 0usize; @@ -298,6 +355,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&era); r.append(&index); + r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); @@ -314,12 +372,17 @@ impl JournalDB { trace!("Recovered {} counters", counters.len()); counters } -} + + /// Returns heap memory size used + pub fn mem_used(&self) -> usize { + self.overlay.mem_used() + match &self.counters { &Some(ref c) => c.read().unwrap().heap_size_of_children(), &None => 0 } + } + } impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iterator(IteratorMode::Start) { + for (key, _) in self.backing.iter() { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -508,4 +571,88 @@ mod tests { jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + + #[test] + fn reopen() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.emplace(bar.clone(), b"bar".to_vec()); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + } + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + + #[test] + fn reopen_remove() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.insert(b"foo"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + #[test] + fn reopen_fork() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let (foo, bar, baz) = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + (foo, bar, baz) + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&baz)); + assert!(!jdb.exists(&bar)); + } + } } diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs index 680a6e1d0..9cd018935 100644 --- a/util/src/memorydb.rs +++ b/util/src/memorydb.rs @@ -21,6 +21,7 @@ use bytes::*; use rlp::*; use sha3::*; use hashdb::*; +use heapsize::*; use std::mem; use std::collections::HashMap; @@ -143,6 +144,11 @@ impl MemoryDB { } self.raw(key).unwrap() } + + /// Returns the size of allocated heap memory + pub fn mem_used(&self) -> usize { + self.data.heap_size_of_children() + } } static NULL_RLP_STATIC: [u8; 1] = [0x80; 1];