JournalDB inject (#1806)

* add inject to journaldb

* adjust docs

* add test; fix refcounteddb impl

* fewer panics, fail on invalid insertions or deletions
This commit is contained in:
Robert Habermeier 2016-08-03 16:34:32 +02:00 committed by Gav Wood
parent c5ffb5af79
commit 8c88e2a8cc
6 changed files with 188 additions and 18 deletions

View File

@ -28,14 +28,17 @@ use hash::H256;
pub enum BaseDataError { pub enum BaseDataError {
/// An entry was removed more times than inserted. /// An entry was removed more times than inserted.
NegativelyReferencedHash(H256), NegativelyReferencedHash(H256),
/// A committed value was inserted more than once.
AlreadyExists(H256),
} }
impl fmt::Display for BaseDataError { impl fmt::Display for BaseDataError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
BaseDataError::NegativelyReferencedHash(hash) => BaseDataError::NegativelyReferencedHash(hash) =>
f.write_fmt(format_args!("Entry {} removed from database more times \ write!(f, "Entry {} removed from database more times than it was added.", hash),
than it was added.", hash)), BaseDataError::AlreadyExists(hash) =>
write!(f, "Committed key already exists in database: {}", hash),
} }
} }
} }

View File

@ -185,6 +185,38 @@ impl JournalDB for ArchiveDB {
Ok((inserts + deletes) as u32) Ok((inserts + deletes) as u32)
} }
/// Inject the pending overlay directly into the backing database, bypassing journalling.
///
/// Fails with `BaseDataError::AlreadyExists` if an inserted key is already present in
/// the backing DB, and with `BaseDataError::NegativelyReferencedHash` if a deleted key
/// is absent from it. Returns the number of insert/delete operations written to `batch`.
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
	let mut ops = 0u32;
	for (key, (value, rc)) in self.overlay.drain() {
		if rc != 0 { ops += 1 }
		match rc {
			0 => {}
			1 => {
				// A canonical insertion must be brand new to the backing DB.
				if try!(self.backing.get(self.column, &key)).is_some() {
					return Err(BaseDataError::AlreadyExists(key).into());
				}
				try!(batch.put(self.column, &key, &value))
			}
			-1 => {
				// A canonical deletion must refer to a key already in the backing DB.
				if try!(self.backing.get(self.column, &key)).is_none() {
					return Err(BaseDataError::NegativelyReferencedHash(key).into());
				}
				try!(batch.delete(self.column, &key))
			}
			// |rc| > 1 means the overlay holds journalled (non-canonical) state,
			// which inject cannot resolve.
			_ => panic!("Attempted to inject invalid state."),
		}
	}
	// Auxiliary data is written through unconditionally, suffixed with AUX_FLAG.
	for (mut key, value) in self.overlay.drain_aux() {
		key.push(AUX_FLAG);
		try!(batch.put(self.column, &key, &value));
	}
	Ok(ops)
}
fn latest_era(&self) -> Option<u64> { self.latest_era } fn latest_era(&self) -> Option<u64> { self.latest_era }
fn state(&self, id: &H256) -> Option<Bytes> { fn state(&self, id: &H256) -> Option<Bytes> {
@ -449,4 +481,19 @@ mod tests {
assert!(state.is_some()); assert!(state.is_some());
} }
} }
#[test]
fn inject() {
	// Round-trip one value through inject_batch: insert, verify present,
	// remove, verify gone.
	let dir = ::devtools::RandomTempPath::new();
	let mut jdb = new_db(dir.as_path().as_path());

	let hash = jdb.insert(b"dog");
	jdb.inject_batch().unwrap();
	assert_eq!(jdb.get(&hash).unwrap(), b"dog");

	jdb.remove(&hash);
	jdb.inject_batch().unwrap();
	assert!(jdb.get(&hash).is_none());
}
} }

View File

@ -513,6 +513,32 @@ impl JournalDB for EarlyMergeDB {
Ok(0) Ok(0)
} }
/// Write the pending overlay straight to the backing database, skipping the journal.
/// Insertions must not collide with committed keys; deletions must hit committed keys.
/// Returns the number of operations written to `batch`.
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
	let mut count = 0;
	for (key, (value, rc)) in self.overlay.drain() {
		if rc == 1 {
			// An injected insertion must be new to the backing DB.
			if try!(self.backing.get(self.column, &key)).is_some() {
				return Err(BaseDataError::AlreadyExists(key).into());
			}
			try!(batch.put(self.column, &key, &value));
			count += 1;
		} else if rc == -1 {
			// An injected deletion must refer to committed data.
			if try!(self.backing.get(self.column, &key)).is_none() {
				return Err(BaseDataError::NegativelyReferencedHash(key).into());
			}
			try!(batch.delete(self.column, &key));
			count += 1;
		} else if rc != 0 {
			// |rc| > 1 would need journalling to resolve.
			panic!("Attempted to inject invalid state.");
		}
	}
	Ok(count)
}
} }
#[cfg(test)] #[cfg(test)]
@ -1045,4 +1071,19 @@ mod tests {
assert!(!jdb.contains(&bar)); assert!(!jdb.contains(&bar));
} }
} }
#[test]
fn inject() {
	// inject_batch should make an insertion visible and a later removal effective.
	let path = ::devtools::RandomTempPath::new();
	let mut jdb = new_db(path.as_path().as_path());
	let k = jdb.insert(b"dog");
	jdb.inject_batch().unwrap();
	assert_eq!(jdb.get(&k).unwrap(), b"dog");
	jdb.remove(&k);
	jdb.inject_batch().unwrap();
	assert!(jdb.get(&k).is_none());
}
} }

