* Consolidation migration

* Started db amalgamation

* Using client constants for columns

* Adding with_columns constructor

* Migrating to single db

* Fixing tests.

* test.sh without verbose

* Fixing warnings

* add migration tests that catch the bug

* make multiple migrations more robust

* add moved v9

* Merge branch 'noop-migrations' into single-db

* spurious line

* clean up migrations ordering

* update comment [ci skip]

* Bumping default number of max_open_files & re-ordering columns.

* fix merge

* fix ignored analysis tests

* Caching best block content

* Faster best_block_header

* Adding progress to v8 migration

* clean up warnings

* Separate hashes and bodies in the DB

* Separate hashes and bodies in the DB

* Fixed tests
This commit is contained in:
Tomasz Drwięga
2016-07-28 23:46:24 +02:00
committed by Gav Wood
parent 0934a283b2
commit e4f0c0b215
42 changed files with 1578 additions and 1058 deletions

View File

@@ -20,9 +20,9 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
@@ -30,9 +30,6 @@ use std::env;
/// Would be nich to use rocksdb columns for this eventually.
const AUX_FLAG: u8 = 255;
/// Database version.
const DB_VERSION : u32 = 0x103;
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
///
@@ -44,28 +41,18 @@ pub struct ArchiveDB {
overlay: MemoryDB,
backing: Arc<Database>,
latest_era: Option<u64>,
column: Option<u32>,
}
impl ArchiveDB {
/// Create a new instance from file
pub fn new(path: &str, config: DatabaseConfig) -> ArchiveDB {
let backing = Database::open(&config, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
if !backing.is_empty() {
match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
Ok(Some(DB_VERSION)) => {},
v => panic!("Incompatible DB version, expected {}, got {:?}; to resolve, remove {} and restart.", DB_VERSION, v, path)
}
} else {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
}
let latest_era = backing.get(&LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
pub fn new(backing: Arc<Database>, col: Option<u32>) -> ArchiveDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
ArchiveDB {
overlay: MemoryDB::new(),
backing: Arc::new(backing),
backing: backing,
latest_era: latest_era,
column: col,
}
}
@@ -74,18 +61,19 @@ impl ArchiveDB {
fn new_temp() -> ArchiveDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
Self::new(backing, None)
}
fn payload(&self, key: &H256) -> Option<Bytes> {
self.backing.get(key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
}
}
impl HashDB for ArchiveDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = HashMap::new();
for (key, _) in self.backing.iter() {
for (key, _) in self.backing.iter(self.column) {
let h = H256::from_slice(key.deref());
ret.insert(h, 1);
}
@@ -140,7 +128,7 @@ impl HashDB for ArchiveDB {
let mut db_hash = hash.to_vec();
db_hash.push(AUX_FLAG);
self.backing.get(&db_hash)
self.backing.get(self.column, &db_hash)
.expect("Low-level database error. Some issue with your hard disk?")
.map(|v| v.to_vec())
}
@@ -156,6 +144,7 @@ impl JournalDB for ArchiveDB {
overlay: self.overlay.clone(),
backing: self.backing.clone(),
latest_era: self.latest_era,
column: self.column.clone(),
})
}
@@ -167,8 +156,7 @@ impl JournalDB for ArchiveDB {
self.latest_era.is_none()
}
fn commit(&mut self, now: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = DBTransaction::new();
fn commit(&mut self, batch: &DBTransaction, now: u64, _id: &H256, _end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let mut inserts = 0usize;
let mut deletes = 0usize;
@@ -176,7 +164,7 @@ impl JournalDB for ArchiveDB {
let (key, (value, rc)) = i;
if rc > 0 {
assert!(rc == 1);
batch.put(&key, &value).expect("Low-level database error. Some issue with your hard disk?");
batch.put(self.column, &key, &value).expect("Low-level database error. Some issue with your hard disk?");
inserts += 1;
}
if rc < 0 {
@@ -187,24 +175,27 @@ impl JournalDB for ArchiveDB {
for (mut key, value) in self.overlay.drain_aux().into_iter() {
key.push(AUX_FLAG);
batch.put(&key, &value).expect("Low-level database error. Some issue with your hard disk?");
batch.put(self.column, &key, &value).expect("Low-level database error. Some issue with your hard disk?");
}
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
try!(batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
try!(self.backing.write(batch));
Ok((inserts + deletes) as u32)
}
fn latest_era(&self) -> Option<u64> { self.latest_era }
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(&id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
}
fn is_pruned(&self) -> bool { false }
fn backing(&self) -> &Arc<Database> {
&self.backing
}
}
#[cfg(test)]
@@ -216,7 +207,7 @@ mod tests {
use super::*;
use hashdb::*;
use journaldb::traits::JournalDB;
use kvdb::DatabaseConfig;
use kvdb::Database;
#[test]
fn insert_same_in_fork() {
@@ -224,18 +215,18 @@ mod tests {
let mut jdb = ArchiveDB::new_temp();
let x = jdb.insert(b"X");
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.remove(&x);
jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
let x = jdb.insert(b"X");
jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
jdb.commit_batch(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
jdb.commit_batch(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
assert!(jdb.contains(&x));
}
@@ -245,16 +236,16 @@ mod tests {
// history is 3
let mut jdb = ArchiveDB::new_temp();
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&h));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
}
#[test]
@@ -264,29 +255,29 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
}
#[test]
@@ -296,22 +287,22 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.contains(&foo));
}
@@ -321,16 +312,16 @@ mod tests {
let mut jdb = ArchiveDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.insert(b"foo");
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
assert!(jdb.contains(&foo));
}
@@ -338,19 +329,24 @@ mod tests {
fn fork_same_key() {
// history is 1
let mut jdb = ArchiveDB::new_temp();
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.contains(&foo));
}
fn new_db(dir: &Path) -> ArchiveDB {
let db = Database::open_default(dir.to_str().unwrap()).unwrap();
ArchiveDB::new(Arc::new(db), None)
}
#[test]
fn reopen() {
let mut dir = ::std::env::temp_dir();
@@ -358,25 +354,25 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
foo
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
}
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
}
}
@@ -386,27 +382,27 @@ mod tests {
dir.push(H32::random().hex());
let foo = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
// foo is ancient history.
jdb.insert(b"foo");
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
foo
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
}
}
@@ -415,23 +411,23 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, _, _) = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
(foo, bar, baz)
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
let mut jdb = new_db(&dir);
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.contains(&foo));
}
}
@@ -441,14 +437,14 @@ mod tests {
let temp = ::devtools::RandomTempPath::new();
let key = {
let mut jdb = ArchiveDB::new(temp.as_str(), DatabaseConfig::default());
let mut jdb = new_db(temp.as_path().as_path());
let key = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
key
};
{
let jdb = ArchiveDB::new(temp.as_str(), DatabaseConfig::default());
let jdb = new_db(temp.as_path().as_path());
let state = jdb.state(&key);
assert!(state.is_some());
}

View File

@@ -20,9 +20,9 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
@@ -66,33 +66,22 @@ pub struct EarlyMergeDB {
backing: Arc<Database>,
refs: Option<Arc<RwLock<HashMap<H256, RefInfo>>>>,
latest_era: Option<u64>,
column: Option<u32>,
}
const DB_VERSION : u32 = 0x003;
const PADDING : [u8; 10] = [ 0u8; 10 ];
impl EarlyMergeDB {
/// Create a new instance from file
pub fn new(path: &str, config: DatabaseConfig) -> EarlyMergeDB {
let backing = Database::open(&config, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
if !backing.is_empty() {
match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
Ok(Some(DB_VERSION)) => {},
v => panic!("Incompatible DB version, expected {}, got {:?}; to resolve, remove {} and restart.", DB_VERSION, v, path)
}
} else {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
}
let (latest_era, refs) = EarlyMergeDB::read_refs(&backing);
pub fn new(backing: Arc<Database>, col: Option<u32>) -> EarlyMergeDB {
let (latest_era, refs) = EarlyMergeDB::read_refs(&backing, col);
let refs = Some(Arc::new(RwLock::new(refs)));
EarlyMergeDB {
overlay: MemoryDB::new(),
backing: Arc::new(backing),
backing: backing,
refs: refs,
latest_era: latest_era,
column: col,
}
}
@@ -101,7 +90,8 @@ impl EarlyMergeDB {
fn new_temp() -> EarlyMergeDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
Self::new(backing, None)
}
fn morph_key(key: &H256, index: u8) -> Bytes {
@@ -111,13 +101,13 @@ impl EarlyMergeDB {
}
// The next three are valid only as long as there is an insert operation of `key` in the journal.
fn set_already_in(batch: &DBTransaction, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); }
fn reset_already_in(batch: &DBTransaction, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); }
fn is_already_in(backing: &Database, key: &H256) -> bool {
backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
fn set_already_in(batch: &DBTransaction, col: Option<u32>, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); }
fn reset_already_in(batch: &DBTransaction, col: Option<u32>, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); }
fn is_already_in(backing: &Database, col: Option<u32>, key: &H256) -> bool {
backing.get(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
}
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, trace: bool) {
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, trace: bool) {
for &(ref h, ref d) in inserts {
if let Some(c) = refs.get_mut(h) {
// already counting. increment.
@@ -129,9 +119,9 @@ impl EarlyMergeDB {
}
// this is the first entry for this node in the journal.
if backing.get(h).expect("Low-level database error. Some issue with your hard disk?").is_some() {
if backing.get(col, h).expect("Low-level database error. Some issue with your hard disk?").is_some() {
// already in the backing DB. start counting, and remember it was already in.
Self::set_already_in(batch, h);
Self::set_already_in(batch, col, h);
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: true});
if trace {
trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h);
@@ -141,8 +131,8 @@ impl EarlyMergeDB {
// Gets removed when a key leaves the journal, so should never be set when we're placing a new key.
//Self::reset_already_in(&h);
assert!(!Self::is_already_in(backing, &h));
batch.put(h, d).expect("Low-level database error. Some issue with your hard disk?");
assert!(!Self::is_already_in(backing, col, &h));
batch.put(col, h, d).expect("Low-level database error. Some issue with your hard disk?");
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: false});
if trace {
trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h);
@@ -150,7 +140,7 @@ impl EarlyMergeDB {
}
}
fn replay_keys(inserts: &[H256], backing: &Database, refs: &mut HashMap<H256, RefInfo>) {
fn replay_keys(inserts: &[H256], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
for h in inserts {
if let Some(c) = refs.get_mut(h) {
@@ -161,12 +151,12 @@ impl EarlyMergeDB {
// this is the first entry for this node in the journal.
// it is initialised to 1 if it was already in.
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: Self::is_already_in(backing, h)});
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: Self::is_already_in(backing, col, h)});
}
trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs);
}
fn remove_keys(deletes: &[H256], refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, from: RemoveFrom, trace: bool) {
fn remove_keys(deletes: &[H256], refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, col: Option<u32>, from: RemoveFrom, trace: bool) {
// with a remove on {queue_refs: 1, in_archive: true}, we have two options:
// - convert to {queue_refs: 1, in_archive: false} (i.e. remove it from the conceptual archive)
// - convert to {queue_refs: 0, in_archive: true} (i.e. remove it from the conceptual queue)
@@ -178,7 +168,7 @@ impl EarlyMergeDB {
if let Some(c) = refs.get_mut(h) {
if c.in_archive && from == RemoveFrom::Archive {
c.in_archive = false;
Self::reset_already_in(batch, h);
Self::reset_already_in(batch, col, h);
if trace {
trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Reducing to queue only and recording", h);
}
@@ -196,14 +186,14 @@ impl EarlyMergeDB {
match n {
Some(RefInfo{queue_refs: 1, in_archive: true}) => {
refs.remove(h);
Self::reset_already_in(batch, h);
Self::reset_already_in(batch, col, h);
if trace {
trace!(target: "jdb.fine", " remove({}): In archive, 1 in queue: Removing from queue and leaving in archive", h);
}
}
Some(RefInfo{queue_refs: 1, in_archive: false}) => {
refs.remove(h);
batch.delete(h).expect("Low-level database error. Some issue with your hard disk?");
batch.delete(col, h).expect("Low-level database error. Some issue with your hard disk?");
if trace {
trace!(target: "jdb.fine", " remove({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
}
@@ -211,7 +201,7 @@ impl EarlyMergeDB {
None => {
// Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs.
//assert!(!Self::is_already_in(db, &h));
batch.delete(h).expect("Low-level database error. Some issue with your hard disk?");
batch.delete(col, h).expect("Low-level database error. Some issue with your hard disk?");
if trace {
trace!(target: "jdb.fine", " remove({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h);
}
@@ -223,7 +213,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing);
let (latest_era, reconstructed) = Self::read_refs(&self.backing, self.column);
let refs = self.refs.as_ref().unwrap().write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@@ -236,18 +226,18 @@ impl EarlyMergeDB {
}
fn payload(&self, key: &H256) -> Option<Bytes> {
self.backing.get(key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
}
fn read_refs(db: &Database) -> (Option<u64>, HashMap<H256, RefInfo>) {
fn read_refs(db: &Database, col: Option<u32>) -> (Option<u64>, HashMap<H256, RefInfo>) {
let mut refs = HashMap::new();
let mut latest_era = None;
if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
let mut era = decode::<u64>(&val);
latest_era = Some(era);
loop {
let mut index = 0usize;
while let Some(rlp_data) = db.get({
while let Some(rlp_data) = db.get(col, {
let mut r = RlpStream::new_list(3);
r.append(&era);
r.append(&index);
@@ -256,7 +246,7 @@ impl EarlyMergeDB {
}).expect("Low-level database error.") {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
Self::replay_keys(&inserts, db, &mut refs);
Self::replay_keys(&inserts, db, col, &mut refs);
index += 1;
};
if index == 0 || era == 0 {
@@ -267,12 +257,12 @@ impl EarlyMergeDB {
}
(latest_era, refs)
}
}
}
impl HashDB for EarlyMergeDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = HashMap::new();
for (key, _) in self.backing.iter() {
for (key, _) in self.backing.iter(self.column) {
let h = H256::from_slice(key.deref());
ret.insert(h, 1);
}
@@ -321,11 +311,16 @@ impl JournalDB for EarlyMergeDB {
backing: self.backing.clone(),
refs: self.refs.clone(),
latest_era: self.latest_era.clone(),
column: self.column.clone(),
})
}
fn is_empty(&self) -> bool {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
&self.backing
}
fn latest_era(&self) -> Option<u64> { self.latest_era }
@@ -338,11 +333,11 @@ impl JournalDB for EarlyMergeDB {
}
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(&id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
@@ -389,13 +384,12 @@ impl JournalDB for EarlyMergeDB {
// record new commit's details.
let mut refs = self.refs.as_ref().unwrap().write();
let batch = DBTransaction::new();
let trace = false;
{
let mut index = 0usize;
let mut last;
while try!(self.backing.get({
while try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
@@ -436,15 +430,15 @@ impl JournalDB for EarlyMergeDB {
r.begin_list(inserts.len());
inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes);
Self::insert_keys(&inserts, &self.backing, &mut refs, &batch, trace);
Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, &batch, trace);
if trace {
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
trace!(target: "jdb.ops", " Deletes: {:?}", removes);
}
try!(batch.put(&last, r.as_raw()));
try!(batch.put(self.column, &last, r.as_raw()));
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
try!(batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
}
@@ -453,7 +447,7 @@ impl JournalDB for EarlyMergeDB {
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = try!(self.backing.get({
while let Some(rlp_data) = try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
@@ -470,7 +464,7 @@ impl JournalDB for EarlyMergeDB {
if trace {
trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
}
Self::remove_keys(&deletes, &mut refs, &batch, RemoveFrom::Archive, trace);
Self::remove_keys(&deletes, &mut refs, &batch, self.column, RemoveFrom::Archive, trace);
if trace {
trace!(target: "jdb.ops", " Finalising: {:?}", inserts);
@@ -488,7 +482,7 @@ impl JournalDB for EarlyMergeDB {
}
Some( RefInfo{queue_refs: x, in_archive: false} ) => {
// must set already in; ,
Self::set_already_in(&batch, k);
Self::set_already_in(&batch, self.column, k);
refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
}
Some( RefInfo{in_archive: true, ..} ) => {
@@ -502,10 +496,10 @@ impl JournalDB for EarlyMergeDB {
if trace {
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
}
Self::remove_keys(&inserts, &mut refs, &batch, RemoveFrom::Queue, trace);
Self::remove_keys(&inserts, &mut refs, &batch, self.column, RemoveFrom::Queue, trace);
}
try!(batch.delete(&last));
try!(batch.delete(self.column, &last));
index += 1;
}
if trace {
@@ -513,10 +507,6 @@ impl JournalDB for EarlyMergeDB {
}
}
try!(self.backing.write(batch));
// Comment out for now. TODO: automatically enable in tests.
if trace {
trace!(target: "jdb", "OK: {:?}", refs.clone());
}
@@ -535,7 +525,7 @@ mod tests {
use super::super::traits::JournalDB;
use hashdb::*;
use log::init_log;
use kvdb::DatabaseConfig;
use kvdb::{Database, DatabaseConfig};
#[test]
fn insert_same_in_fork() {
@@ -543,25 +533,25 @@ mod tests {
let mut jdb = EarlyMergeDB::new_temp();
let x = jdb.insert(b"X");
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&x);
jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
let x = jdb.insert(b"X");
jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
jdb.commit_batch(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
jdb.commit_batch(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&x));
@@ -571,17 +561,17 @@ mod tests {
fn insert_older_era() {
let mut jdb = EarlyMergeDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0a".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let bar = jdb.insert(b"bar");
jdb.commit(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(0, &b"0b".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
@@ -592,20 +582,20 @@ mod tests {
// history is 3
let mut jdb = EarlyMergeDB::new_temp();
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&h));
}
@@ -617,7 +607,7 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
@@ -625,7 +615,7 @@ mod tests {
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
@@ -633,20 +623,20 @@ mod tests {
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
assert!(!jdb.contains(&bar));
@@ -660,25 +650,25 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));
@@ -691,115 +681,113 @@ mod tests {
let mut jdb = EarlyMergeDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_same_key_one() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
let mut jdb = EarlyMergeDB::new_temp();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_same_key_other() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
let mut jdb = EarlyMergeDB::new_temp();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_ins_del_ins() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
let mut jdb = EarlyMergeDB::new_temp();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3b".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4a".sha3(), Some((2, b"2a".sha3()))).unwrap();
jdb.commit_batch(4, &b"4a".sha3(), Some((2, b"2a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5a".sha3(), Some((3, b"3a".sha3()))).unwrap();
jdb.commit_batch(5, &b"5a".sha3(), Some((3, b"3a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
fn new_db(path: &Path) -> EarlyMergeDB {
let config = DatabaseConfig::with_columns(Some(1));
let backing = Arc::new(Database::open(&config, path.to_str().unwrap()).unwrap());
EarlyMergeDB::new(backing, Some(0))
}
#[test]
fn reopen() {
let mut dir = ::std::env::temp_dir();
@@ -807,27 +795,27 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
foo
};
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
@@ -836,145 +824,136 @@ mod tests {
#[test]
fn insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = EarlyMergeDB::new_temp();
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn forked_insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = EarlyMergeDB::new_temp();
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1a".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1b".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2a".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2b".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3a".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3b".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1a".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn broken_assert() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new_temp();
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); // BROKEN
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); // BROKEN
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
#[test]
fn reopen_test() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new_temp();
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"3".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.remove(&bar);
jdb.commit(6, &b"6".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(6, &b"6".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.insert(b"bar");
jdb.commit(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
@@ -988,45 +967,48 @@ mod tests {
let foo = b"foo".sha3();
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(2, &b"2".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.commit(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.commit(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit_batch(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
@@ -1037,26 +1019,26 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, bar, baz) = {
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
(foo, bar, baz)
};
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
let mut jdb = new_db(&dir);
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));

View File

@@ -17,7 +17,7 @@
//! `JournalDB` interface and implementation.
use common::*;
use kvdb::DatabaseConfig;
use kvdb::Database;
/// Export the journaldb module.
pub mod traits;
@@ -116,19 +116,18 @@ impl fmt::Display for Algorithm {
}
/// Create a new `JournalDB` trait object.
pub fn new(path: &str, algorithm: Algorithm, config: DatabaseConfig) -> Box<JournalDB> {
pub fn new(backing: Arc<Database>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
match algorithm {
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(path, config)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(path, config)),
Algorithm::OverlayRecent => Box::new(overlayrecentdb::OverlayRecentDB::new(path, config)),
Algorithm::RefCounted => Box::new(refcounteddb::RefCountedDB::new(path, config)),
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(backing, col)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(backing, col)),
Algorithm::OverlayRecent => Box::new(overlayrecentdb::OverlayRecentDB::new(backing, col)),
Algorithm::RefCounted => Box::new(refcounteddb::RefCountedDB::new(backing, col)),
}
}
// all keys must be at least 12 bytes
const DB_PREFIX_LEN : usize = 12;
const LATEST_ERA_KEY : [u8; DB_PREFIX_LEN] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
const VERSION_KEY : [u8; DB_PREFIX_LEN] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
#[cfg(test)]
mod tests {

View File

@@ -20,8 +20,8 @@ use common::*;
use rlp::*;
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use kvdb::{Database, DBTransaction, DatabaseConfig};
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use super::JournalDB;
@@ -61,6 +61,7 @@ pub struct OverlayRecentDB {
transaction_overlay: MemoryDB,
backing: Arc<Database>,
journal_overlay: Arc<RwLock<JournalOverlay>>,
column: Option<u32>,
}
#[derive(PartialEq)]
@@ -89,38 +90,22 @@ impl Clone for OverlayRecentDB {
transaction_overlay: self.transaction_overlay.clone(),
backing: self.backing.clone(),
journal_overlay: self.journal_overlay.clone(),
column: self.column.clone(),
}
}
}
const DB_VERSION : u32 = 0x203;
const PADDING : [u8; 10] = [ 0u8; 10 ];
impl OverlayRecentDB {
/// Create a new instance from file
pub fn new(path: &str, config: DatabaseConfig) -> OverlayRecentDB {
Self::from_prefs(path, config)
}
/// Create a new instance from file
pub fn from_prefs(path: &str, config: DatabaseConfig) -> OverlayRecentDB {
let backing = Database::open(&config, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
if !backing.is_empty() {
match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
Ok(Some(DB_VERSION)) => {}
v => panic!("Incompatible DB version, expected {}, got {:?}; to resolve, remove {} and restart.", DB_VERSION, v, path)
}
} else {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
}
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&backing)));
/// Create a new instance.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayRecentDB {
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&backing, col)));
OverlayRecentDB {
transaction_overlay: MemoryDB::new(),
backing: Arc::new(backing),
backing: backing,
journal_overlay: journal_overlay,
column: col,
}
}
@@ -129,31 +114,32 @@ impl OverlayRecentDB {
pub fn new_temp() -> OverlayRecentDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
Self::new(backing, None)
}
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing);
let reconstructed = Self::read_overlay(&self.backing, self.column);
let journal_overlay = self.journal_overlay.read();
*journal_overlay == reconstructed
}
fn payload(&self, key: &H256) -> Option<Bytes> {
self.backing.get(key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
}
fn read_overlay(db: &Database) -> JournalOverlay {
fn read_overlay(db: &Database, col: Option<u32>) -> JournalOverlay {
let mut journal = HashMap::new();
let mut overlay = MemoryDB::new();
let mut count = 0;
let mut latest_era = None;
if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
let mut era = decode::<u64>(&val);
latest_era = Some(era);
loop {
let mut index = 0usize;
while let Some(rlp_data) = db.get({
while let Some(rlp_data) = db.get(col, {
let mut r = RlpStream::new_list(3);
r.append(&era);
r.append(&index);
@@ -212,21 +198,24 @@ impl JournalDB for OverlayRecentDB {
}
fn is_empty(&self) -> bool {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
&self.backing
}
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
fn state(&self, key: &H256) -> Option<Bytes> {
let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec());
v.or_else(|| self.backing.get_by_prefix(&key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
}
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut journal_overlay = self.journal_overlay.write();
let batch = DBTransaction::new();
{
let mut r = RlpStream::new_list(3);
let mut tx = self.transaction_overlay.drain();
@@ -249,9 +238,9 @@ impl JournalDB for OverlayRecentDB {
k.append(&now);
k.append(&index);
k.append(&&PADDING[..]);
try!(batch.put(&k.drain(), r.as_raw()));
try!(batch.put(self.column, &k.drain(), r.as_raw()));
if journal_overlay.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
try!(batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)));
journal_overlay.latest_era = Some(now);
}
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
@@ -271,7 +260,7 @@ impl JournalDB for OverlayRecentDB {
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
try!(batch.delete(&r.drain()));
try!(batch.delete(self.column, &r.drain()));
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
{
if canon_id == journal.id {
@@ -290,7 +279,7 @@ impl JournalDB for OverlayRecentDB {
}
// apply canon inserts first
for (k, v) in canon_insertions {
try!(batch.put(&k, &v));
try!(batch.put(self.column, &k, &v));
}
// update the overlay
for k in overlay_deletions {
@@ -299,13 +288,12 @@ impl JournalDB for OverlayRecentDB {
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&OverlayRecentDB::to_short_key(&k)) {
try!(batch.delete(&k));
try!(batch.delete(self.column, &k));
}
}
}
journal_overlay.journal.remove(&end_era);
}
try!(self.backing.write(batch));
Ok(0)
}
@@ -314,7 +302,7 @@ impl JournalDB for OverlayRecentDB {
impl HashDB for OverlayRecentDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = HashMap::new();
for (key, _) in self.backing.iter() {
for (key, _) in self.backing.iter(self.column) {
let h = H256::from_slice(key.deref());
ret.insert(h, 1);
}
@@ -374,7 +362,12 @@ mod tests {
use hashdb::*;
use log::init_log;
use journaldb::JournalDB;
use kvdb::DatabaseConfig;
use kvdb::Database;
fn new_db(path: &Path) -> OverlayRecentDB {
let backing = Arc::new(Database::open_default(path.to_str().unwrap()).unwrap());
OverlayRecentDB::new(backing, None)
}
#[test]
fn insert_same_in_fork() {
@@ -382,25 +375,25 @@ mod tests {
let mut jdb = OverlayRecentDB::new_temp();
let x = jdb.insert(b"X");
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&x);
jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
let x = jdb.insert(b"X");
jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
jdb.commit_batch(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
jdb.commit_batch(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&x));
@@ -411,20 +404,20 @@ mod tests {
// history is 3
let mut jdb = OverlayRecentDB::new_temp();
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&h));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&h));
}
@@ -436,7 +429,7 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
@@ -444,7 +437,7 @@ mod tests {
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
@@ -452,20 +445,20 @@ mod tests {
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
assert!(!jdb.contains(&bar));
@@ -479,25 +472,25 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));
@@ -510,112 +503,105 @@ mod tests {
let mut jdb = OverlayRecentDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_same_key_one() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
let mut jdb = OverlayRecentDB::new_temp();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_same_key_other() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new_temp();
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
}
#[test]
fn fork_ins_del_ins() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new_temp();
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3a".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3b".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4a".sha3(), Some((2, b"2a".sha3()))).unwrap();
jdb.commit_batch(4, &b"4a".sha3(), Some((2, b"2a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5a".sha3(), Some((3, b"3a".sha3()))).unwrap();
jdb.commit_batch(5, &b"5a".sha3(), Some((3, b"3a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
@@ -626,27 +612,27 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
foo
};
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
@@ -655,145 +641,133 @@ mod tests {
#[test]
fn insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = OverlayRecentDB::new_temp();
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn forked_insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = OverlayRecentDB::new_temp();
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1a".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1b".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2a".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2b".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3a".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3b".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1a".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn broken_assert() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new_temp();
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); // BROKEN
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); // BROKEN
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
#[test]
fn reopen_test() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = OverlayRecentDB::new_temp();
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"3".sha3(), None).unwrap();
jdb.commit_batch(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.remove(&bar);
jdb.commit(6, &b"6".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(6, &b"6".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.insert(b"bar");
jdb.commit(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
@@ -807,45 +781,48 @@ mod tests {
let foo = b"foo".sha3();
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(2, &b"2".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.commit(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
}; {
let mut jdb = new_db(&dir);
jdb.commit(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit_batch(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.contains(&foo));
}
@@ -856,26 +833,26 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, bar, baz) = {
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
let mut jdb = new_db(&dir);
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
(foo, bar, baz)
};
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
let mut jdb = new_db(&dir);
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));
@@ -887,17 +864,17 @@ mod tests {
fn insert_older_era() {
let mut jdb = OverlayRecentDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0a".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let bar = jdb.insert(b"bar");
jdb.commit(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(0, &b"0b".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));

View File

@@ -20,9 +20,9 @@ use common::*;
use rlp::*;
use hashdb::*;
use overlaydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY};
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction, DatabaseConfig};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
@@ -39,35 +39,23 @@ pub struct RefCountedDB {
latest_era: Option<u64>,
inserts: Vec<H256>,
removes: Vec<H256>,
column: Option<u32>,
}
const DB_VERSION : u32 = 0x200;
const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB {
/// Create a new instance given a `backing` database.
pub fn new(path: &str, config: DatabaseConfig) -> RefCountedDB {
let backing = Database::open(&config, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
if !backing.is_empty() {
match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
Ok(Some(DB_VERSION)) => {},
v => panic!("Incompatible DB version, expected {}, got {:?}; to resolve, remove {} and restart.", DB_VERSION, v, path)
}
} else {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
}
let backing = Arc::new(backing);
let latest_era = backing.get(&LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
pub fn new(backing: Arc<Database>, col: Option<u32>) -> RefCountedDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
RefCountedDB {
forward: OverlayDB::new_with_arc(backing.clone()),
forward: OverlayDB::new(backing.clone(), col),
backing: backing,
inserts: vec![],
removes: vec![],
latest_era: latest_era,
column: col,
}
}
@@ -76,7 +64,8 @@ impl RefCountedDB {
fn new_temp() -> RefCountedDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
Self::new(backing, None)
}
}
@@ -97,6 +86,7 @@ impl JournalDB for RefCountedDB {
latest_era: self.latest_era,
inserts: self.inserts.clone(),
removes: self.removes.clone(),
column: self.column.clone(),
})
}
@@ -108,13 +98,17 @@ impl JournalDB for RefCountedDB {
self.latest_era.is_none()
}
fn backing(&self) -> &Arc<Database> {
&self.backing
}
fn latest_era(&self) -> Option<u64> { self.latest_era }
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(&id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
self.backing.get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]).map(|b| b.to_vec())
}
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
@@ -128,12 +122,11 @@ impl JournalDB for RefCountedDB {
// of its inserts otherwise.
// record new commit's details.
let batch = DBTransaction::new();
{
let mut index = 0usize;
let mut last;
while try!(self.backing.get({
while try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
@@ -148,7 +141,7 @@ impl JournalDB for RefCountedDB {
r.append(id);
r.append(&self.inserts);
r.append(&self.removes);
try!(batch.put(&last, r.as_raw()));
try!(batch.put(self.column, &last, r.as_raw()));
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
@@ -156,7 +149,7 @@ impl JournalDB for RefCountedDB {
self.removes.clear();
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
try!(batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
}
@@ -167,7 +160,7 @@ impl JournalDB for RefCountedDB {
let mut last;
while let Some(rlp_data) = {
// trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
try!(self.backing.get({
try!(self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
@@ -183,13 +176,12 @@ impl JournalDB for RefCountedDB {
for i in &to_remove {
self.forward.remove(i);
}
try!(batch.delete(&last));
try!(batch.delete(self.column, &last));
index += 1;
}
}
let r = try!(self.forward.commit_to_batch(&batch));
try!(self.backing.write(batch));
Ok(r)
}
}
@@ -209,16 +201,16 @@ mod tests {
// history is 3
let mut jdb = RefCountedDB::new_temp();
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert!(jdb.contains(&h));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&h));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(!jdb.contains(&h));
}
@@ -228,16 +220,16 @@ mod tests {
let mut jdb = RefCountedDB::new_temp();
assert_eq!(jdb.latest_era(), None);
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert_eq!(jdb.latest_era(), Some(0));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
jdb.commit_batch(1, &b"1".sha3(), None).unwrap();
assert_eq!(jdb.latest_era(), Some(1));
jdb.commit(2, &b"2".sha3(), None).unwrap();
jdb.commit_batch(2, &b"2".sha3(), None).unwrap();
assert_eq!(jdb.latest_era(), Some(2));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert_eq!(jdb.latest_era(), Some(3));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
assert_eq!(jdb.latest_era(), Some(4));
}
@@ -248,32 +240,32 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
jdb.commit_batch(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit_batch(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit_batch(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(!jdb.contains(&foo));
assert!(!jdb.contains(&bar));
assert!(!jdb.contains(&baz));
@@ -286,22 +278,22 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit_batch(0, &b"0".sha3(), None).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
jdb.commit_batch(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
assert!(jdb.contains(&baz));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
jdb.commit_batch(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.contains(&foo));
assert!(!jdb.contains(&baz));
assert!(!jdb.contains(&bar));

View File

@@ -18,6 +18,7 @@
use common::*;
use hashdb::*;
use kvdb::{Database, DBTransaction};
/// A `HashDB` which can manage a short-term journal potentially containing many forks of mutually
/// exclusive actions.
@@ -36,11 +37,22 @@ pub trait JournalDB : HashDB + Send + Sync {
/// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
/// State data query
fn state(&self, _id: &H256) -> Option<Bytes>;
/// Whether this database is pruned.
fn is_pruned(&self) -> bool { true }
/// Get backing database.
fn backing(&self) -> &Arc<Database>;
#[cfg(test)]
/// Commit all changes in a single batch
fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = self.backing().transaction();
let res = try!(self.commit(&batch, now, id, end));
self.backing().write(batch).map(|_| res).map_err(Into::into)
}
}

View File

@@ -18,7 +18,7 @@
use std::default::Default;
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBVector, DBIterator,
Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache};
Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column};
const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
@@ -26,33 +26,31 @@ const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
pub struct DBTransaction {
batch: WriteBatch,
}
impl Default for DBTransaction {
fn default() -> Self {
DBTransaction::new()
}
cfs: Vec<Column>,
}
impl DBTransaction {
/// Create new transaction.
pub fn new() -> DBTransaction {
DBTransaction { batch: WriteBatch::new() }
pub fn new(db: &Database) -> DBTransaction {
DBTransaction {
batch: WriteBatch::new(),
cfs: db.cfs.clone(),
}
}
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
self.batch.put(key, value)
pub fn put(&self, col: Option<u32>, key: &[u8], value: &[u8]) -> Result<(), String> {
col.map_or_else(|| self.batch.put(key, value), |c| self.batch.put_cf(self.cfs[c as usize], key, value))
}
/// Delete value by key.
pub fn delete(&self, key: &[u8]) -> Result<(), String> {
self.batch.delete(key)
pub fn delete(&self, col: Option<u32>, key: &[u8]) -> Result<(), String> {
col.map_or_else(|| self.batch.delete(key), |c| self.batch.delete_cf(self.cfs[c as usize], key))
}
}
/// Compaction profile for the database settings
#[derive(Clone)]
#[derive(Clone, Copy)]
pub struct CompactionProfile {
/// L0-L1 target file size
pub initial_file_size: u64,
@@ -85,6 +83,7 @@ impl CompactionProfile {
}
/// Database configuration
#[derive(Clone, Copy)]
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
@@ -92,22 +91,16 @@ pub struct DatabaseConfig {
pub cache_size: Option<usize>,
/// Compaction profile
pub compaction: CompactionProfile,
/// Set number of columns
pub columns: Option<u32>,
}
impl DatabaseConfig {
/// Database with default settings and specified cache size
pub fn with_cache(cache_size: usize) -> DatabaseConfig {
DatabaseConfig {
cache_size: Some(cache_size),
max_open_files: 256,
compaction: CompactionProfile::default(),
}
}
/// Modify the compaction profile
pub fn compaction(mut self, profile: CompactionProfile) -> Self {
self.compaction = profile;
self
/// Create new `DatabaseConfig` with default parameters and specified set of columns.
pub fn with_columns(columns: Option<u32>) -> Self {
let mut config = Self::default();
config.columns = columns;
config
}
}
@@ -115,8 +108,9 @@ impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
cache_size: None,
max_open_files: 256,
max_open_files: 1024,
compaction: CompactionProfile::default(),
columns: None,
}
}
}
@@ -138,6 +132,7 @@ impl<'a> Iterator for DatabaseIterator {
pub struct Database {
db: DB,
write_opts: WriteOptions,
cfs: Vec<Column>,
}
impl Database {
@@ -171,10 +166,35 @@ impl Database {
opts.set_block_based_table_factory(&block_opts);
}
let write_opts = WriteOptions::new();
//write_opts.disable_wal(true); // TODO: make sure this is safe
let mut write_opts = WriteOptions::new();
write_opts.disable_wal(true); // TODO: make sure this is safe
let db = match DB::open(&opts, path) {
let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(columns) => {
let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
match DB::open_cf(&opts, path, &cfnames) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n).unwrap()).collect();
assert!(cfs.len() == columns as usize);
Ok(db)
}
Err(_) => {
// retry and create CFs
match DB::open_cf(&opts, path, &[]) {
Ok(mut db) => {
cfs = cfnames.iter().map(|n| db.create_cf(n, &opts).unwrap()).collect();
Ok(db)
},
err @ Err(_) => err,
}
}
}
},
None => DB::open(&opts, path)
};
let db = match db {
Ok(db) => db,
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
@@ -184,17 +204,12 @@ impl Database {
},
Err(s) => { return Err(s); }
};
Ok(Database { db: db, write_opts: write_opts, })
Ok(Database { db: db, write_opts: write_opts, cfs: cfs })
}
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten.
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
self.db.put_opt(key, value, &self.write_opts)
}
/// Delete value by key.
pub fn delete(&self, key: &[u8]) -> Result<(), String> {
self.db.delete_opt(key, &self.write_opts)
/// Creates new transaction for this database.
pub fn transaction(&self) -> DBTransaction {
DBTransaction::new(self)
}
/// Commit transaction to database.
@@ -203,13 +218,14 @@ impl Database {
}
/// Get value by key.
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
self.db.get(key)
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBVector>, String> {
col.map_or_else(|| self.db.get(key), |c| self.db.get_cf(self.cfs[c as usize], key))
}
/// Get value by partial key. Prefix size should match configured prefix size.
pub fn get_by_prefix(&self, prefix: &[u8]) -> Option<Box<[u8]>> {
let mut iter = self.db.iterator(IteratorMode::From(prefix, Direction::Forward));
pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
let mut iter = col.map_or_else(|| self.db.iterator(IteratorMode::From(prefix, Direction::Forward)),
|c| self.db.iterator_cf(self.cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)).unwrap());
match iter.next() {
// TODO: use prefix_same_as_start read option (not availabele in C API currently)
Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None },
@@ -218,13 +234,14 @@ impl Database {
}
/// Check if there is anything in the database.
pub fn is_empty(&self) -> bool {
self.db.iterator(IteratorMode::Start).next().is_none()
pub fn is_empty(&self, col: Option<u32>) -> bool {
self.iter(col).next().is_none()
}
/// Check if there is anything in the database.
pub fn iter(&self) -> DatabaseIterator {
DatabaseIterator { iter: self.db.iterator(IteratorMode::Start) }
/// Get database iterator.
pub fn iter(&self, col: Option<u32>) -> DatabaseIterator {
col.map_or_else(|| DatabaseIterator { iter: self.db.iterator(IteratorMode::Start) },
|c| DatabaseIterator { iter: self.db.iterator_cf(self.cfs[c as usize], IteratorMode::Start).unwrap() })
}
}
@@ -243,38 +260,46 @@ mod tests {
let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
db.put(&key1, b"cat").unwrap();
db.put(&key2, b"dog").unwrap();
let batch = db.transaction();
batch.put(None, &key1, b"cat").unwrap();
batch.put(None, &key2, b"dog").unwrap();
db.write(batch).unwrap();
assert_eq!(db.get(&key1).unwrap().unwrap().deref(), b"cat");
assert_eq!(db.get(None, &key1).unwrap().unwrap().deref(), b"cat");
let contents: Vec<_> = db.iter().collect();
let contents: Vec<_> = db.iter(None).collect();
assert_eq!(contents.len(), 2);
assert_eq!(&*contents[0].0, key1.deref());
assert_eq!(&*contents[0].1, b"cat");
assert_eq!(&*contents[1].0, key2.deref());
assert_eq!(&*contents[1].1, b"dog");
db.delete(&key1).unwrap();
assert!(db.get(&key1).unwrap().is_none());
db.put(&key1, b"cat").unwrap();
let batch = db.transaction();
batch.delete(None, &key1).unwrap();
db.write(batch).unwrap();
let transaction = DBTransaction::new();
transaction.put(&key3, b"elephant").unwrap();
transaction.delete(&key1).unwrap();
assert!(db.get(None, &key1).unwrap().is_none());
let batch = db.transaction();
batch.put(None, &key1, b"cat").unwrap();
db.write(batch).unwrap();
let transaction = db.transaction();
transaction.put(None, &key3, b"elephant").unwrap();
transaction.delete(None, &key1).unwrap();
db.write(transaction).unwrap();
assert!(db.get(&key1).unwrap().is_none());
assert_eq!(db.get(&key3).unwrap().unwrap().deref(), b"elephant");
assert!(db.get(None, &key1).unwrap().is_none());
assert_eq!(db.get(None, &key3).unwrap().unwrap().deref(), b"elephant");
assert_eq!(db.get_by_prefix(&key3).unwrap().deref(), b"elephant");
assert_eq!(db.get_by_prefix(&key2).unwrap().deref(), b"dog");
assert_eq!(db.get_by_prefix(None, &key3).unwrap().deref(), b"elephant");
assert_eq!(db.get_by_prefix(None, &key2).unwrap().deref(), b"dog");
}
#[test]
fn kvdb() {
let path = RandomTempPath::create_dir();
let smoke = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
assert!(smoke.is_empty());
assert!(smoke.is_empty(None));
test_db(&DatabaseConfig::default());
}
}

View File

@@ -46,14 +46,16 @@ impl Default for Config {
pub struct Batch {
inner: BTreeMap<Vec<u8>, Vec<u8>>,
batch_size: usize,
column: Option<u32>,
}
impl Batch {
/// Make a new batch with the given config.
pub fn new(config: &Config) -> Self {
pub fn new(config: &Config, col: Option<u32>) -> Self {
Batch {
inner: BTreeMap::new(),
batch_size: config.batch_size,
column: col,
}
}
@@ -70,10 +72,10 @@ impl Batch {
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
if self.inner.is_empty() { return Ok(()) }
let transaction = DBTransaction::new();
let transaction = DBTransaction::new(dest);
for keypair in &self.inner {
try!(transaction.put(&keypair.0, &keypair.1).map_err(Error::Custom));
try!(transaction.put(self.column, &keypair.0, &keypair.1).map_err(Error::Custom));
}
self.inner.clear();
@@ -102,14 +104,18 @@ impl From<::std::io::Error> for Error {
/// A generalized migration from the given db to a destination db.
pub trait Migration: 'static {
/// Number of columns in database after the migration.
fn columns(&self) -> Option<u32>;
/// Version of the database after the migration.
fn version(&self) -> u32;
/// Migrate a source to a destination.
fn migrate(&mut self, source: &Database, config: &Config, destination: &mut Database) -> Result<(), Error>;
fn migrate(&mut self, source: &Database, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<(), Error>;
}
/// A simple migration over key-value pairs.
pub trait SimpleMigration: 'static {
/// Number of columns in database after the migration.
fn columns(&self) -> Option<u32>;
/// Version of database after the migration.
fn version(&self) -> u32;
/// Should migrate existing object to new database.
@@ -118,12 +124,14 @@ pub trait SimpleMigration: 'static {
}
impl<T: SimpleMigration> Migration for T {
fn columns(&self) -> Option<u32> { SimpleMigration::columns(self) }
fn version(&self) -> u32 { SimpleMigration::version(self) }
fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database) -> Result<(), Error> {
let mut batch = Batch::new(config);
fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col);
for (key, value) in source.iter() {
for (key, value) in source.iter(col) {
if let Some((key, value)) = self.simple_migrate(key.to_vec(), value.to_vec()) {
try!(batch.insert(key, value, dest));
}
@@ -197,12 +205,14 @@ impl Manager {
/// and producing a path where the final migration lives.
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf, Error> {
let config = self.config.clone();
let mut migrations = self.migrations_from(version);
let columns = self.no_of_columns_at(version);
let migrations = self.migrations_from(version);
if migrations.is_empty() { return Err(Error::MigrationImpossible) };
let db_config = DatabaseConfig {
let mut db_config = DatabaseConfig {
max_open_files: 64,
cache_size: None,
compaction: config.compaction_profile.clone(),
compaction: config.compaction_profile,
columns: columns,
};
let db_root = database_path(old_path);
@@ -212,14 +222,28 @@ impl Manager {
// start with the old db.
let old_path_str = try!(old_path.to_str().ok_or(Error::MigrationImpossible));
let mut cur_db = try!(Database::open(&db_config, old_path_str).map_err(Error::Custom));
for migration in migrations {
// Change number of columns in new db
let current_columns = db_config.columns;
db_config.columns = migration.columns();
// open the target temporary database.
temp_path = temp_idx.path(&db_root);
let temp_path_str = try!(temp_path.to_str().ok_or(Error::MigrationImpossible));
let mut new_db = try!(Database::open(&db_config, temp_path_str).map_err(Error::Custom));
// perform the migration from cur_db to new_db.
try!(migration.migrate(&cur_db, &config, &mut new_db));
match current_columns {
// migrate only default column
None => try!(migration.migrate(&cur_db, &config, &mut new_db, None)),
Some(v) => {
// Migrate all columns in previous DB
for col in 0..v {
try!(migration.migrate(&cur_db, &config, &mut new_db, Some(col)))
}
}
}
// next iteration, we will migrate from this db into the other temp.
cur_db = new_db;
temp_idx.swap();
@@ -242,5 +266,38 @@ impl Manager {
fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<Migration>> {
self.migrations.iter_mut().filter(|m| m.version() > version).collect()
}
fn no_of_columns_at(&self, version: u32) -> Option<u32> {
let migration = self.migrations.iter().find(|m| m.version() == version);
match migration {
Some(m) => m.columns(),
None => None
}
}
}
/// Prints a dot every `max` ticks
pub struct Progress {
current: usize,
max: usize,
}
impl Default for Progress {
fn default() -> Self {
Progress {
current: 0,
max: 100_000,
}
}
}
impl Progress {
/// Tick progress meter.
pub fn tick(&mut self) {
self.current += 1;
if self.current == self.max {
self.current = 0;
flush!(".");
}
}
}

View File

@@ -20,7 +20,7 @@
use common::*;
use migration::{Config, SimpleMigration, Manager};
use kvdb::{Database, DBTransaction};
use kvdb::Database;
use devtools::RandomTempPath;
use std::path::PathBuf;
@@ -35,9 +35,9 @@ fn db_path(path: &Path) -> PathBuf {
fn make_db(path: &Path, pairs: BTreeMap<Vec<u8>, Vec<u8>>) {
let db = Database::open_default(path.to_str().unwrap()).expect("failed to open temp database");
{
let transaction = DBTransaction::new();
let transaction = db.transaction();
for (k, v) in pairs {
transaction.put(&k, &v).expect("failed to add pair to transaction");
transaction.put(None, &k, &v).expect("failed to add pair to transaction");
}
db.write(transaction).expect("failed to write db transaction");
@@ -49,7 +49,7 @@ fn verify_migration(path: &Path, pairs: BTreeMap<Vec<u8>, Vec<u8>>) {
let db = Database::open_default(path.to_str().unwrap()).unwrap();
for (k, v) in pairs {
let x = db.get(&k).unwrap().unwrap();
let x = db.get(None, &k).unwrap().unwrap();
assert_eq!(&x[..], &v[..]);
}
@@ -58,9 +58,9 @@ fn verify_migration(path: &Path, pairs: BTreeMap<Vec<u8>, Vec<u8>>) {
struct Migration0;
impl SimpleMigration for Migration0 {
fn version(&self) -> u32 {
1
}
fn columns(&self) -> Option<u32> { None }
fn version(&self) -> u32 { 1 }
fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)> {
let mut key = key;
@@ -74,9 +74,9 @@ impl SimpleMigration for Migration0 {
struct Migration1;
impl SimpleMigration for Migration1 {
fn version(&self) -> u32 {
2
}
fn columns(&self) -> Option<u32> { None }
fn version(&self) -> u32 { 2 }
fn simple_migrate(&mut self, key: Vec<u8>, _value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)> {
Some((key, vec![]))

View File

@@ -24,7 +24,6 @@ use hashdb::*;
use memorydb::*;
use std::ops::*;
use std::sync::*;
use std::env;
use std::collections::HashMap;
use kvdb::{Database, DBTransaction};
@@ -40,22 +39,29 @@ use kvdb::{Database, DBTransaction};
pub struct OverlayDB {
overlay: MemoryDB,
backing: Arc<Database>,
column: Option<u32>,
}
impl OverlayDB {
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new(backing: Database) -> OverlayDB { Self::new_with_arc(Arc::new(backing)) }
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new_with_arc(backing: Arc<Database>) -> OverlayDB {
OverlayDB{ overlay: MemoryDB::new(), backing: backing }
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayDB {
OverlayDB{ overlay: MemoryDB::new(), backing: backing, column: col }
}
/// Create a new instance of OverlayDB with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayDB {
let mut dir = env::temp_dir();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
Self::new(Database::open_default(dir.to_str().unwrap()).unwrap())
Self::new(Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()), None)
}
/// Commit all operations in a single batch.
#[cfg(test)]
pub fn commit(&mut self) -> Result<u32, UtilError> {
let batch = self.backing.transaction();
let res = try!(self.commit_to_batch(&batch));
self.backing.write(batch).map(|_| res).map_err(|e| e.into())
}
/// Commit all operations to given batch.
@@ -88,83 +94,8 @@ impl OverlayDB {
Ok(ret)
}
/// Commit all memory operations to the backing database.
///
/// Returns either an error or the number of items changed in the backing database.
///
/// Will return an error if the number of `remove()`s ever exceeds the number of
/// `insert()`s for any key. This will leave the database in an undeterminate
/// state. Don't ever let it happen.
///
/// # Example
/// ```
/// extern crate ethcore_util;
/// use ethcore_util::hashdb::*;
/// use ethcore_util::overlaydb::*;
/// fn main() {
/// let mut m = OverlayDB::new_temp();
/// let key = m.insert(b"foo"); // insert item.
/// assert!(m.contains(&key)); // key exists (in memory).
/// assert_eq!(m.commit().unwrap(), 1); // 1 item changed.
/// assert!(m.contains(&key)); // key still exists (in backing).
/// m.remove(&key); // delete item.
/// assert!(!m.contains(&key)); // key "doesn't exist" (though still does in backing).
/// m.remove(&key); // oh dear... more removes than inserts for the key...
/// //m.commit().unwrap(); // this commit/unwrap would cause a panic.
/// m.revert(); // revert both removes.
/// assert!(m.contains(&key)); // key now still exists.
/// }
/// ```
pub fn commit(&mut self) -> Result<u32, UtilError> {
let mut ret = 0u32;
let mut deletes = 0usize;
for i in self.overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc != 0 {
match self.payload(&key) {
Some(x) => {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload(&key, (value, rc as u32));
}
};
ret += 1;
}
}
trace!("OverlayDB::commit() deleted {} nodes", deletes);
Ok(ret)
}
/// Revert all operations on this object (i.e. `insert()`s and `remove()`s) since the
/// last `commit()`.
///
/// # Example
/// ```
/// extern crate ethcore_util;
/// use ethcore_util::hashdb::*;
/// use ethcore_util::overlaydb::*;
/// fn main() {
/// let mut m = OverlayDB::new_temp();
/// let foo = m.insert(b"foo"); // insert foo.
/// m.commit().unwrap(); // commit - new operations begin here...
/// let bar = m.insert(b"bar"); // insert bar.
/// m.remove(&foo); // remove foo.
/// assert!(!m.contains(&foo)); // foo is gone.
/// assert!(m.contains(&bar)); // bar is here.
/// m.revert(); // revert the last two operations.
/// assert!(m.contains(&foo)); // foo is here.
/// assert!(!m.contains(&bar)); // bar is gone.
/// }
/// ```
pub fn revert(&mut self) { self.overlay.clear(); }
/// Get the number of references that would be committed.
@@ -172,7 +103,7 @@ impl OverlayDB {
/// Get the refs and value of the given key.
fn payload(&self, key: &H256) -> Option<(Bytes, u32)> {
self.backing.get(key)
self.backing.get(self.column, key)
.expect("Low-level database error. Some issue with your hard disk?")
.map(|d| {
let r = Rlp::new(&d);
@@ -186,24 +117,10 @@ impl OverlayDB {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&payload.0);
batch.put(key, s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
batch.put(self.column, key, s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
false
} else {
batch.delete(key).expect("Low-level database error. Some issue with your hard disk?");
true
}
}
/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&payload.0);
self.backing.put(key, s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
false
} else {
self.backing.delete(key).expect("Low-level database error. Some issue with your hard disk?");
batch.delete(self.column, key).expect("Low-level database error. Some issue with your hard disk?");
true
}
}
@@ -212,7 +129,7 @@ impl OverlayDB {
impl HashDB for OverlayDB {
fn keys(&self) -> HashMap<H256, i32> {
let mut ret: HashMap<H256, i32> = HashMap::new();
for (key, _) in self.backing.iter() {
for (key, _) in self.backing.iter(self.column) {
let h = H256::from_slice(key.deref());
let r = self.payload(&h).unwrap().1;
ret.insert(h, r as i32);
@@ -274,6 +191,22 @@ impl HashDB for OverlayDB {
fn remove(&mut self, key: &H256) { self.overlay.remove(key); }
}
#[test]
fn overlaydb_revert() {
let mut m = OverlayDB::new_temp();
let foo = m.insert(b"foo"); // insert foo.
let batch = m.backing.transaction();
m.commit_to_batch(&batch).unwrap(); // commit - new operations begin here...
m.backing.write(batch).unwrap();
let bar = m.insert(b"bar"); // insert bar.
m.remove(&foo); // remove foo.
assert!(!m.contains(&foo)); // foo is gone.
assert!(m.contains(&bar)); // bar is here.
m.revert(); // revert the last two operations.
assert!(m.contains(&foo)); // foo is here.
assert!(!m.contains(&bar)); // bar is gone.
}
#[test]
fn overlaydb_overlay_insert_and_remove() {
let mut trie = OverlayDB::new_temp();
@@ -366,14 +299,18 @@ fn overlaydb_complex() {
fn playpen() {
use std::fs;
{
let db: Database = Database::open_default("/tmp/test").unwrap();
db.put(b"test", b"test2").unwrap();
match db.get(b"test") {
let db = Database::open_default("/tmp/test").unwrap();
let batch = db.transaction();
batch.put(None, b"test", b"test2").unwrap();
db.write(batch).unwrap();
match db.get(None, b"test") {
Ok(Some(value)) => println!("Got value {:?}", value.deref()),
Ok(None) => println!("No value for that key"),
Err(..) => println!("Gah"),
}
db.delete(b"test").unwrap();
let batch = db.transaction();
batch.delete(None, b"test").unwrap();
db.write(batch).unwrap();
}
fs::remove_dir_all("/tmp/test").unwrap();
}

View File

@@ -59,7 +59,7 @@ mod tests {
use kvdb::*;
let path = "db path".to_string();
let values: Vec<_> = Database::open_default(&path).unwrap().iter().map(|(_, v)| v).collect();
let values: Vec<_> = Database::open_default(&path).unwrap().iter(Some(2)).map(|(_, v)| v).collect();
let mut rlp_counts: HashMap<_, u32> = HashMap::new();
let mut rlp_sizes: HashMap<_, u32> = HashMap::new();

View File

@@ -228,7 +228,7 @@ mod tests {
fn test_compression() {
use kvdb::*;
let path = "db to test".to_string();
let values: Vec<_> = Database::open_default(&path).unwrap().iter().map(|(_, v)| v).collect();
let values: Vec<_> = Database::open_default(&path).unwrap().iter(Some(2)).map(|(_, v)| v).collect();
let mut decomp_size = 0;
let mut comp_size = 0;