diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs
index 858185873..2b5ec5ccb 100644
--- a/ethcore/src/client.rs
+++ b/ethcore/src/client.rs
@@ -190,6 +190,8 @@ pub struct ClientReport {
 	pub transactions_applied: usize,
 	/// How much gas has been processed so far.
 	pub gas_processed: U256,
+	/// Memory used by the state DB.
+	pub state_db_mem: usize,
 }
 
 impl ClientReport {
@@ -222,7 +224,7 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
 }
 
 const HISTORY: u64 = 1000;
-const CLIENT_DB_VER_STR: &'static str = "4.0";
+const CLIENT_DB_VER_STR: &'static str = "5.0";
 
 impl Client<CanonVerifier> {
 	/// Create a new client with given spec and DB path.
@@ -432,7 +434,9 @@ impl<V> Client<V> where V: Verifier {
 
 	/// Get the report.
 	pub fn report(&self) -> ClientReport {
-		self.report.read().unwrap().clone()
+		let mut report = self.report.read().unwrap().clone();
+		report.state_db_mem = self.state_db.lock().unwrap().mem_used();
+		report
 	}
 
 	/// Tick the client.
diff --git a/parity/main.rs b/parity/main.rs
index 3f4243a0a..605fb315d 100644
--- a/parity/main.rs
+++ b/parity/main.rs
@@ -395,7 +395,7 @@ impl Informant {
 		let sync_info = sync.status();
 
 		if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
-			println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} chain, {} queue, {} sync ]",
+			println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]",
 				chain_info.best_block_number,
 				chain_info.best_block_hash,
 				(report.blocks_imported - last_report.blocks_imported) / dur,
@@ -408,6 +408,7 @@ impl Informant {
 				queue_info.unverified_queue_size,
 				queue_info.verified_queue_size,
+				Informant::format_bytes(report.state_db_mem),
 				Informant::format_bytes(cache_info.total()),
 				Informant::format_bytes(queue_info.mem_used),
 				Informant::format_bytes(sync_info.mem_used),
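The `report()` change above is a small but common pattern: snapshot the counters under one lock, then patch in a figure that lives behind another lock, so callers get one consistent struct. A minimal standalone sketch of the pattern; the `Report`, `StateDb`, and `Client` types here are illustrative stand-ins, not the actual Parity ones:

```rust
use std::sync::{Mutex, RwLock};

#[derive(Clone, Default)]
struct Report {
    blocks_imported: usize,
    state_db_mem: usize, // computed on demand, never stored
}

struct StateDb {
    data: Vec<u8>,
}

impl StateDb {
    fn mem_used(&self) -> usize {
        self.data.capacity()
    }
}

struct Client {
    report: RwLock<Report>,
    state_db: Mutex<StateDb>,
}

impl Client {
    fn report(&self) -> Report {
        // Snapshot the running counters under the read lock...
        let mut report = self.report.read().unwrap().clone();
        // ...then fold in the live figure held behind the other lock.
        report.state_db_mem = self.state_db.lock().unwrap().mem_used();
        report
    }
}

fn main() {
    let client = Client {
        report: RwLock::new(Report::default()),
        state_db: Mutex::new(StateDb { data: Vec::with_capacity(1024) }),
    };
    println!("state_db_mem = {}", client.report().state_db_mem);
}
```

Computing `state_db_mem` at query time keeps the two locks independent; neither is held longer than a single read.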
diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs
index 01e53f819..48bd94d64 100644
--- a/util/src/journaldb.rs
+++ b/util/src/journaldb.rs
@@ -35,17 +35,36 @@ use std::env;
 /// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
 /// the removals actually take effect.
 pub struct JournalDB {
-	overlay: MemoryDB,
+	transaction_overlay: MemoryDB,
 	backing: Arc<Database>,
-	counters: Option<Arc<RwLock<HashMap<H256, i32>>>>,
+	journal_overlay: Option<Arc<RwLock<JournalOverlay>>>,
+}
+
+struct JournalOverlay {
+	backing_overlay: MemoryDB,
+	journal: VecDeque<JournalEntry>
+}
+
+struct JournalEntry {
+	id: H256,
+	index: usize,
+	era: u64,
+	insertions: Vec<H256>,
+	deletions: Vec<H256>,
+}
+
+impl HeapSizeOf for JournalEntry {
+	fn heap_size_of_children(&self) -> usize {
+		self.insertions.heap_size_of_children() + self.deletions.heap_size_of_children()
+	}
 }
 
 impl Clone for JournalDB {
 	fn clone(&self) -> JournalDB {
 		JournalDB {
-			overlay: MemoryDB::new(),
+			transaction_overlay: MemoryDB::new(),
 			backing: self.backing.clone(),
-			counters: self.counters.clone(),
+			journal_overlay: self.journal_overlay.clone(),
 		}
 	}
 }
@@ -60,7 +79,6 @@ const DB_VERSION_NO_JOURNAL : u32 = 3 + 256;
 const PADDING : [u8; 10] = [ 0u8; 10 ];
 
 impl JournalDB {
-
 	/// Create a new instance from file
 	pub fn new(path: &str) -> JournalDB {
 		Self::from_prefs(path, true)
@@ -86,15 +104,16 @@ impl JournalDB {
 			with_journal = prefer_journal;
 		}
 
-		let counters = if with_journal {
-			Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing))))
+		let journal_overlay = if with_journal {
+			Some(Arc::new(RwLock::new(JournalDB::read_overlay(&backing))))
 		} else {
 			None
 		};
+
 		JournalDB {
-			overlay: MemoryDB::new(),
+			transaction_overlay: MemoryDB::new(),
 			backing: Arc::new(backing),
-			counters: counters,
+			journal_overlay: journal_overlay,
 		}
 	}
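The diff above replaces per-key reference counters with two in-memory layers: `transaction_overlay` collects writes between commits, while the shared `journal_overlay` keeps every value still covered by the journal, plus a deque of journal entries ordered oldest-era-first. A simplified model of how the layers relate, using plain `std` collections in place of `MemoryDB` and `H256`; field names mirror the diff, but the logic is a sketch, not the real implementation:

```rust
use std::collections::{HashMap, VecDeque};

type Key = u64;       // stand-in for H256
type Value = Vec<u8>; // stand-in for node data

// Writes made since the last commit; drained on every commit.
#[derive(Default)]
struct TransactionOverlay {
    inserted: HashMap<Key, Value>,
    removed: Vec<Key>,
}

// One record per commit still covered by the journal.
struct JournalEntry {
    id: Key,
    era: u64,
    insertions: Vec<Key>,
    deletions: Vec<Key>,
}

// Everything the journal still needs, kept in memory so that
// recently committed data never has to be re-read from disk.
#[derive(Default)]
struct JournalOverlay {
    backing_overlay: HashMap<Key, Value>, // journalled values
    journal: VecDeque<JournalEntry>,      // oldest era at the front
}

impl JournalOverlay {
    // Move a commit's writes out of the transaction overlay and
    // into the journal; nothing touches the backing DB yet.
    fn record(&mut self, id: Key, era: u64, mut tx: TransactionOverlay) {
        let insertions: Vec<Key> = tx.inserted.keys().cloned().collect();
        for (k, v) in tx.inserted.drain() {
            self.backing_overlay.insert(k, v);
        }
        self.journal.push_back(JournalEntry {
            id: id,
            era: era,
            insertions: insertions,
            deletions: tx.removed,
        });
    }
}

fn main() {
    let mut overlay = JournalOverlay::default();
    let mut tx = TransactionOverlay::default();
    tx.inserted.insert(1, b"node".to_vec());
    overlay.record(10, 0, tx);
    assert_eq!(overlay.journal.len(), 1);
}
```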
@@ -113,71 +132,48 @@ impl JournalDB {
 	/// Commit all recent insert operations.
 	pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
-		let have_counters = self.counters.is_some();
-		if have_counters {
-			self.commit_with_counters(now, id, end)
+		let have_journal_overlay = self.journal_overlay.is_some();
+		if have_journal_overlay {
+			self.commit_with_overlay(now, id, end)
 		} else {
-			self.commit_without_counters()
+			self.commit_without_overlay()
 		}
 	}
 
 	/// Drain the overlay and place it into a batch for the DB.
 	fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize {
-		let mut inserts = 0usize;
-		let mut deletes = 0usize;
+		let mut insertions = 0usize;
+		let mut deletions = 0usize;
 		for i in overlay.drain().into_iter() {
 			let (key, (value, rc)) = i;
 			if rc > 0 {
 				assert!(rc == 1);
 				batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
-				inserts += 1;
+				insertions += 1;
 			}
 			if rc < 0 {
 				assert!(rc == -1);
-				deletes += 1;
+				deletions += 1;
 			}
 		}
-		trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes);
-		inserts + deletes
+		trace!("commit: Inserted {}, Deleted {} nodes", insertions, deletions);
+		insertions + deletions
 	}
 
-	/// Just commit the overlay into the backing DB.
-	fn commit_without_counters(&mut self) -> Result<u32, UtilError> {
+	/// Just commit the transaction overlay into the backing DB.
+	fn commit_without_overlay(&mut self) -> Result<u32, UtilError> {
 		let batch = DBTransaction::new();
-		let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch);
+		let ret = Self::batch_overlay_insertions(&mut self.transaction_overlay, &batch);
 		try!(self.backing.write(batch));
 		Ok(ret as u32)
 	}
 
 	/// Commit all recent insert operations and historical removals from the old era
 	/// to the backing database.
-	fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
-		// journal format:
-		// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
-		// [era, n] => [ ... ]
-
-		// TODO: store reclaim_period.
-
-		// when we make a new commit, we journal the inserts and removes.
-		// for each end_era that we journaled that we are no passing by,
-		// we remove all of its removes assuming it is canonical and all
-		// of its inserts otherwise.
-		//
-		// We also keep reference counters for each key inserted in the journal to handle
-		// the following cases where key K must not be deleted from the DB when processing removals :
-		// Given H is the journal size in eras, 0 <= C <= H.
-		// Key K is removed in era A(N) and re-inserted in canonical era B(N + C).
-		// Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C).
-		// Key K is added in non-canonical era A'(N) canonical B(N + C).
-		//
-		// The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions
-		// is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased
-		// and the key is safe to delete.
-
+	fn commit_with_overlay(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 		// record new commit's details.
 		trace!("commit: #{} ({}), end era: {:?}", now, id, end);
-		let mut counters = self.counters.as_ref().unwrap().write().unwrap();
+		let mut journal_overlay = self.journal_overlay.as_mut().unwrap().write().unwrap();
 		let batch = DBTransaction::new();
 		{
 			let mut index = 0usize;
@@ -204,90 +200,83 @@ impl JournalDB {
 			}
 
 			let mut r = RlpStream::new_list(3);
-			let inserts: Vec<H256> = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect();
-			// Increase counter for each inserted key no matter if the block is canonical or not.
-			for i in &inserts {
-				*counters.entry(i.clone()).or_insert(0) += 1;
-			}
-			let removes: Vec<H256> = self.overlay.keys().iter().filter(|&(_, &c)| c < 0).map(|(key, _)| key.clone()).collect();
+			let mut tx = self.transaction_overlay.drain();
+			let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
+			let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
+			let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
 			r.append(id);
-			r.append(&inserts);
-			r.append(&removes);
+			r.begin_list(inserted_keys.len());
+			for (k, v) in insertions {
+				r.begin_list(2);
+				r.append(&k);
+				r.append(&v);
+				journal_overlay.backing_overlay.emplace(k, v);
+			}
+			r.append(&removed_keys);
 			try!(batch.put(&last, r.as_raw()));
 			try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
+			journal_overlay.journal.push_back(JournalEntry { id: id.clone(), index: index, era: now, insertions: inserted_keys, deletions: removed_keys });
 		}
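Journal records are addressed by `(era, index, PADDING)` and, in the new format, carry the inserted values inline — roughly `[ id, [ [key_0, value_0], ... ], [ removed_0, ... ] ]` — so `read_overlay` can rebuild the backing overlay at startup. A hedged sketch of the key scheme that `commit` and `read_overlay` build inline with `RlpStream`; plain byte concatenation is used here purely for illustration, whereas the real key is the RLP encoding of the triple:

```rust
const PADDING: [u8; 10] = [0u8; 10];

// Build a lookup key for journal record `index` of `era`.
// The real code RLP-encodes [era, index, PADDING]; simple
// concatenation is used here only to show the shape.
fn journal_key(era: u64, index: usize) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 8 + PADDING.len());
    key.extend_from_slice(&era.to_be_bytes());
    key.extend_from_slice(&(index as u64).to_be_bytes());
    key.extend_from_slice(&PADDING);
    key
}

fn main() {
    // Records for one era are scanned as key(era, 0), key(era, 1), ...
    // until a lookup misses, both when journalling and on startup.
    assert_ne!(journal_key(7, 0), journal_key(7, 1));
    println!("{:?}", journal_key(7, 0));
}
```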
 
 		// apply old commits' details
 		if let Some((end_era, canon_id)) = end {
-			let mut index = 0usize;
-			let mut last;
-			let mut to_remove: Vec<H256> = Vec::new();
-			let mut canon_inserts: Vec<H256> = Vec::new();
-			while let Some(rlp_data) = try!(self.backing.get({
+			let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
+			let mut canon_deletions: Vec<H256> = Vec::new();
+			let mut overlay_deletions: Vec<H256> = Vec::new();
+			while journal_overlay.journal.front().map_or(false, |e| e.era <= end_era) {
+				let mut journal = journal_overlay.journal.pop_front().unwrap();
+				//delete the record from the db
 				let mut r = RlpStream::new_list(3);
-				r.append(&end_era);
-				r.append(&index);
+				r.append(&journal.era);
+				r.append(&journal.index);
 				r.append(&&PADDING[..]);
-				last = r.drain();
-				&last
-			})) {
-				let rlp = Rlp::new(&rlp_data);
-				let mut inserts: Vec<H256> = rlp.val_at(1);
-				JournalDB::decrease_counters(&inserts, &mut counters);
-				// Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical
-				if canon_id == rlp.val_at(0) {
-					let mut canon_deletes: Vec<H256> = rlp.val_at(2);
-					trace!("Purging nodes deleted from canon: {:?}", canon_deletes);
-					to_remove.append(&mut canon_deletes);
-					canon_inserts = inserts;
+				try!(batch.delete(&r.drain()));
+				trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, journal.index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
+				{
+					if canon_id == journal.id {
+						for h in &journal.insertions {
+							match journal_overlay.backing_overlay.raw(&h) {
+								Some(&(ref d, rc)) if rc > 0 => canon_insertions.push((h.clone(), d.clone())), //TODO: optimize this to avoid data copy
+								_ => ()
+							}
+						}
+						canon_deletions = journal.deletions;
+					}
+					overlay_deletions.append(&mut journal.insertions);
 				}
-				else {
-					trace!("Purging nodes inserted in non-canon: {:?}", inserts);
-					to_remove.append(&mut inserts);
-				}
-				trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): {} entries", end_era, index, rlp.val_at::<H256>(0), canon_id, to_remove.len());
-				try!(batch.delete(&last));
-				index += 1;
 			}
-
-			let canon_inserts = canon_inserts.drain(..).collect::<HashSet<_>>();
-			// Purge removed keys if they are not referenced and not re-inserted in the canon commit
-			let mut deletes = 0;
-			trace!("Purging filtered nodes: {:?}", to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)).collect::<Vec<_>>());
-			for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) {
-				try!(batch.delete(&h));
-				deletes += 1;
+			// apply canon inserts first
+			for (k, v) in canon_insertions {
+				try!(batch.put(&k, &v));
 			}
-			trace!("Total nodes purged: {}", deletes);
+			// clean the overlay
+			for k in overlay_deletions {
+				journal_overlay.backing_overlay.kill(&k);
+			}
+			// apply removes
+			for k in canon_deletions {
+				if !journal_overlay.backing_overlay.exists(&k) {
+					try!(batch.delete(&k));
+				}
+			}
+			journal_overlay.backing_overlay.purge();
 		}
-
-		// Commit overlay insertions
-		let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch);
 		try!(self.backing.write(batch));
-		Ok(ret as u32)
-	}
-
-	// Decrease counters for given keys. Deletes obsolete counters
-	fn decrease_counters(keys: &[H256], counters: &mut HashMap<H256, i32>) {
-		for i in keys.iter() {
-			let delete_counter = {
-				let cnt = counters.get_mut(i).expect("Missing key counter");
-				*cnt -= 1;
-				*cnt == 0
-			};
-			if delete_counter {
-				counters.remove(i);
-			}
-		}
+		Ok(0 as u32)
 	}
 
 	fn payload(&self, key: &H256) -> Option<Bytes> {
 		self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
 	}
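When `end` names an era leaving the journal, the rules are: write the canonical block's insertions to disk, drop every block-of-that-era's insertions from the overlay, and delete a canonically-removed key from disk only if no younger journal entry has re-inserted it (i.e. it no longer lives in the overlay). A condensed sketch of that decision logic, re-declaring the simplified types from the earlier sketch so it stands alone; illustrative only, since the real code works on `MemoryDB` reference counts and a `DBTransaction` batch:

```rust
use std::collections::{HashMap, VecDeque};

type Key = u64;
type Value = Vec<u8>;

struct JournalEntry { id: Key, era: u64, insertions: Vec<Key>, deletions: Vec<Key> }

fn finalize_era(
    journal: &mut VecDeque<JournalEntry>,
    overlay: &mut HashMap<Key, Value>,
    db: &mut HashMap<Key, Value>, // stand-in for the backing DB batch
    end_era: u64,
    canon_id: Key,
) {
    let mut canon_insertions: Vec<(Key, Value)> = Vec::new();
    let mut canon_deletions: Vec<Key> = Vec::new();
    let mut overlay_deletions: Vec<Key> = Vec::new();

    while journal.front().map_or(false, |e| e.era <= end_era) {
        let entry = journal.pop_front().unwrap();
        if entry.id == canon_id {
            // Only the canonical block's data reaches disk.
            for k in &entry.insertions {
                if let Some(v) = overlay.get(k) {
                    canon_insertions.push((*k, v.clone()));
                }
            }
            canon_deletions = entry.deletions;
        }
        // Canonical or not, the era's insertions leave the overlay.
        overlay_deletions.extend(entry.insertions);
    }
    for (k, v) in canon_insertions {
        db.insert(k, v);
    }
    for k in overlay_deletions {
        overlay.remove(&k);
    }
    for k in canon_deletions {
        // Skip keys that a younger commit has re-inserted.
        if !overlay.contains_key(&k) {
            db.remove(&k);
        }
    }
}

fn main() {
    let mut journal = VecDeque::new();
    let mut overlay = HashMap::new();
    let mut db = HashMap::new();
    overlay.insert(1, b"a".to_vec());
    journal.push_back(JournalEntry { id: 7, era: 0, insertions: vec![1], deletions: vec![] });
    finalize_era(&mut journal, &mut overlay, &mut db, 0, 7);
    assert!(db.contains_key(&1) && overlay.is_empty());
}
```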
 
-	fn read_counters(db: &Database) -> HashMap<H256, i32> {
-		let mut res = HashMap::new();
+	fn read_overlay(db: &Database) -> JournalOverlay {
+		let mut journal = VecDeque::new();
+		let mut overlay = MemoryDB::new();
+		let mut count = 0;
 		if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
 			let mut era = decode::<u64>(&val);
 			loop {
@@ -300,10 +289,24 @@ impl JournalDB {
 					&r.drain()
 				}).expect("Low-level database error.") {
 					let rlp = Rlp::new(&rlp_data);
-					let to_add: Vec<H256> = rlp.val_at(1);
-					for h in to_add {
-						*res.entry(h).or_insert(0) += 1;
+					let id: H256 = rlp.val_at(0);
+					let insertions = rlp.at(1);
+					let deletions: Vec<H256> = rlp.val_at(2);
+					let mut inserted_keys = Vec::new();
+					for r in insertions.iter() {
+						let k: H256 = r.val_at(0);
+						let v: Bytes = r.val_at(1);
+						overlay.emplace(k.clone(), v);
+						inserted_keys.push(k);
+						count += 1;
 					}
+					journal.push_front(JournalEntry {
+						id: id,
+						index: index,
+						era: era,
+						insertions: inserted_keys,
+						deletions: deletions,
+					});
 					index += 1;
 				};
 				if index == 0 || era == 0 {
@@ -312,8 +315,19 @@ impl JournalDB {
 				era -= 1;
 			}
 		}
-		trace!("Recovered {} counters", res.len());
-		res
+		trace!("Recovered {} overlay entries, {} journal entries", count, journal.len());
+		JournalOverlay { backing_overlay: overlay, journal: journal }
+	}
+
+	/// Returns the amount of heap memory used.
+	pub fn mem_used(&self) -> usize {
+		let mut mem = self.transaction_overlay.mem_used();
+		if let Some(ref overlay) = self.journal_overlay.as_ref() {
+			let overlay = overlay.read().unwrap();
+			mem += overlay.backing_overlay.mem_used();
+			mem += overlay.journal.heap_size_of_children();
+		}
+		mem
 	}
 }
 
@@ -325,7 +339,7 @@ impl HashDB for JournalDB {
 			ret.insert(h, 1);
 		}
 
-		for (key, refs) in self.overlay.keys().into_iter() {
+		for (key, refs) in self.transaction_overlay.keys().into_iter() {
 			let refs = *ret.get(&key).unwrap_or(&0) + refs;
 			ret.insert(key, refs);
 		}
@@ -333,15 +347,23 @@ impl HashDB for JournalDB {
 	}
 
 	fn lookup(&self, key: &H256) -> Option<&[u8]> {
-		let k = self.overlay.raw(key);
+		let k = self.transaction_overlay.raw(key);
 		match k {
 			Some(&(ref d, rc)) if rc > 0 => Some(d),
 			_ => {
-				if let Some(x) = self.payload(key) {
-					Some(&self.overlay.denote(key, x).0)
-				}
-				else {
-					None
+				let v = self.journal_overlay.as_ref().map_or(None, |ref j| j.read().unwrap().backing_overlay.lookup(key).map(|v| v.to_vec()));
+				match v {
+					Some(x) => {
+						Some(&self.transaction_overlay.denote(key, x).0)
+					}
+					_ => {
+						if let Some(x) = self.payload(key) {
+							Some(&self.transaction_overlay.denote(key, x).0)
+						}
+						else {
+							None
+						}
+					}
 				}
 			}
 		}
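`lookup` now resolves through three tiers — the per-commit transaction overlay, the shared journal overlay, and finally the backing database — with lower-tier hits `denote`d into the transaction overlay so a `&[u8]` borrow can be handed out under the `HashDB` contract. The tiering itself is easy to see over plain maps; this sketch returns owned values and skips the `denote` lifetime dance:

```rust
use std::collections::HashMap;

type Key = u64;
type Value = Vec<u8>;

struct TieredDb {
    transaction_overlay: HashMap<Key, Value>, // uncommitted writes
    journal_overlay: HashMap<Key, Value>,     // journalled, not yet on disk
    backing: HashMap<Key, Value>,             // stand-in for the backing DB
}

impl TieredDb {
    // Check the newest tier first; fall through to older ones.
    fn lookup(&self, key: &Key) -> Option<Value> {
        self.transaction_overlay
            .get(key)
            .or_else(|| self.journal_overlay.get(key))
            .or_else(|| self.backing.get(key))
            .cloned()
    }
}

fn main() {
    let mut db = TieredDb {
        transaction_overlay: HashMap::new(),
        journal_overlay: HashMap::new(),
        backing: HashMap::new(),
    };
    db.journal_overlay.insert(42, b"journalled".to_vec());
    assert_eq!(db.lookup(&42), Some(b"journalled".to_vec()));
}
```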
@@ -352,13 +374,13 @@ impl HashDB for JournalDB {
 	}
 
 	fn insert(&mut self, value: &[u8]) -> H256 {
-		self.overlay.insert(value)
+		self.transaction_overlay.insert(value)
 	}
 	fn emplace(&mut self, key: H256, value: Bytes) {
-		self.overlay.emplace(key, value);
+		self.transaction_overlay.emplace(key, value);
 	}
 	fn kill(&mut self, key: &H256) {
-		self.overlay.kill(key);
+		self.transaction_overlay.kill(key);
 	}
 }
 
@@ -492,11 +514,13 @@ mod tests {
 	fn reopen() {
 		let mut dir = ::std::env::temp_dir();
 		dir.push(H32::random().hex());
+		let bar = H256::random();
 
 		let foo = {
 			let mut jdb = JournalDB::new(dir.to_str().unwrap());
 			// history is 1
 			let foo = jdb.insert(b"foo");
+			jdb.emplace(bar.clone(), b"bar".to_vec());
 			jdb.commit(0, &b"0".sha3(), None).unwrap();
 			foo
 		};
@@ -510,8 +534,62 @@ mod tests {
 		{
 			let mut jdb = JournalDB::new(dir.to_str().unwrap());
 			assert!(jdb.exists(&foo));
+			assert!(jdb.exists(&bar));
 			jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
 			assert!(!jdb.exists(&foo));
 		}
 	}
+
+	#[test]
+	fn reopen_remove() {
+		let mut dir = ::std::env::temp_dir();
+		dir.push(H32::random().hex());
+		let bar = H256::random();
+
+		let foo = {
+			let mut jdb = JournalDB::new(dir.to_str().unwrap());
+			// history is 1
+			let foo = jdb.insert(b"foo");
+			jdb.commit(0, &b"0".sha3(), None).unwrap();
+			jdb.insert(b"foo");
+			jdb.commit(1, &b"1".sha3(), None).unwrap();
+			foo
+		};
+
+		{
+			let mut jdb = JournalDB::new(dir.to_str().unwrap());
+			jdb.remove(&foo);
+			jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
+			assert!(jdb.exists(&foo));
+			jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
+			assert!(!jdb.exists(&foo));
+		}
+	}
+
+	#[test]
+	fn reopen_fork() {
+		let mut dir = ::std::env::temp_dir();
+		dir.push(H32::random().hex());
+		let (foo, bar, baz) = {
+			let mut jdb = JournalDB::new(dir.to_str().unwrap());
+			// history is 1
+			let foo = jdb.insert(b"foo");
+			let bar = jdb.insert(b"bar");
+			jdb.commit(0, &b"0".sha3(), None).unwrap();
+			jdb.remove(&foo);
+			let baz = jdb.insert(b"baz");
+			jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
+
+			jdb.remove(&bar);
+			jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
+			(foo, bar, baz)
+		};
+
+		{
+			let mut jdb = JournalDB::new(dir.to_str().unwrap());
+			jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
+			assert!(jdb.exists(&foo));
+			assert!(!jdb.exists(&baz));
+			assert!(!jdb.exists(&bar));
+		}
+	}
 }
diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs
index 680a6e1d0..9cd018935 100644
--- a/util/src/memorydb.rs
+++ b/util/src/memorydb.rs
@@ -21,6 +21,7 @@ use bytes::*;
 use rlp::*;
 use sha3::*;
 use hashdb::*;
+use heapsize::*;
 use std::mem;
 use std::collections::HashMap;
 
@@ -143,6 +144,11 @@ impl MemoryDB {
 		}
 		self.raw(key).unwrap()
 	}
+
+	/// Returns the size of allocated heap memory
+	pub fn mem_used(&self) -> usize {
+		self.data.heap_size_of_children()
+	}
 }
 
 static NULL_RLP_STATIC: [u8; 1] = [0x80; 1];
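All of the new `mem_used` figures bottom out in the `heapsize` crate's `HeapSizeOf` trait: `MemoryDB::mem_used` measures its key-value map, and `JournalDB::mem_used` adds the journal via the manual `HeapSizeOf for JournalEntry` impl above. A small self-contained illustration of how such measurements compose, using a local stand-in trait so it runs without the crate; real numbers come from the allocator and will differ:

```rust
use std::collections::VecDeque;

// Minimal stand-in for heapsize::HeapSizeOf; the real crate
// derives sizes from the allocator rather than from capacity.
trait HeapSizeOf {
    fn heap_size_of_children(&self) -> usize;
}

impl HeapSizeOf for Vec<u64> {
    fn heap_size_of_children(&self) -> usize {
        self.capacity() * std::mem::size_of::<u64>()
    }
}

struct JournalEntry {
    insertions: Vec<u64>,
    deletions: Vec<u64>,
}

// Mirrors the impl added in the diff: an entry's heap usage is
// just that of its two key vectors.
impl HeapSizeOf for JournalEntry {
    fn heap_size_of_children(&self) -> usize {
        self.insertions.heap_size_of_children() + self.deletions.heap_size_of_children()
    }
}

fn journal_mem(journal: &VecDeque<JournalEntry>) -> usize {
    journal.iter().map(|e| e.heap_size_of_children()).sum()
}

fn main() {
    let mut journal = VecDeque::new();
    journal.push_back(JournalEntry { insertions: vec![1, 2, 3], deletions: vec![4] });
    println!("journal heap use: {} bytes", journal_mem(&journal));
}
```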