View File

@ -46,7 +46,7 @@ use super::JournalDB;
/// ///
/// Commit workflow: /// Commit workflow:
/// 1. Create a new journal record from the transaction overlay. /// 1. Create a new journal record from the transaction overlay.
/// 2. Inseart each node from the transaction overlay into the History overlay increasing reference /// 2. Insert each node from the transaction overlay into the History overlay increasing reference
/// count if it is already there. Note that the reference counting is managed by `MemoryDB` /// count if it is already there. Note that the reference counting is managed by `MemoryDB`
/// 3. Clear the transaction overlay. /// 3. Clear the transaction overlay.
/// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB /// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB
@ -155,7 +155,7 @@ impl OverlayRecentDB {
for r in insertions.iter() { for r in insertions.iter() {
let k: H256 = r.val_at(0); let k: H256 = r.val_at(0);
let v: Bytes = r.val_at(1); let v: Bytes = r.val_at(1);
overlay.emplace(OverlayRecentDB::to_short_key(&k), v); overlay.emplace(to_short_key(&k), v);
inserted_keys.push(k); inserted_keys.push(k);
count += 1; count += 1;
} }
@ -176,12 +176,13 @@ impl OverlayRecentDB {
JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era } JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era }
} }
#[inline] }
fn to_short_key(key: &H256) -> H256 {
let mut k = H256::new(); #[inline]
k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]); fn to_short_key(key: &H256) -> H256 {
k let mut k = H256::new();
} k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]);
k
} }
impl JournalDB for OverlayRecentDB { impl JournalDB for OverlayRecentDB {
@ -208,7 +209,7 @@ impl JournalDB for OverlayRecentDB {
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era } fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
fn state(&self, key: &H256) -> Option<Bytes> { fn state(&self, key: &H256) -> Option<Bytes> {
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
} }
@ -229,7 +230,7 @@ impl JournalDB for OverlayRecentDB {
r.begin_list(2); r.begin_list(2);
r.append(&k); r.append(&k);
r.append(&v); r.append(&v);
journal_overlay.backing_overlay.emplace(OverlayRecentDB::to_short_key(&k), v); journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
} }
r.append(&removed_keys); r.append(&removed_keys);
@ -246,7 +247,7 @@ impl JournalDB for OverlayRecentDB {
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
} }
let journal_overlay = journal_overlay.deref_mut(); let journal_overlay = &mut *journal_overlay;
// apply old commits' details // apply old commits' details
if let Some((end_era, canon_id)) = end { if let Some((end_era, canon_id)) = end {
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) { if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
@ -265,7 +266,7 @@ impl JournalDB for OverlayRecentDB {
{ {
if canon_id == journal.id { if canon_id == journal.id {
for h in &journal.insertions { for h in &journal.insertions {
if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&OverlayRecentDB::to_short_key(h)) { if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 { if rc > 0 {
canon_insertions.push((h.clone(), d.clone())); //TODO: optimize this to avoid data copy canon_insertions.push((h.clone(), d.clone())); //TODO: optimize this to avoid data copy
} }
@ -283,11 +284,11 @@ impl JournalDB for OverlayRecentDB {
} }
// update the overlay // update the overlay
for k in overlay_deletions { for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&OverlayRecentDB::to_short_key(&k)); journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
} }
// apply canon deletions // apply canon deletions
for k in canon_deletions { for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&OverlayRecentDB::to_short_key(&k)) { if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
try!(batch.delete(self.column, &k)); try!(batch.delete(self.column, &k));
} }
} }
@ -297,6 +298,31 @@ impl JournalDB for OverlayRecentDB {
Ok(0) Ok(0)
} }
/// Drain the transaction overlay directly into the backing batch. No journal entry
/// is produced, so every change must be canonical: inserted keys must be absent from
/// the backing DB, deleted keys must be present. Returns the operation count.
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
	let mut written = 0u32;
	for (key, (value, rc)) in self.transaction_overlay.drain() {
		match rc {
			// Net-zero entries produce no database operation.
			0 => continue,
			1 => {
				if try!(self.backing.get(self.column, &key)).is_some() {
					return Err(BaseDataError::AlreadyExists(key).into());
				}
				try!(batch.put(self.column, &key, &value));
			}
			-1 => {
				if try!(self.backing.get(self.column, &key)).is_none() {
					return Err(BaseDataError::NegativelyReferencedHash(key).into());
				}
				try!(batch.delete(self.column, &key));
			}
			// Higher reference counts would require journalling to resolve.
			_ => panic!("Attempted to inject invalid state."),
		}
		written += 1;
	}
	Ok(written)
}
} }
impl HashDB for OverlayRecentDB { impl HashDB for OverlayRecentDB {
@ -319,7 +345,7 @@ impl HashDB for OverlayRecentDB {
match k { match k {
Some(&(ref d, rc)) if rc > 0 => Some(d), Some(&(ref d, rc)) if rc > 0 => Some(d),
_ => { _ => {
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec());
match v { match v {
Some(x) => { Some(x) => {
Some(&self.transaction_overlay.denote(key, x).0) Some(&self.transaction_overlay.denote(key, x).0)
@ -879,4 +905,19 @@ mod tests {
assert!(jdb.contains(&foo)); assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar)); assert!(jdb.contains(&bar));
} }
#[test]
fn inject() {
	// Insert via inject_batch, confirm readable; remove via inject_batch, confirm gone.
	let tmp = ::devtools::RandomTempPath::new();
	let mut jdb = new_db(tmp.as_path().as_path());

	let h = jdb.insert(b"dog");
	jdb.inject_batch().unwrap();
	assert_eq!(jdb.get(&h).unwrap(), b"dog");

	jdb.remove(&h);
	jdb.inject_batch().unwrap();
	assert!(jdb.get(&h).is_none());
}
} }

View File

@ -184,6 +184,14 @@ impl JournalDB for RefCountedDB {
let r = try!(self.forward.commit_to_batch(&batch)); let r = try!(self.forward.commit_to_batch(&batch));
Ok(r) Ok(r)
} }
/// Inject pending changes without journalling: drop the queued journal bookkeeping
/// and flush the forward overlay straight into `batch`.
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> {
// Pending insertion keys are only tracked for journal records; no record is written here.
self.inserts.clear();
// Apply the queued removals to the forward overlay so commit_to_batch picks them up.
for remove in self.removes.drain(..) {
self.forward.remove(&remove);
}
// NOTE(review): unlike the other JournalDB impls, this does not return
// AlreadyExists/NegativelyReferencedHash on invalid injections — presumably
// the forward overlay's commit_to_batch performs its own checking; confirm.
self.forward.commit_to_batch(&batch)
}
} }
#[cfg(test)] #[cfg(test)]
@ -298,4 +306,17 @@ mod tests {
assert!(!jdb.contains(&baz)); assert!(!jdb.contains(&baz));
assert!(!jdb.contains(&bar)); assert!(!jdb.contains(&bar));
} }
#[test]
fn inject() {
let mut jdb = RefCountedDB::new_temp();
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();
assert_eq!(jdb.get(&key).unwrap(), b"dog");
jdb.remove(&key);
jdb.inject_batch().unwrap();
assert!(jdb.get(&key).is_none());
}
} }

