Moves journaldb sources to a separate crate (#6693)

This commit is contained in:
Dmitry Kashitsyn
2017-10-16 21:12:54 +07:00
parent 98d0ef3fff
commit e2b96e1fe0
8 changed files with 33 additions and 2 deletions

View File

@@ -1,468 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Disk-backed `HashDB` implementation.
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use rlp::*;
use hashdb::*;
use super::super::memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{KeyValueDB, DBTransaction};
use bigint::hash::H256;
use error::{BaseDataError, UtilError};
use bytes::Bytes;
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
///
/// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. As this is an "archive" database, nothing is ever removed. This means
/// that the states of any block the node has ever processed will be accessible.
pub struct ArchiveDB {
overlay: MemoryDB,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
column: Option<u32>,
}
impl ArchiveDB {
/// Create a new instance from a key-value db.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> ArchiveDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
ArchiveDB {
overlay: MemoryDB::new(),
backing: backing,
latest_era: latest_era,
column: col,
}
}
fn payload(&self, key: &H256) -> Option<DBValue> {
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
}
}
impl HashDB for ArchiveDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = self.backing.iter(self.column)
.map(|(key, _)| (H256::from_slice(&*key), 1))
.collect();
for (key, refs) in self.overlay.keys() {
match ret.entry(key) {
Entry::Occupied(mut entry) => {
*entry.get_mut() += refs;
},
Entry::Vacant(entry) => {
entry.insert(refs);
}
}
}
ret
}
fn get(&self, key: &H256) -> Option<DBValue> {
if let Some((d, rc)) = self.overlay.raw(key) {
if rc > 0 {
return Some(d);
}
}
self.payload(key)
}
fn contains(&self, key: &H256) -> bool {
self.get(key).is_some()
}
fn insert(&mut self, value: &[u8]) -> H256 {
self.overlay.insert(value)
}
fn emplace(&mut self, key: H256, value: DBValue) {
self.overlay.emplace(key, value);
}
fn remove(&mut self, key: &H256) {
self.overlay.remove(key);
}
}
impl JournalDB for ArchiveDB {
fn boxed_clone(&self) -> Box<JournalDB> {
Box::new(ArchiveDB {
overlay: self.overlay.clone(),
backing: self.backing.clone(),
latest_era: self.latest_era,
column: self.column.clone(),
})
}
fn mem_used(&self) -> usize {
self.overlay.mem_used()
}
fn is_empty(&self) -> bool {
self.latest_era.is_none()
}
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, _id: &H256) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;
for i in self.overlay.drain() {
let (key, (value, rc)) = i;
if rc > 0 {
batch.put(self.column, &key, &value);
inserts += 1;
}
if rc < 0 {
assert!(rc == -1);
deletes += 1;
}
}
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
Ok((inserts + deletes) as u32)
}
fn mark_canonical(&mut self, _batch: &mut DBTransaction, _end_era: u64, _canon_id: &H256) -> Result<u32, UtilError> {
// keep everything! it's an archive, after all.
Ok(0)
}
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;
for i in self.overlay.drain() {
let (key, (value, rc)) = i;
if rc > 0 {
if self.backing.get(self.column, &key)?.is_some() {
return Err(BaseDataError::AlreadyExists(key).into());
}
batch.put(self.column, &key, &value);
inserts += 1;
}
if rc < 0 {
assert!(rc == -1);
if self.backing.get(self.column, &key)?.is_none() {
return Err(BaseDataError::NegativelyReferencedHash(key).into());
}
batch.delete(self.column, &key);
deletes += 1;
}
}
Ok((inserts + deletes) as u32)
}
fn latest_era(&self) -> Option<u64> { self.latest_era }
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.into_vec())
}
fn is_pruned(&self) -> bool { false }
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}
fn consolidate(&mut self, with: MemoryDB) {
self.overlay.consolidate(with);
}
}
#[cfg(test)]
mod tests {
#![cfg_attr(feature="dev", allow(blacklisted_name))]
#![cfg_attr(feature="dev", allow(similar_names))]
use keccak::keccak;
use hashdb::{HashDB, DBValue};
use super::*;
use journaldb::traits::JournalDB;
use kvdb_memorydb;
#[test]
fn insert_same_in_fork() {
// history is 1
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let x = jdb.insert(b"X");
jdb.commit_batch(1, &keccak(b"1"), None).unwrap();
jdb.commit_batch(2, &keccak(b"2"), None).unwrap();
jdb.commit_batch(3, &keccak(b"1002a"), Some((1, keccak(b"1")))).unwrap();
jdb.commit_batch(4, &keccak(b"1003a"), Some((2, keccak(b"2")))).unwrap();
jdb.remove(&x);
jdb.commit_batch(3, &keccak(b"1002b"), Some((1, keccak(b"1")))).unwrap();
let x = jdb.insert(b"X");
jdb.commit_batch(4, &keccak(b"1003b"), Some((2, keccak(b"2")))).unwrap();
jdb.commit_batch(5, &keccak(b"1004a"), Some((3, keccak(b"1002a")))).unwrap();
jdb.commit_batch(6, &keccak(b"1005a"), Some((4, keccak(b"1003a")))).unwrap();
assert!(jdb.contains(&x));
}
#[test]
fn long_history() {
// history is 3
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let h = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit_batch(1, &keccak(b"1"), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(2, &keccak(b"2"), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(3, &keccak(b"3"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(4, &keccak(b"4"), Some((1, keccak(b"1")))).unwrap();
assert!(jdb.contains(&h));
}
#[test]
#[should_panic]
fn multiple_owed_removal_not_allowed() {
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let h = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.remove(&h);
// commit_batch would call journal_under(),
// and we don't allow multiple owned removals.
jdb.commit_batch(1, &keccak(b"1"), None).unwrap();
}
#[test]
fn complex() {
// history is 1
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit_batch(1, &keccak(b"1"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit_batch(2, &keccak(b"2"), Some((1, keccak(b"1")))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit_batch(3, &keccak(b"3"), Some((2, keccak(b"2")))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit_batch(4, &keccak(b"4"), Some((3, keccak(b"3")))).unwrap();
}
#[test]
fn fork() {
// history is 1
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit_batch(1, &keccak(b"1a"), Some((0, keccak(b"0")))).unwrap();
jdb.remove(&bar);
jdb.commit_batch(1, &keccak(b"1b"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit_batch(2, &keccak(b"2b"), Some((1, keccak(b"1b")))).unwrap();
assert!(jdb.contains(&foo));
}
#[test]
fn overwrite() {
// history is 1
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let foo = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit_batch(1, &keccak(b"1"), Some((0, keccak(b"0")))).unwrap();
jdb.insert(b"foo");
assert!(jdb.contains(&foo));
jdb.commit_batch(2, &keccak(b"2"), Some((1, keccak(b"1")))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit_batch(3, &keccak(b"2"), Some((0, keccak(b"2")))).unwrap();
assert!(jdb.contains(&foo));
}
#[test]
fn fork_same_key() {
// history is 1
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
let foo = jdb.insert(b"foo");
jdb.commit_batch(1, &keccak(b"1a"), Some((0, keccak(b"0")))).unwrap();
jdb.insert(b"foo");
jdb.commit_batch(1, &keccak(b"1b"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit_batch(2, &keccak(b"2a"), Some((1, keccak(b"1a")))).unwrap();
assert!(jdb.contains(&foo));
}
#[test]
fn reopen() {
let shared_db = Arc::new(kvdb_memorydb::create(0));
let bar = H256::random();
let foo = {
let mut jdb = ArchiveDB::new(shared_db.clone(), None);
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), DBValue::from_slice(b"bar"));
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
foo
};
{
let mut jdb = ArchiveDB::new(shared_db.clone(), None);
jdb.remove(&foo);
jdb.commit_batch(1, &keccak(b"1"), Some((0, keccak(b"0")))).unwrap();
}
{
let mut jdb = ArchiveDB::new(shared_db, None);
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit_batch(2, &keccak(b"2"), Some((1, keccak(b"1")))).unwrap();
}
}
#[test]
fn reopen_remove() {
let shared_db = Arc::new(kvdb_memorydb::create(0));
let foo = {
let mut jdb = ArchiveDB::new(shared_db.clone(), None);
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
jdb.commit_batch(1, &keccak(b"1"), Some((0, keccak(b"0")))).unwrap();
// foo is ancient history.
jdb.insert(b"foo");
jdb.commit_batch(2, &keccak(b"2"), Some((1, keccak(b"1")))).unwrap();
foo
};
{
let mut jdb = ArchiveDB::new(shared_db, None);
jdb.remove(&foo);
jdb.commit_batch(3, &keccak(b"3"), Some((2, keccak(b"2")))).unwrap();
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit_batch(4, &keccak(b"4"), Some((3, keccak(b"3")))).unwrap();
jdb.commit_batch(5, &keccak(b"5"), Some((4, keccak(b"4")))).unwrap();
}
}
#[test]
fn reopen_fork() {
let shared_db = Arc::new(kvdb_memorydb::create(0));
let (foo, _, _) = {
let mut jdb = ArchiveDB::new(shared_db.clone(), None);
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit_batch(1, &keccak(b"1a"), Some((0, keccak(b"0")))).unwrap();
jdb.remove(&bar);
jdb.commit_batch(1, &keccak(b"1b"), Some((0, keccak(b"0")))).unwrap();
(foo, bar, baz)
};
{
let mut jdb = ArchiveDB::new(shared_db, None);
jdb.commit_batch(2, &keccak(b"2b"), Some((1, keccak(b"1b")))).unwrap();
assert!(jdb.contains(&foo));
}
}
#[test]
fn returns_state() {
let shared_db = Arc::new(kvdb_memorydb::create(0));
let key = {
let mut jdb = ArchiveDB::new(shared_db.clone(), None);
let key = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
key
};
{
let jdb = ArchiveDB::new(shared_db, None);
let state = jdb.state(&key);
assert!(state.is_some());
}
}
#[test]
fn inject() {
let mut jdb = ArchiveDB::new(Arc::new(kvdb_memorydb::create(0)), None);
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();
assert_eq!(jdb.get(&key).unwrap(), DBValue::from_slice(b"dog"));
jdb.remove(&key);
jdb.inject_batch().unwrap();
assert!(jdb.get(&key).is_none());
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,187 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! `JournalDB` interface and implementation.
use std::{fmt, str};
use std::sync::Arc;
/// Export the journaldb module.
mod traits;
mod archivedb;
mod earlymergedb;
mod overlayrecentdb;
mod refcounteddb;
/// Export the `JournalDB` trait.
pub use self::traits::JournalDB;
/// A journal database algorithm.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Algorithm {
/// Keep all keys forever.
Archive,
/// Ancient and recent history maintained separately; recent history lasts for particular
/// number of blocks.
///
/// Inserts go into backing database, journal retains knowledge of whether backing DB key is
/// ancient or recent. Non-canon inserts get explicitly reverted and removed from backing DB.
EarlyMerge,
/// Ancient and recent history maintained separately; recent history lasts for particular
/// number of blocks.
///
/// Inserts go into memory overlay, which is tried for key fetches. Memory overlay gets
/// flushed in backing only at end of recent history.
OverlayRecent,
/// Ancient and recent history maintained separately; recent history lasts for particular
/// number of blocks.
///
/// References are counted in disk-backed DB.
RefCounted,
}
impl Default for Algorithm {
fn default() -> Algorithm { Algorithm::OverlayRecent }
}
impl str::FromStr for Algorithm {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"archive" => Ok(Algorithm::Archive),
"light" => Ok(Algorithm::EarlyMerge),
"fast" => Ok(Algorithm::OverlayRecent),
"basic" => Ok(Algorithm::RefCounted),
e => Err(format!("Invalid algorithm: {}", e)),
}
}
}
impl Algorithm {
/// Returns static str describing journal database algorithm.
pub fn as_str(&self) -> &'static str {
match *self {
Algorithm::Archive => "archive",
Algorithm::EarlyMerge => "light",
Algorithm::OverlayRecent => "fast",
Algorithm::RefCounted => "basic",
}
}
/// Returns static str describing journal database algorithm.
pub fn as_internal_name_str(&self) -> &'static str {
match *self {
Algorithm::Archive => "archive",
Algorithm::EarlyMerge => "earlymerge",
Algorithm::OverlayRecent => "overlayrecent",
Algorithm::RefCounted => "refcounted",
}
}
/// Returns true if pruning strategy is stable
pub fn is_stable(&self) -> bool {
match *self {
Algorithm::Archive | Algorithm::OverlayRecent => true,
_ => false,
}
}
/// Returns all algorithm types.
pub fn all_types() -> Vec<Algorithm> {
vec![Algorithm::Archive, Algorithm::EarlyMerge, Algorithm::OverlayRecent, Algorithm::RefCounted]
}
}
impl fmt::Display for Algorithm {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Create a new `JournalDB` trait object over a generic key-value database.
pub fn new(backing: Arc<::kvdb::KeyValueDB>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
match algorithm {
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(backing, col)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(backing, col)),
Algorithm::OverlayRecent => Box::new(overlayrecentdb::OverlayRecentDB::new(backing, col)),
Algorithm::RefCounted => Box::new(refcounteddb::RefCountedDB::new(backing, col)),
}
}
// all keys must be at least 12 bytes
const DB_PREFIX_LEN : usize = ::kvdb::PREFIX_LEN;
const LATEST_ERA_KEY : [u8; ::kvdb::PREFIX_LEN] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
#[cfg(test)]
mod tests {
use super::Algorithm;
#[test]
fn test_journal_algorithm_parsing() {
assert_eq!(Algorithm::Archive, "archive".parse().unwrap());
assert_eq!(Algorithm::EarlyMerge, "light".parse().unwrap());
assert_eq!(Algorithm::OverlayRecent, "fast".parse().unwrap());
assert_eq!(Algorithm::RefCounted, "basic".parse().unwrap());
}
#[test]
fn test_journal_algorithm_printing() {
assert_eq!(Algorithm::Archive.to_string(), "archive".to_owned());
assert_eq!(Algorithm::EarlyMerge.to_string(), "light".to_owned());
assert_eq!(Algorithm::OverlayRecent.to_string(), "fast".to_owned());
assert_eq!(Algorithm::RefCounted.to_string(), "basic".to_owned());
}
#[test]
fn test_journal_algorithm_is_stable() {
assert!(Algorithm::Archive.is_stable());
assert!(Algorithm::OverlayRecent.is_stable());
assert!(!Algorithm::EarlyMerge.is_stable());
assert!(!Algorithm::RefCounted.is_stable());
}
#[test]
fn test_journal_algorithm_default() {
assert_eq!(Algorithm::default(), Algorithm::OverlayRecent);
}
#[test]
fn test_journal_algorithm_all_types() {
// compiling should fail if some cases are not covered
let mut archive = 0;
let mut earlymerge = 0;
let mut overlayrecent = 0;
let mut refcounted = 0;
for a in &Algorithm::all_types() {
match *a {
Algorithm::Archive => archive += 1,
Algorithm::EarlyMerge => earlymerge += 1,
Algorithm::OverlayRecent => overlayrecent += 1,
Algorithm::RefCounted => refcounted += 1,
}
}
assert_eq!(archive, 1);
assert_eq!(earlymerge, 1);
assert_eq!(overlayrecent, 1);
assert_eq!(refcounted, 1);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,337 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Disk-backed, ref-counted `JournalDB` implementation.
use std::collections::HashMap;
use std::sync::Arc;
use heapsize::HeapSizeOf;
use rlp::*;
use hashdb::*;
use overlaydb::OverlayDB;
use memorydb::MemoryDB;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{KeyValueDB, DBTransaction};
use bigint::hash::H256;
use error::UtilError;
use bytes::Bytes;
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
///
/// Like `OverlayDB`, there is a memory overlay; `commit()` must be called in order to
/// write operations out to disk. Unlike `OverlayDB`, `remove()` operations do not take effect
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect.
///
/// journal format:
/// ```
/// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
/// [era, n] => [ ... ]
/// ```
///
/// when we make a new commit, we journal the inserts and removes.
/// for each `end_era` that we journaled that we are no passing by,
/// we remove all of its removes assuming it is canonical and all
/// of its inserts otherwise.
// TODO: store last_era, reclaim_period.
pub struct RefCountedDB {
forward: OverlayDB,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
inserts: Vec<H256>,
removes: Vec<H256>,
column: Option<u32>,
}
const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB {
/// Create a new instance given a `backing` database.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> RefCountedDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
RefCountedDB {
forward: OverlayDB::new(backing.clone(), col),
backing: backing,
inserts: vec![],
removes: vec![],
latest_era: latest_era,
column: col,
}
}
}
impl HashDB for RefCountedDB {
fn keys(&self) -> HashMap<H256, i32> { self.forward.keys() }
fn get(&self, key: &H256) -> Option<DBValue> { self.forward.get(key) }
fn contains(&self, key: &H256) -> bool { self.forward.contains(key) }
fn insert(&mut self, value: &[u8]) -> H256 { let r = self.forward.insert(value); self.inserts.push(r.clone()); r }
fn emplace(&mut self, key: H256, value: DBValue) { self.inserts.push(key.clone()); self.forward.emplace(key, value); }
fn remove(&mut self, key: &H256) { self.removes.push(key.clone()); }
}
impl JournalDB for RefCountedDB {
fn boxed_clone(&self) -> Box<JournalDB> {
Box::new(RefCountedDB {
forward: self.forward.clone(),
backing: self.backing.clone(),
latest_era: self.latest_era,
inserts: self.inserts.clone(),
removes: self.removes.clone(),
column: self.column.clone(),
})
}
fn mem_used(&self) -> usize {
self.inserts.heap_size_of_children() + self.removes.heap_size_of_children()
}
fn is_empty(&self) -> bool {
self.latest_era.is_none()
}
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}
fn latest_era(&self) -> Option<u64> { self.latest_era }
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.into_vec())
}
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
// record new commit's details.
let mut index = 0usize;
let mut last;
while self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})?.is_some() {
index += 1;
}
let mut r = RlpStream::new_list(3);
r.append(id);
r.append_list(&self.inserts);
r.append_list(&self.removes);
batch.put(self.column, &last, r.as_raw());
let ops = self.inserts.len() + self.removes.len();
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear();
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
}
Ok(ops as u32)
}
fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
// apply old commits' details
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = {
self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})?
} {
let rlp = Rlp::new(&rlp_data);
let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.list_at(if *canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove {
self.forward.remove(i);
}
batch.delete(self.column, &last);
index += 1;
}
let r = self.forward.commit_to_batch(batch)?;
Ok(r)
}
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError> {
self.inserts.clear();
for remove in self.removes.drain(..) {
self.forward.remove(&remove);
}
self.forward.commit_to_batch(batch)
}
fn consolidate(&mut self, mut with: MemoryDB) {
for (key, (value, rc)) in with.drain() {
for _ in 0..rc {
self.emplace(key, value.clone());
}
for _ in rc..0 {
self.remove(&key);
}
}
}
}
#[cfg(test)]
mod tests {
#![cfg_attr(feature="dev", allow(blacklisted_name))]
#![cfg_attr(feature="dev", allow(similar_names))]
use keccak::keccak;
use hashdb::{HashDB, DBValue};
use kvdb_memorydb;
use super::*;
use super::super::traits::JournalDB;
fn new_db() -> RefCountedDB {
let backing = Arc::new(kvdb_memorydb::create(0));
RefCountedDB::new(backing, None)
}
#[test]
fn long_history() {
// history is 3
let mut jdb = new_db();
let h = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit_batch(1, &keccak(b"1"), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(2, &keccak(b"2"), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(3, &keccak(b"3"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&h));
jdb.commit_batch(4, &keccak(b"4"), Some((1, keccak(b"1")))).unwrap();
assert!(!jdb.contains(&h));
}
#[test]
fn latest_era_should_work() {
// history is 3
let mut jdb = new_db();
assert_eq!(jdb.latest_era(), None);
let h = jdb.insert(b"foo");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert_eq!(jdb.latest_era(), Some(0));
jdb.remove(&h);
jdb.commit_batch(1, &keccak(b"1"), None).unwrap();
assert_eq!(jdb.latest_era(), Some(1));
jdb.commit_batch(2, &keccak(b"2"), None).unwrap();
assert_eq!(jdb.latest_era(), Some(2));
jdb.commit_batch(3, &keccak(b"3"), Some((0, keccak(b"0")))).unwrap();
assert_eq!(jdb.latest_era(), Some(3));
jdb.commit_batch(4, &keccak(b"4"), Some((1, keccak(b"1")))).unwrap();
assert_eq!(jdb.latest_era(), Some(4));
}
#[test]
fn complex() {
// history is 1
let mut jdb = new_db();
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit_batch(1, &keccak(b"1"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit_batch(2, &keccak(b"2"), Some((1, keccak(b"1")))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit_batch(3, &keccak(b"3"), Some((2, keccak(b"2")))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
jdb.commit_batch(4, &keccak(b"4"), Some((3, keccak(b"3")))).unwrap();
assert!(!jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
}
#[test]
fn fork() {
// history is 1
let mut jdb = new_db();
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit_batch(1, &keccak(b"1a"), Some((0, keccak(b"0")))).unwrap();
jdb.remove(&bar);
jdb.commit_batch(1, &keccak(b"1b"), Some((0, keccak(b"0")))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit_batch(2, &keccak(b"2b"), Some((1, keccak(b"1b")))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));
assert!(!jdb.contains(&bar));
}
#[test]
fn inject() {
let mut jdb = new_db();
let key = jdb.insert(b"dog");
jdb.inject_batch().unwrap();
assert_eq!(jdb.get(&key).unwrap(), DBValue::from_slice(b"dog"));
jdb.remove(&key);
jdb.inject_batch().unwrap();
assert!(jdb.get(&key).is_none());
}
}

View File

@@ -1,103 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Disk-backed `HashDB` implementation.
use std::sync::Arc;
use hashdb::*;
use kvdb::{self, DBTransaction};
use bigint::hash::H256;
use error::UtilError;
use bytes::Bytes;
/// A `HashDB` which can manage a short-term journal potentially containing many forks of mutually
/// exclusive actions.
pub trait JournalDB: HashDB {
/// Return a copy of ourself, in a box.
fn boxed_clone(&self) -> Box<JournalDB>;
/// Returns heap memory size used
fn mem_used(&self) -> usize;
/// Returns the size of journalled state in memory.
/// This function has a considerable speed requirement --
/// it must be fast enough to call several times per block imported.
fn journal_size(&self) -> usize { 0 }
/// Check if this database has any commits
fn is_empty(&self) -> bool;
/// Get the earliest era in the DB. None if there isn't yet any data in there.
fn earliest_era(&self) -> Option<u64> { None }
/// Get the latest era in the DB. None if there isn't yet any data in there.
fn latest_era(&self) -> Option<u64>;
/// Journal recent database operations as being associated with a given era and id.
// TODO: give the overlay to this function so journaldbs don't manage the overlays themeselves.
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError>;
/// Mark a given block as canonical, indicating that competing blocks' states may be pruned out.
fn mark_canonical(&mut self, batch: &mut DBTransaction, era: u64, id: &H256) -> Result<u32, UtilError>;
/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
///
/// Any keys or values inserted or deleted must be completely independent of those affected
/// by any previous `commit` operations. Essentially, this means that `inject` can be used
/// either to restore a state to a fresh database, or to insert data which may only be journalled
/// from this point onwards.
fn inject(&mut self, batch: &mut DBTransaction) -> Result<u32, UtilError>;
/// State data query
fn state(&self, _id: &H256) -> Option<Bytes>;
/// Whether this database is pruned.
fn is_pruned(&self) -> bool { true }
/// Get backing database.
fn backing(&self) -> &Arc<kvdb::KeyValueDB>;
/// Clear internal strucutres. This should called after changes have been written
/// to the backing strage
fn flush(&self) {}
/// Consolidate all the insertions and deletions in the given memory overlay.
fn consolidate(&mut self, overlay: ::memorydb::MemoryDB);
/// Commit all changes in a single batch
#[cfg(test)]
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let mut batch = self.backing().transaction();
let mut ops = self.journal_under(&mut batch, now, id)?;
if let Some((end_era, canon_id)) = end {
ops += self.mark_canonical(&mut batch, end_era, &canon_id)?;
}
let result = self.backing().write(batch).map(|_| ops).map_err(Into::into);
self.flush();
result
}
/// Inject all changes in a single batch.
#[cfg(test)]
fn inject_batch(&mut self) -> Result<u32, UtilError> {
let mut batch = self.backing().transaction();
let res = self.inject(&mut batch)?;
self.backing().write(batch).map(|_| res).map_err(Into::into)
}
}

View File

@@ -1,307 +0,0 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Disk-backed `HashDB` implementation.
use std::sync::Arc;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use error::{Result, BaseDataError};
use bigint::hash::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay.
///
/// The operations `insert()` and `remove()` take place on the memory overlay; batches of
/// such operations may be flushed to the disk-backed DB with `commit()` or discarded with
/// `revert()`.
///
/// `lookup()` and `contains()` maintain normal behaviour - all `insert()` and `remove()`
/// queries have an immediate effect in terms of these functions.
#[derive(Clone)]
pub struct OverlayDB {
overlay: MemoryDB,
backing: Arc<KeyValueDB>,
column: Option<u32>,
}
impl OverlayDB {
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayDB {
OverlayDB{ overlay: MemoryDB::new(), backing: backing, column: col }
}
/// Create a new instance of OverlayDB with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayDB {
let backing = Arc::new(::kvdb_memorydb::create(0));
Self::new(backing, None)
}
/// Commit all operations in a single batch.
#[cfg(test)]
pub fn commit(&mut self) -> Result<u32> {
let mut batch = self.backing.transaction();
let res = self.commit_to_batch(&mut batch)?;
self.backing.write(batch).map(|_| res).map_err(|e| e.into())
}
/// Commit all operations to given batch.
pub fn commit_to_batch(&mut self, batch: &mut DBTransaction) -> Result<u32> {
let mut ret = 0u32;
let mut deletes = 0usize;
for i in self.overlay.drain() {
let (key, (value, rc)) = i;
if rc != 0 {
match self.payload(&key) {
Some(x) => {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload_in_batch(batch, &key, (value, rc as u32));
}
};
ret += 1;
}
}
trace!("OverlayDB::commit() deleted {} nodes", deletes);
Ok(ret)
}
/// Revert all operations on this object (i.e. `insert()`s and `remove()`s) since the
/// last `commit()`.
pub fn revert(&mut self) { self.overlay.clear(); }
/// Get the number of references that would be committed.
pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(key).map_or(0, |(_, refs)| refs) }
/// Get the refs and value of the given key.
fn payload(&self, key: &H256) -> Option<(DBValue, u32)> {
self.backing.get(self.column, key)
.expect("Low-level database error. Some issue with your hard disk?")
.map(|d| {
let r = Rlp::new(&d);
(DBValue::from_slice(r.at(1).data()), r.at(0).as_val())
})
}
/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: (DBValue, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&&*payload.0);
batch.put(self.column, key, s.as_raw());
false
} else {
batch.delete(self.column, key);
true
}
}
}
impl HashDB for OverlayDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = self.backing.iter(self.column)
.map(|(key, _)| {
let h = H256::from_slice(&*key);
let r = self.payload(&h).unwrap().1;
(h, r as i32)
})
.collect();
for (key, refs) in self.overlay.keys() {
match ret.entry(key) {
Entry::Occupied(mut entry) => {
*entry.get_mut() += refs;
},
Entry::Vacant(entry) => {
entry.insert(refs);
}
}
}
ret
}
fn get(&self, key: &H256) -> Option<DBValue> {
// return ok if positive; if negative, check backing - might be enough references there to make
// it positive again.
let k = self.overlay.raw(key);
let memrc = {
if let Some((d, rc)) = k {
if rc > 0 { return Some(d); }
rc
} else {
0
}
};
match self.payload(key) {
Some(x) => {
let (d, rc) = x;
if rc as i32 + memrc > 0 {
Some(d)
}
else {
None
}
}
// Replace above match arm with this once https://github.com/rust-lang/rust/issues/15287 is done.
//Some((d, rc)) if rc + memrc > 0 => Some(d),
_ => None,
}
}
fn contains(&self, key: &H256) -> bool {
// return ok if positive; if negative, check backing - might be enough references there to make
// it positive again.
let k = self.overlay.raw(key);
match k {
Some((_, rc)) if rc > 0 => true,
_ => {
let memrc = k.map_or(0, |(_, rc)| rc);
match self.payload(key) {
Some(x) => {
let (_, rc) = x;
rc as i32 + memrc > 0
}
// Replace above match arm with this once https://github.com/rust-lang/rust/issues/15287 is done.
//Some((d, rc)) if rc + memrc > 0 => true,
_ => false,
}
}
}
}
fn insert(&mut self, value: &[u8]) -> H256 { self.overlay.insert(value) }
fn emplace(&mut self, key: H256, value: DBValue) { self.overlay.emplace(key, value); }
fn remove(&mut self, key: &H256) { self.overlay.remove(key); }
}
#[test]
#[cfg_attr(feature="dev", allow(blacklisted_name))]
fn overlaydb_revert() {
let mut m = OverlayDB::new_temp();
let foo = m.insert(b"foo"); // insert foo.
let mut batch = m.backing.transaction();
m.commit_to_batch(&mut batch).unwrap(); // commit - new operations begin here...
m.backing.write(batch).unwrap();
let bar = m.insert(b"bar"); // insert bar.
m.remove(&foo); // remove foo.
assert!(!m.contains(&foo)); // foo is gone.
assert!(m.contains(&bar)); // bar is here.
m.revert(); // revert the last two operations.
assert!(m.contains(&foo)); // foo is here.
assert!(!m.contains(&bar)); // bar is gone.
}
#[test]
fn overlaydb_overlay_insert_and_remove() {
let mut trie = OverlayDB::new_temp();
let h = trie.insert(b"hello world");
assert_eq!(trie.get(&h).unwrap(), DBValue::from_slice(b"hello world"));
trie.remove(&h);
assert_eq!(trie.get(&h), None);
}
#[test]
fn overlaydb_backing_insert_revert() {
let mut trie = OverlayDB::new_temp();
let h = trie.insert(b"hello world");
assert_eq!(trie.get(&h).unwrap(), DBValue::from_slice(b"hello world"));
trie.commit().unwrap();
assert_eq!(trie.get(&h).unwrap(), DBValue::from_slice(b"hello world"));
trie.revert();
assert_eq!(trie.get(&h).unwrap(), DBValue::from_slice(b"hello world"));
}
#[test]
fn overlaydb_backing_remove() {
let mut trie = OverlayDB::new_temp();
let h = trie.insert(b"hello world");
trie.commit().unwrap();
trie.remove(&h);
assert_eq!(trie.get(&h), None);
trie.commit().unwrap();
assert_eq!(trie.get(&h), None);
trie.revert();
assert_eq!(trie.get(&h), None);
}
#[test]
fn overlaydb_backing_remove_revert() {
let mut trie = OverlayDB::new_temp();
let h = trie.insert(b"hello world");
trie.commit().unwrap();
trie.remove(&h);
assert_eq!(trie.get(&h), None);
trie.revert();
assert_eq!(trie.get(&h).unwrap(), DBValue::from_slice(b"hello world"));
}
#[test]
fn overlaydb_negative() {
let mut trie = OverlayDB::new_temp();
let h = trie.insert(b"hello world");
trie.commit().unwrap();
trie.remove(&h);
trie.remove(&h); //bad - sends us into negative refs.
assert_eq!(trie.get(&h), None);
assert!(trie.commit().is_err());
}
#[test]
fn overlaydb_complex() {
let mut trie = OverlayDB::new_temp();
let hfoo = trie.insert(b"foo");
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
let hbar = trie.insert(b"bar");
assert_eq!(trie.get(&hbar).unwrap(), DBValue::from_slice(b"bar"));
trie.commit().unwrap();
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
assert_eq!(trie.get(&hbar).unwrap(), DBValue::from_slice(b"bar"));
trie.insert(b"foo"); // two refs
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
trie.commit().unwrap();
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
assert_eq!(trie.get(&hbar).unwrap(), DBValue::from_slice(b"bar"));
trie.remove(&hbar); // zero refs - delete
assert_eq!(trie.get(&hbar), None);
trie.remove(&hfoo); // one ref - keep
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
trie.commit().unwrap();
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
trie.remove(&hfoo); // zero ref - would delete, but...
assert_eq!(trie.get(&hfoo), None);
trie.insert(b"foo"); // one ref - keep after all.
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
trie.commit().unwrap();
assert_eq!(trie.get(&hfoo).unwrap(), DBValue::from_slice(b"foo"));
trie.remove(&hfoo); // zero ref - delete
assert_eq!(trie.get(&hfoo), None);
trie.commit().unwrap(); //
assert_eq!(trie.get(&hfoo), None);
}