Merge pull request #6393 from paritytech/earlymergedb_optimizations

earlydb optimizations
This commit is contained in:
Marek Kotewicz 2017-08-28 18:46:41 +02:00 committed by GitHub
commit 17fa7edbbf

View File

@ -160,33 +160,38 @@ impl EarlyMergeDB {
fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) { fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
for &(ref h, ref d) in inserts { for &(ref h, ref d) in inserts {
if let Some(c) = refs.get_mut(h) { match refs.entry(*h) {
// already counting. increment. Entry::Occupied(mut entry) => {
c.queue_refs += 1; let info = entry.get_mut();
if trace { // already counting. increment.
trace!(target: "jdb.fine", " insert({}): In queue: Incrementing refs to {}", h, c.queue_refs); info.queue_refs += 1;
} if trace {
continue; trace!(target: "jdb.fine", " insert({}): In queue: Incrementing refs to {}", h, info.queue_refs);
} }
},
// this is the first entry for this node in the journal. Entry::Vacant(entry) => {
if backing.get(col, h).expect("Low-level database error. Some issue with your hard disk?").is_some() { // this is the first entry for this node in the journal.
// already in the backing DB. start counting, and remember it was already in. let in_archive = backing.get(col, h).expect("Low-level database error. Some issue with your hard disk?").is_some();
Self::set_already_in(batch, col, h); if in_archive {
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: true}); // already in the backing DB. start counting, and remember it was already in.
if trace { Self::set_already_in(batch, col, h);
trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h); if trace {
} trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h);
continue; }
} } else {
// Gets removed when a key leaves the journal, so should never be set when we're placing a new key.
// Gets removed when a key leaves the journal, so should never be set when we're placing a new key. //Self::reset_already_in(&h);
//Self::reset_already_in(&h); assert!(!Self::is_already_in(backing, col, h));
assert!(!Self::is_already_in(backing, col, &h)); if trace {
batch.put(col, h, d); trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h);
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: false}); }
if trace { batch.put(col, h, d);
trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h); }
entry.insert(RefInfo {
queue_refs: 1,
in_archive: in_archive,
});
},
} }
} }
} }
@ -194,15 +199,20 @@ impl EarlyMergeDB {
fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) { fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs); trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
for h in inserts { for h in inserts {
if let Some(c) = refs.get_mut(h) { match refs.entry(*h) {
// already counting. increment. // already counting. increment.
c.queue_refs += 1; Entry::Occupied(mut entry) => {
continue; entry.get_mut().queue_refs += 1;
},
// this is the first entry for this node in the journal.
// it is initialised to 1 if it was already in.
Entry::Vacant(entry) => {
entry.insert(RefInfo {
queue_refs: 1,
in_archive: Self::is_already_in(backing, col, h),
});
},
} }
// this is the first entry for this node in the journal.
// it is initialised to 1 if it was already in.
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: Self::is_already_in(backing, col, h)});
} }
trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs); trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs);
} }
@ -214,50 +224,54 @@ impl EarlyMergeDB {
// (the latter option would then mean removing the RefInfo, since it would no longer be counted in the queue.) // (the latter option would then mean removing the RefInfo, since it would no longer be counted in the queue.)
// both are valid, but we switch between them depending on context. // both are valid, but we switch between them depending on context.
// All inserts in queue (i.e. those which may yet be reverted) have an entry in refs. // All inserts in queue (i.e. those which may yet be reverted) have an entry in refs.
for h in deletes.iter() { for h in deletes {
let mut n: Option<RefInfo> = None; match refs.entry(*h) {
if let Some(c) = refs.get_mut(h) { Entry::Occupied(mut entry) => {
if c.in_archive && from == RemoveFrom::Archive { if entry.get().in_archive && from == RemoveFrom::Archive {
c.in_archive = false; entry.get_mut().in_archive = false;
Self::reset_already_in(batch, col, h); Self::reset_already_in(batch, col, h);
if trace { if trace {
trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Reducing to queue only and recording", h); trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Reducing to queue only and recording", h);
}
continue;
} }
continue; if entry.get().queue_refs > 1 {
} else if c.queue_refs > 1 { entry.get_mut().queue_refs -= 1;
c.queue_refs -= 1; if trace {
if trace { trace!(target: "jdb.fine", " remove({}): In queue > 1 refs: Decrementing ref count to {}", h, entry.get().queue_refs);
trace!(target: "jdb.fine", " remove({}): In queue > 1 refs: Decrementing ref count to {}", h, c.queue_refs); }
continue;
} }
continue;
} else { let queue_refs = entry.get().queue_refs;
n = Some(c.clone()); let in_archive = entry.get().in_archive;
}
} match (queue_refs, in_archive) {
match n { (1, true) => {
Some(RefInfo{queue_refs: 1, in_archive: true}) => { entry.remove();
refs.remove(h); Self::reset_already_in(batch, col, h);
Self::reset_already_in(batch, col, h); if trace {
if trace { trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Removing from queue and leaving in archive", h);
trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Removing from queue and leaving in archive", h); }
},
(1, false) => {
entry.remove();
batch.delete(col, h);
if trace {
trace!(target: "jdb.fine", " remove({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
}
},
_ => panic!("Invalid value in refs: {:?}", entry.get()),
} }
} },
Some(RefInfo{queue_refs: 1, in_archive: false}) => { Entry::Vacant(_entry) => {
refs.remove(h);
batch.delete(col, h);
if trace {
trace!(target: "jdb.fine", " remove({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
}
}
None => {
// Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs. // Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs.
//assert!(!Self::is_already_in(db, &h)); //assert!(!Self::is_already_in(db, &h));
batch.delete(col, h); batch.delete(col, h);
if trace { if trace {
trace!(target: "jdb.fine", " remove({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h); trace!(target: "jdb.fine", " remove({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h);
} }
} },
_ => panic!("Invalid value in refs: {:?}", n),
} }
} }
} }