diff --git a/util/src/error.rs b/util/src/error.rs index bc57cfc55..1460c152c 100644 --- a/util/src/error.rs +++ b/util/src/error.rs @@ -28,14 +28,17 @@ use hash::H256; pub enum BaseDataError { /// An entry was removed more times than inserted. NegativelyReferencedHash(H256), + /// A committed value was inserted more than once. + AlreadyExists(H256), } impl fmt::Display for BaseDataError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { BaseDataError::NegativelyReferencedHash(hash) => - f.write_fmt(format_args!("Entry {} removed from database more times \ - than it was added.", hash)), + write!(f, "Entry {} removed from database more times than it was added.", hash), + BaseDataError::AlreadyExists(hash) => + write!(f, "Committed key already exists in database: {}", hash), } } } diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs index b8c5d1664..b6792bfc7 100644 --- a/util/src/journaldb/archivedb.rs +++ b/util/src/journaldb/archivedb.rs @@ -185,6 +185,38 @@ impl JournalDB for ArchiveDB { Ok((inserts + deletes) as u32) } + fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> { + let mut inserts = 0usize; + let mut deletes = 0usize; + + for i in self.overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + if try!(self.backing.get(self.column, &key)).is_some() { + return Err(BaseDataError::AlreadyExists(key).into()); + } + try!(batch.put(self.column, &key, &value)); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + if try!(self.backing.get(self.column, &key)).is_none() { + return Err(BaseDataError::NegativelyReferencedHash(key).into()); + } + try!(batch.delete(self.column, &key)); + deletes += 1; + } + } + + for (mut key, value) in self.overlay.drain_aux().into_iter() { + key.push(AUX_FLAG); + try!(batch.put(self.column, &key, &value)); + } + + Ok((inserts + deletes) as u32) + } + fn latest_era(&self) -> Option<u64> { self.latest_era } fn state(&self, id: &H256) -> Option<Bytes> { @@
-449,4 +481,19 @@ mod tests { assert!(state.is_some()); } } + + #[test] + fn inject() { + let temp = ::devtools::RandomTempPath::new(); + + let mut jdb = new_db(temp.as_path().as_path()); + let key = jdb.insert(b"dog"); + jdb.inject_batch().unwrap(); + + assert_eq!(jdb.get(&key).unwrap(), b"dog"); + jdb.remove(&key); + jdb.inject_batch().unwrap(); + + assert!(jdb.get(&key).is_none()); + } } diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs index 908c5cadf..8a58a276b 100644 --- a/util/src/journaldb/earlymergedb.rs +++ b/util/src/journaldb/earlymergedb.rs @@ -513,6 +513,32 @@ impl JournalDB for EarlyMergeDB { Ok(0) } + + fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> { + let mut ops = 0; + for (key, (value, rc)) in self.overlay.drain() { + if rc != 0 { ops += 1 } + + match rc { + 0 => {} + 1 => { + if try!(self.backing.get(self.column, &key)).is_some() { + return Err(BaseDataError::AlreadyExists(key).into()); + } + try!(batch.put(self.column, &key, &value)) + } + -1 => { + if try!(self.backing.get(self.column, &key)).is_none() { + return Err(BaseDataError::NegativelyReferencedHash(key).into()); + } + try!(batch.delete(self.column, &key)) + } + _ => panic!("Attempted to inject invalid state."), + } + } + + Ok(ops) + } } #[cfg(test)] @@ -1045,4 +1071,19 @@ mod tests { assert!(!jdb.contains(&bar)); } } + + #[test] + fn inject() { + let temp = ::devtools::RandomTempPath::new(); + + let mut jdb = new_db(temp.as_path().as_path()); + let key = jdb.insert(b"dog"); + jdb.inject_batch().unwrap(); + + assert_eq!(jdb.get(&key).unwrap(), b"dog"); + jdb.remove(&key); + jdb.inject_batch().unwrap(); + + assert!(jdb.get(&key).is_none()); + } } diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 186fd4040..597bf2b02 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -46,7 +46,7 @@ use super::JournalDB; /// /// Commit workflow: /// 1. 
Create a new journal record from the transaction overlay. -/// 2. Inseart each node from the transaction overlay into the History overlay increasing reference +/// 2. Insert each node from the transaction overlay into the History overlay increasing reference /// count if it is already there. Note that the reference counting is managed by `MemoryDB` /// 3. Clear the transaction overlay. /// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB @@ -155,7 +155,7 @@ impl OverlayRecentDB { for r in insertions.iter() { let k: H256 = r.val_at(0); let v: Bytes = r.val_at(1); - overlay.emplace(OverlayRecentDB::to_short_key(&k), v); + overlay.emplace(to_short_key(&k), v); inserted_keys.push(k); count += 1; } @@ -176,12 +176,13 @@ impl OverlayRecentDB { JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era } } - #[inline] - fn to_short_key(key: &H256) -> H256 { - let mut k = H256::new(); - k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]); - k - } +} + +#[inline] +fn to_short_key(key: &H256) -> H256 { + let mut k = H256::new(); + k[0..DB_PREFIX_LEN].copy_from_slice(&key[0..DB_PREFIX_LEN]); + k } impl JournalDB for OverlayRecentDB { @@ -208,7 +209,7 @@ fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era } fn state(&self, key: &H256) -> Option<Bytes> { - let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); + let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec()); v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) } @@ -229,7 +230,7 @@ r.begin_list(2); r.append(&k); r.append(&v); - journal_overlay.backing_overlay.emplace(OverlayRecentDB::to_short_key(&k), v); + journal_overlay.backing_overlay.emplace(to_short_key(&k), v); } r.append(&removed_keys); @@ -246,7 +247,7 @@ 
impl JournalDB for OverlayRecentDB { journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); } - let journal_overlay = journal_overlay.deref_mut(); + let journal_overlay = &mut *journal_overlay; // apply old commits' details if let Some((end_era, canon_id)) = end { if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) { @@ -265,7 +266,7 @@ impl JournalDB for OverlayRecentDB { { if canon_id == journal.id { for h in &journal.insertions { - if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&OverlayRecentDB::to_short_key(h)) { + if let Some(&(ref d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) { if rc > 0 { canon_insertions.push((h.clone(), d.clone())); //TODO: optimize this to avoid data copy } @@ -283,11 +284,11 @@ impl JournalDB for OverlayRecentDB { } // update the overlay for k in overlay_deletions { - journal_overlay.backing_overlay.remove_and_purge(&OverlayRecentDB::to_short_key(&k)); + journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k)); } // apply canon deletions for k in canon_deletions { - if !journal_overlay.backing_overlay.contains(&OverlayRecentDB::to_short_key(&k)) { + if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) { try!(batch.delete(self.column, &k)); } } @@ -297,6 +298,31 @@ impl JournalDB for OverlayRecentDB { Ok(0) } + fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> { + let mut ops = 0; + for (key, (value, rc)) in self.transaction_overlay.drain() { + if rc != 0 { ops += 1 } + + match rc { + 0 => {} + 1 => { + if try!(self.backing.get(self.column, &key)).is_some() { + return Err(BaseDataError::AlreadyExists(key).into()); + } + try!(batch.put(self.column, &key, &value)) + } + -1 => { + if try!(self.backing.get(self.column, &key)).is_none() { + return Err(BaseDataError::NegativelyReferencedHash(key).into()); + } + try!(batch.delete(self.column, &key)) + } + _ => 
panic!("Attempted to inject invalid state."), + } + } + + Ok(ops) + } } impl HashDB for OverlayRecentDB { @@ -319,7 +345,7 @@ impl HashDB for OverlayRecentDB { match k { Some(&(ref d, rc)) if rc > 0 => Some(d), _ => { - let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); + let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec()); match v { Some(x) => { Some(&self.transaction_overlay.denote(key, x).0) @@ -879,4 +905,19 @@ mod tests { assert!(jdb.contains(&foo)); assert!(jdb.contains(&bar)); } + + #[test] + fn inject() { + let temp = ::devtools::RandomTempPath::new(); + + let mut jdb = new_db(temp.as_path().as_path()); + let key = jdb.insert(b"dog"); + jdb.inject_batch().unwrap(); + + assert_eq!(jdb.get(&key).unwrap(), b"dog"); + jdb.remove(&key); + jdb.inject_batch().unwrap(); + + assert!(jdb.get(&key).is_none()); + } } diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs index 2fcbd4851..a584d7b29 100644 --- a/util/src/journaldb/refcounteddb.rs +++ b/util/src/journaldb/refcounteddb.rs @@ -184,6 +184,14 @@ impl JournalDB for RefCountedDB { let r = try!(self.forward.commit_to_batch(&batch)); Ok(r) } + + fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError> { + self.inserts.clear(); + for remove in self.removes.drain(..) 
{ + self.forward.remove(&remove); + } + self.forward.commit_to_batch(&batch) + } } #[cfg(test)] @@ -298,4 +306,17 @@ mod tests { assert!(!jdb.contains(&baz)); assert!(!jdb.contains(&bar)); } + + #[test] + fn inject() { + let mut jdb = RefCountedDB::new_temp(); + let key = jdb.insert(b"dog"); + jdb.inject_batch().unwrap(); + + assert_eq!(jdb.get(&key).unwrap(), b"dog"); + jdb.remove(&key); + jdb.inject_batch().unwrap(); + + assert!(jdb.get(&key).is_none()); + } } diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs index 8fd597349..8dc825457 100644 --- a/util/src/journaldb/traits.rs +++ b/util/src/journaldb/traits.rs @@ -39,6 +39,15 @@ pub trait JournalDB : HashDB + Send + Sync { /// old era to the backing database, reverting any non-canonical historical commit's inserts. fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>; + /// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions + /// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated. + /// + /// Any keys or values inserted or deleted must be completely independent of those affected + /// by any previous `commit` operations. Essentially, this means that `inject` can be used + /// either to restore a state to a fresh database, or to insert data which may only be journalled + /// from this point onwards. + fn inject(&mut self, batch: &DBTransaction) -> Result<u32, UtilError>; + /// State data query fn state(&self, _id: &H256) -> Option<Bytes>; @@ -48,11 +57,19 @@ pub trait JournalDB : HashDB + Send + Sync { /// Get backing database. 
fn backing(&self) -> &Arc<Database>; - #[cfg(test)] /// Commit all changes in a single batch + #[cfg(test)] fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { let batch = self.backing().transaction(); let res = try!(self.commit(&batch, now, id, end)); self.backing().write(batch).map(|_| res).map_err(Into::into) } + + /// Inject all changes in a single batch. + #[cfg(test)] + fn inject_batch(&mut self) -> Result<u32, UtilError> { + let batch = self.backing().transaction(); + let res = try!(self.inject(&batch)); + self.backing().write(batch).map(|_| res).map_err(Into::into) + } }