Save nodes removed from backing_overlay until commit (#1917)
This commit is contained in:
parent
ca54b8e493
commit
0e7b06d3eb
@ -66,7 +66,8 @@ pub struct OverlayRecentDB {
|
|||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
struct JournalOverlay {
|
struct JournalOverlay {
|
||||||
backing_overlay: MemoryDB,
|
backing_overlay: MemoryDB, // Nodes added in the history period
|
||||||
|
pending_overlay: H256FastMap<Bytes>, // Nodes being transfered from backing_overlay to backing db
|
||||||
journal: HashMap<u64, Vec<JournalEntry>>,
|
journal: HashMap<u64, Vec<JournalEntry>>,
|
||||||
latest_era: Option<u64>,
|
latest_era: Option<u64>,
|
||||||
}
|
}
|
||||||
@ -173,7 +174,11 @@ impl OverlayRecentDB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace!("Recovered {} overlay entries, {} journal entries", count, journal.len());
|
trace!("Recovered {} overlay entries, {} journal entries", count, journal.len());
|
||||||
JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era }
|
JournalOverlay {
|
||||||
|
backing_overlay: overlay,
|
||||||
|
pending_overlay: HashMap::default(),
|
||||||
|
journal: journal,
|
||||||
|
latest_era: latest_era }
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -194,6 +199,7 @@ impl JournalDB for OverlayRecentDB {
|
|||||||
let mut mem = self.transaction_overlay.mem_used();
|
let mut mem = self.transaction_overlay.mem_used();
|
||||||
let overlay = self.journal_overlay.read();
|
let overlay = self.journal_overlay.read();
|
||||||
mem += overlay.backing_overlay.mem_used();
|
mem += overlay.backing_overlay.mem_used();
|
||||||
|
mem += overlay.pending_overlay.heap_size_of_children();
|
||||||
mem += overlay.journal.heap_size_of_children();
|
mem += overlay.journal.heap_size_of_children();
|
||||||
mem
|
mem
|
||||||
}
|
}
|
||||||
@ -209,14 +215,19 @@ impl JournalDB for OverlayRecentDB {
|
|||||||
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
|
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
|
||||||
|
|
||||||
fn state(&self, key: &H256) -> Option<Bytes> {
|
fn state(&self, key: &H256) -> Option<Bytes> {
|
||||||
let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
|
let journal_overlay = self.journal_overlay.read();
|
||||||
v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
|
let key = to_short_key(key);
|
||||||
|
journal_overlay.backing_overlay.get(&key).map(|v| v.to_vec())
|
||||||
|
.or_else(|| journal_overlay.pending_overlay.get(&key).map(|v| v.clone()))
|
||||||
|
.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||||
// record new commit's details.
|
// record new commit's details.
|
||||||
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
|
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
|
||||||
let mut journal_overlay = self.journal_overlay.write();
|
let mut journal_overlay = self.journal_overlay.write();
|
||||||
|
// flush previous changes
|
||||||
|
journal_overlay.pending_overlay.clear();
|
||||||
{
|
{
|
||||||
let mut r = RlpStream::new_list(3);
|
let mut r = RlpStream::new_list(3);
|
||||||
let mut tx = self.transaction_overlay.drain();
|
let mut tx = self.transaction_overlay.drain();
|
||||||
@ -280,7 +291,8 @@ impl JournalDB for OverlayRecentDB {
|
|||||||
}
|
}
|
||||||
// apply canon inserts first
|
// apply canon inserts first
|
||||||
for (k, v) in canon_insertions {
|
for (k, v) in canon_insertions {
|
||||||
try!(batch.put_vec(self.column, &k, v));
|
try!(batch.put(self.column, &k, &v));
|
||||||
|
journal_overlay.pending_overlay.insert(to_short_key(&k), v);
|
||||||
}
|
}
|
||||||
// update the overlay
|
// update the overlay
|
||||||
for k in overlay_deletions {
|
for k in overlay_deletions {
|
||||||
@ -298,6 +310,10 @@ impl JournalDB for OverlayRecentDB {
|
|||||||
Ok(0)
|
Ok(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn flush(&self) {
|
||||||
|
self.journal_overlay.write().pending_overlay.clear();
|
||||||
|
}
|
||||||
|
|
||||||
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
|
||||||
let mut ops = 0;
|
let mut ops = 0;
|
||||||
for (key, (value, rc)) in self.transaction_overlay.drain() {
|
for (key, (value, rc)) in self.transaction_overlay.drain() {
|
||||||
@ -345,7 +361,12 @@ impl HashDB for OverlayRecentDB {
|
|||||||
match k {
|
match k {
|
||||||
Some((d, rc)) if rc > 0 => Some(d),
|
Some((d, rc)) if rc > 0 => Some(d),
|
||||||
_ => {
|
_ => {
|
||||||
let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
|
let v = {
|
||||||
|
let journal_overlay = self.journal_overlay.read();
|
||||||
|
let key = to_short_key(key);
|
||||||
|
journal_overlay.backing_overlay.get(&key).map(|v| v.to_vec())
|
||||||
|
.or_else(|| journal_overlay.pending_overlay.get(&key).map(|v| v.clone()))
|
||||||
|
};
|
||||||
match v {
|
match v {
|
||||||
Some(x) => {
|
Some(x) => {
|
||||||
Some(self.transaction_overlay.denote(key, x).0)
|
Some(self.transaction_overlay.denote(key, x).0)
|
||||||
|
@ -57,12 +57,18 @@ pub trait JournalDB: HashDB {
|
|||||||
/// Get backing database.
|
/// Get backing database.
|
||||||
fn backing(&self) -> &Arc<Database>;
|
fn backing(&self) -> &Arc<Database>;
|
||||||
|
|
||||||
|
/// Clear internal strucutres. This should called after changes have been written
|
||||||
|
/// to the backing strage
|
||||||
|
fn flush(&self) {}
|
||||||
|
|
||||||
/// Commit all changes in a single batch
|
/// Commit all changes in a single batch
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||||
let batch = self.backing().transaction();
|
let batch = self.backing().transaction();
|
||||||
let res = try!(self.commit(&batch, now, id, end));
|
let res = try!(self.commit(&batch, now, id, end));
|
||||||
self.backing().write(batch).map(|_| res).map_err(Into::into)
|
let result = self.backing().write(batch).map(|_| res).map_err(Into::into);
|
||||||
|
self.flush();
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inject all changes in a single batch.
|
/// Inject all changes in a single batch.
|
||||||
|
Loading…
Reference in New Issue
Block a user