View File

@ -39,6 +39,15 @@ pub trait JournalDB : HashDB + Send + Sync {
/// old era to the backing database, reverting any non-canonical historical commit's inserts. /// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>; fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
/// Commit all queued insert and delete operations without affecting any journalling.
///
/// This requires that all insertions and deletions are indeed canonical and will
/// likely lead to an invalid database if that assumption is violated.
///
/// Any keys or values inserted or deleted must be completely independent of those affected
/// by any previous `commit` operations. Essentially, this means that `inject` can be used
/// either to restore a state to a fresh database, or to insert data which may only be journalled
/// from this point onwards.
///
/// Returns a count of the operations applied (implementation-defined; typically the
/// number of puts and deletes written to `batch`).
fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError>;
/// State data query /// State data query
fn state(&self, _id: &H256) -> Option<Bytes>; fn state(&self, _id: &H256) -> Option<Bytes>;
@ -48,11 +57,19 @@ pub trait JournalDB : HashDB + Send + Sync {
/// Get backing database. /// Get backing database.
fn backing(&self) -> &Arc<Database>; fn backing(&self) -> &Arc<Database>;
#[cfg(test)]
/// Commit all changes in a single batch /// Commit all changes in a single batch
#[cfg(test)]
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = self.backing().transaction(); let batch = self.backing().transaction();
let res = try!(self.commit(&batch, now, id, end)); let res = try!(self.commit(&batch, now, id, end));
self.backing().write(batch).map(|_| res).map_err(Into::into) self.backing().write(batch).map(|_| res).map_err(Into::into)
} }
/// Test helper: run `inject` inside a freshly created batch and write it to the backing DB.
#[cfg(test)]
fn inject_batch(&mut self) -> Result<u32, UtilError> {
	let batch = self.backing().transaction();
	let ops = try!(self.inject(&batch));
	match self.backing().write(batch) {
		Ok(_) => Ok(ops),
		Err(e) => Err(e.into()),
	}
}
} }