earlydb optimizations

debris 2017-08-27 18:17:55 +02:00
parent 15c3233376
commit 4ac95b44e2


@@ -18,6 +18,7 @@
 use std::fmt;
 use std::collections::HashMap;
+use std::collections::hash_map::Entry;
 use std::sync::Arc;
 use parking_lot::RwLock;
 use heapsize::HeapSizeOf;
@@ -159,49 +160,59 @@ impl EarlyMergeDB {
 	fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
 		for &(ref h, ref d) in inserts {
-			if let Some(c) = refs.get_mut(h) {
-				// already counting. increment.
-				c.queue_refs += 1;
-				if trace {
-					trace!(target: "jdb.fine", " insert({}): In queue: Incrementing refs to {}", h, c.queue_refs);
-				}
-				continue;
-			}
-
-			// this is the first entry for this node in the journal.
-			if backing.get(col, h).expect("Low-level database error. Some issue with your hard disk?").is_some() {
-				// already in the backing DB. start counting, and remember it was already in.
-				Self::set_already_in(batch, col, h);
-				refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: true});
-				if trace {
-					trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h);
-				}
-				continue;
-			}
-
-			// Gets removed when a key leaves the journal, so should never be set when we're placing a new key.
-			//Self::reset_already_in(&h);
-			assert!(!Self::is_already_in(backing, col, &h));
-			batch.put(col, h, d);
-			refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: false});
-			if trace {
-				trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h);
+			match refs.entry(*h) {
+				Entry::Occupied(mut entry) => {
+					let info = entry.get_mut();
+					// already counting. increment.
+					info.queue_refs += 1;
+					if trace {
+						trace!(target: "jdb.fine", " insert({}): In queue: Incrementing refs to {}", h, info.queue_refs);
+					}
+				},
+				Entry::Vacant(entry) => {
+					// this is the first entry for this node in the journal.
+					let in_archive = backing.get(col, h).expect("Low-level database error. Some issue with your hard disk?").is_some();
+					if in_archive {
+						// already in the backing DB. start counting, and remember it was already in.
+						Self::set_already_in(batch, col, h);
+						if trace {
+							trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h);
+						}
+					} else {
+						// Gets removed when a key leaves the journal, so should never be set when we're placing a new key.
+						//Self::reset_already_in(&h);
+						assert!(!Self::is_already_in(backing, col, h));
+						if trace {
+							trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h);
+						}
+						batch.put(col, h, d);
+					}
+					entry.insert(RefInfo {
+						queue_refs: 1,
+						in_archive: in_archive,
+					});
+				},
 			}
 		}
 	}
 
 	fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
 		trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
 		for h in inserts {
-			if let Some(c) = refs.get_mut(h) {
-				// already counting. increment.
-				c.queue_refs += 1;
-				continue;
-			}
-
-			// this is the first entry for this node in the journal.
-			// it is initialised to 1 if it was already in.
-			refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: Self::is_already_in(backing, col, h)});
+			match refs.entry(*h) {
+				// already counting. increment.
+				Entry::Occupied(mut entry) => {
+					entry.get_mut().queue_refs += 1;
+				},
+				// this is the first entry for this node in the journal.
+				// it is initialised to 1 if it was already in.
+				Entry::Vacant(entry) => {
+					entry.insert(RefInfo {
+						queue_refs: 1,
+						in_archive: Self::is_already_in(backing, col, h),
+					});
+				},
+			}
 		}
 		trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs);
 	}
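
Both functions above get the same treatment: the old code probed the map with refs.get_mut(h) and then, on a miss, did a second lookup via refs.insert(...), while the new code goes through HashMap::entry, which locates the key once and then either mutates the occupied slot or fills the vacant one (the explicit h.clone() on insert disappears as well). A minimal standalone sketch of that pattern, using a hypothetical bump helper over a plain counter map rather than the repository's RefInfo type:

use std::collections::HashMap;
use std::collections::hash_map::Entry;

// Hypothetical helper: increment a per-key counter, initialising it on first sight.
// A single `entry` call replaces the old `get_mut` + `insert` pair, so the key is
// hashed and located only once.
fn bump(refs: &mut HashMap<u64, u32>, key: u64) {
	match refs.entry(key) {
		Entry::Occupied(mut entry) => {
			// Key already tracked: mutate in place.
			*entry.get_mut() += 1;
		},
		Entry::Vacant(entry) => {
			// First occurrence: insert the initial count through the vacant entry.
			entry.insert(1);
		},
	}
}

fn main() {
	let mut refs = HashMap::new();
	bump(&mut refs, 42);
	bump(&mut refs, 42);
	assert_eq!(refs[&42], 2);
}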
@@ -213,50 +224,54 @@ impl EarlyMergeDB {
 		// (the latter option would then mean removing the RefInfo, since it would no longer be counted in the queue.)
 		// both are valid, but we switch between them depending on context.
 		// All inserts in queue (i.e. those which may yet be reverted) have an entry in refs.
-		for h in deletes.iter() {
-			let mut n: Option<RefInfo> = None;
-			if let Some(c) = refs.get_mut(h) {
-				if c.in_archive && from == RemoveFrom::Archive {
-					c.in_archive = false;
-					Self::reset_already_in(batch, col, h);
-					if trace {
-						trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Reducing to queue only and recording", h);
-					}
-					continue;
-				} else if c.queue_refs > 1 {
-					c.queue_refs -= 1;
-					if trace {
-						trace!(target: "jdb.fine", " remove({}): In queue > 1 refs: Decrementing ref count to {}", h, c.queue_refs);
-					}
-					continue;
-				} else {
-					n = Some(c.clone());
-				}
-			}
-			match n {
-				Some(RefInfo{queue_refs: 1, in_archive: true}) => {
-					refs.remove(h);
-					Self::reset_already_in(batch, col, h);
-					if trace {
-						trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Removing from queue and leaving in archive", h);
-					}
-				}
-				Some(RefInfo{queue_refs: 1, in_archive: false}) => {
-					refs.remove(h);
-					batch.delete(col, h);
-					if trace {
-						trace!(target: "jdb.fine", " remove({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
-					}
-				}
-				None => {
-					// Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs.
-					//assert!(!Self::is_already_in(db, &h));
-					batch.delete(col, h);
-					if trace {
-						trace!(target: "jdb.fine", " remove({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h);
-					}
-				}
-				_ => panic!("Invalid value in refs: {:?}", n),
+		for h in deletes {
+			match refs.entry(*h) {
+				Entry::Occupied(mut entry) => {
+					if entry.get().in_archive && from == RemoveFrom::Archive {
+						entry.get_mut().in_archive = false;
+						Self::reset_already_in(batch, col, h);
+						if trace {
+							trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Reducing to queue only and recording", h);
+						}
+						continue;
+					}
+					if entry.get().queue_refs > 1 {
+						entry.get_mut().queue_refs -= 1;
+						if trace {
+							trace!(target: "jdb.fine", " remove({}): In queue > 1 refs: Decrementing ref count to {}", h, entry.get().queue_refs);
+						}
+						continue;
+					}
+
+					let queue_refs = entry.get().queue_refs;
+					let in_archive = entry.get().in_archive;
+
+					match (queue_refs, in_archive) {
+						(1, true) => {
+							entry.remove();
+							Self::reset_already_in(batch, col, h);
+							if trace {
+								trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Removing from queue and leaving in archive", h);
+							}
+						},
+						(1, false) => {
+							entry.remove();
+							batch.delete(col, h);
+							if trace {
+								trace!(target: "jdb.fine", " remove({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
+							}
+						},
+						_ => panic!("Invalid value in refs: {:?}", entry.get()),
+					}
+				},
+				Entry::Vacant(_entry) => {
+					// Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs.
+					//assert!(!Self::is_already_in(db, &h));
+					batch.delete(col, h);
+					if trace {
+						trace!(target: "jdb.fine", " remove({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h);
+					}
+				},
 			}
 		}
 	}
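
The removal path benefits twice: the old code cloned the RefInfo out of the map (n = Some(c.clone())) and then issued a separate refs.remove(h), while an Entry::Occupied handle can be inspected, decremented, or removed directly, with no clone and no second lookup. A minimal sketch of that decrement-or-remove shape, again with a hypothetical release helper over a plain counter map instead of RefInfo:

use std::collections::HashMap;
use std::collections::hash_map::Entry;

// Hypothetical helper mirroring the removal shape above: decrement a counter and
// drop the key entirely once no references remain. The occupied entry is mutated
// or removed in place, without cloning the value or re-hashing the key.
fn release(refs: &mut HashMap<u64, u32>, key: u64) {
	match refs.entry(key) {
		Entry::Occupied(mut entry) => {
			if *entry.get() > 1 {
				*entry.get_mut() -= 1;
			} else {
				// Last reference: remove through the same entry handle.
				entry.remove();
			}
		},
		Entry::Vacant(_) => {
			// Key was never counted; nothing to do in this sketch.
		},
	}
}

fn main() {
	let mut refs = HashMap::new();
	refs.insert(7u64, 2u32);
	release(&mut refs, 7);
	assert_eq!(refs[&7], 1);
	release(&mut refs, 7);
	assert!(!refs.contains_key(&7));
}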