From 4771fdf0fb448d72e3b386dee78b308cd40f1c45 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Fri, 11 Mar 2016 13:50:39 +0100 Subject: [PATCH] Rearrange journaldb infrastructure. --- ethcore/src/block_queue.rs | 1 + ethcore/src/client/client.rs | 10 +- util/src/journaldb/archivedb.rs | 387 ++++++++++++++++++ util/src/journaldb/mod.rs | 33 ++ .../optiononedb.rs} | 279 +++++-------- util/src/journaldb/traits.rs | 37 ++ 6 files changed, 571 insertions(+), 176 deletions(-) create mode 100644 util/src/journaldb/archivedb.rs create mode 100644 util/src/journaldb/mod.rs rename util/src/{journaldb.rs => journaldb/optiononedb.rs} (88%) create mode 100644 util/src/journaldb/traits.rs diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 4e335f705..8b7143b0b 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -412,6 +412,7 @@ impl BlockQueue { } } + /// Optimise memory footprint of the heap fields. pub fn collect_garbage(&self) { { self.verification.unverified.lock().unwrap().shrink_to_fit(); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 3c8a28380..70a3eb92a 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -139,8 +139,14 @@ impl Client where V: Verifier { state_path.push("state"); let engine = Arc::new(try!(spec.to_engine())); - let mut state_db = Box::new(OptionOneDB::from_prefs(state_path.to_str().unwrap(), config.prefer_journal)); - if state_db.is_empty() && engine.spec().ensure_db_good(state_db.deref_mut()) { + let state_path_str = state_path.to_str().unwrap(); + let mut state_db = if config.prefer_journal { + new_optiononedb(state_path_str) + } else { + new_archivedb(state_path_str) + }; + + if state_db.is_empty() && engine.spec().ensure_db_good(state_db.as_hashdb_mut()) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs new file mode 100644 index 000000000..28cc4130a --- /dev/null +++ b/util/src/journaldb/archivedb.rs @@ -0,0 +1,387 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Disk-backed HashDB implementation. + +use common::*; +use rlp::*; +use hashdb::*; +use memorydb::*; +use super::traits::JournalDB; +use kvdb::{Database, DBTransaction, DatabaseConfig}; +#[cfg(test)] +use std::env; + +/// Implementation of the HashDB trait for a disk-backed database with a memory overlay +/// and latent-removal semantics. +/// +/// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to +/// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect +/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before +/// the removals actually take effect. +pub struct ArchiveDB { + overlay: MemoryDB, + backing: Arc, +} + +// all keys must be at least 12 bytes +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const DB_VERSION : u32 = 259; + +impl ArchiveDB { + /// Create a new instance from file + pub fn new(path: &str) -> ArchiveDB { + let opts = DatabaseConfig { + prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix + }; + let backing = Database::open(&opts, path).unwrap_or_else(|e| { + panic!("Error opening state db: {}", e); + }); + if !backing.is_empty() { + match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { + Ok(Some(DB_VERSION)) => {}, + v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) + } + } else { + backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); + } + + ArchiveDB { + overlay: MemoryDB::new(), + backing: Arc::new(backing), + } + } + + /// Create a new instance with an anonymous temporary database. + #[cfg(test)] + fn new_temp() -> ArchiveDB { + let mut dir = env::temp_dir(); + dir.push(H32::random().hex()); + Self::new(dir.to_str().unwrap()) + } + + fn payload(&self, key: &H256) -> Option { + self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) + } +} + +impl HashDB for ArchiveDB { + fn keys(&self) -> HashMap { + let mut ret: HashMap = HashMap::new(); + for (key, _) in self.backing.iter() { + let h = H256::from_slice(key.deref()); + ret.insert(h, 1); + } + + for (key, refs) in self.overlay.keys().into_iter() { + let refs = *ret.get(&key).unwrap_or(&0) + refs; + ret.insert(key, refs); + } + ret + } + + fn lookup(&self, key: &H256) -> Option<&[u8]> { + let k = self.overlay.raw(key); + match k { + Some(&(ref d, rc)) if rc > 0 => Some(d), + _ => { + if let Some(x) = self.payload(key) { + Some(&self.overlay.denote(key, x).0) + } + else { + None + } + } + } + } + + fn exists(&self, key: &H256) -> bool { + self.lookup(key).is_some() + } + + fn insert(&mut self, value: &[u8]) -> H256 { + self.overlay.insert(value) + } + fn emplace(&mut self, key: H256, value: Bytes) { + self.overlay.emplace(key, value); + } + fn kill(&mut self, key: &H256) { + self.overlay.kill(key); + } +} + +impl JournalDB for ArchiveDB { + fn spawn(&self) -> Box { + Box::new(ArchiveDB { + overlay: MemoryDB::new(), + backing: self.backing.clone(), + }) + } + + fn mem_used(&self) -> usize { + self.overlay.mem_used() + } + + fn is_empty(&self) -> bool { + self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + } + + fn commit(&mut self, _: u64, _: &H256, _: Option<(u64, H256)>) -> Result { + let batch = DBTransaction::new(); + let mut inserts = 0usize; + let mut deletes = 0usize; + for i in self.overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + deletes += 1; + } + } + try!(self.backing.write(batch)); + Ok((inserts + deletes) as u32) + } +} + +#[cfg(test)] +mod tests { + use common::*; + use super::*; + use hashdb::*; + + #[test] + fn insert_same_in_fork() { + // history is 1 + let mut jdb = ArchiveDB::new_temp(); + + let x = jdb.insert(b"X"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap(); + jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.remove(&x); + jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap(); + let x = jdb.insert(b"X"); + jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap(); + jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap(); + + assert!(jdb.exists(&x)); + } + + #[test] + fn long_history() { + // history is 3 + let mut jdb = ArchiveDB::new_temp(); + let h = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.remove(&h); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap(); + } + + #[test] + fn complex() { + // history is 1 + let mut jdb = ArchiveDB::new_temp(); + + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + + jdb.remove(&foo); + jdb.remove(&bar); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + assert!(jdb.exists(&baz)); + + let foo = jdb.insert(b"foo"); + jdb.remove(&baz); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&baz)); + + jdb.remove(&foo); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + } + + #[test] + fn fork() { + // history is 1 + let mut jdb = ArchiveDB::new_temp(); + + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + assert!(jdb.exists(&baz)); + + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } + + #[test] + fn overwrite() { + // history is 1 + let mut jdb = ArchiveDB::new_temp(); + + let foo = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.remove(&foo); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + jdb.insert(b"foo"); + assert!(jdb.exists(&foo)); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } + + #[test] + fn fork_same_key() { + // history is 1 + let mut jdb = ArchiveDB::new_temp(); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + + let foo = jdb.insert(b"foo"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.insert(b"foo"); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } + + + #[test] + fn reopen() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.emplace(bar.clone(), b"bar".to_vec()); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + } + + { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + } + } + + #[test] + fn reopen_remove() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + + let foo = { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + + // foo is ancient history. + + jdb.insert(b"foo"); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + foo + }; + + { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.remove(&foo); + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap(); + } + } + #[test] + fn reopen_fork() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let (foo, bar, baz) = { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + (foo, bar, baz) + }; + + { + let mut jdb = ArchiveDB::new(dir.to_str().unwrap()); + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } + } +} diff --git a/util/src/journaldb/mod.rs b/util/src/journaldb/mod.rs new file mode 100644 index 000000000..fdb825d51 --- /dev/null +++ b/util/src/journaldb/mod.rs @@ -0,0 +1,33 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! JournalDB interface and implementation. + +use common::*; + +/// Export the journaldb module. +pub mod traits; +mod archivedb; +mod optiononedb; + +/// Export the JournalDB trait. +pub use self::traits::JournalDB; + +/// Create a new JournalDB trait object which is an ArchiveDB. +pub fn new_archivedb(path: &str) -> Box { Box::new(archivedb::ArchiveDB::new(path)) } + +/// Create a new JournalDB trait object which is an OptionOneDB. +pub fn new_optiononedb(path: &str) -> Box { Box::new(optiononedb::OptionOneDB::new(path)) } diff --git a/util/src/journaldb.rs b/util/src/journaldb/optiononedb.rs similarity index 88% rename from util/src/journaldb.rs rename to util/src/journaldb/optiononedb.rs index e7a670a08..58bb88277 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb/optiononedb.rs @@ -20,26 +20,11 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; +use super::traits::JournalDB; use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] use std::env; -/// A HashDB which can manage a short-term journal potentially containing many forks of mutually -/// exclusive actions. -pub trait JournalDB : HashDB + Sync + Send { - /// Return a copy of ourself, in a box. - fn spawn(&self) -> Box; - - /// Returns heap memory size used - fn mem_used(&self) -> usize; - - /// Check if this database has any commits - fn is_empty(&self) -> bool; - - /// Commit all recent insert operations. - fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result; -} - /// Implementation of the HashDB trait for a disk-backed database with a memory overlay /// and latent-removal semantics. /// @@ -56,43 +41,28 @@ pub struct OptionOneDB { // all keys must be at least 12 bytes const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; - const DB_VERSION : u32 = 3; -const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; - const PADDING : [u8; 10] = [ 0u8; 10 ]; impl OptionOneDB { /// Create a new instance from file pub fn new(path: &str) -> OptionOneDB { - Self::from_prefs(path, true) - } - - /// Create a new instance from file - pub fn from_prefs(path: &str, prefer_journal: bool) -> OptionOneDB { let opts = DatabaseConfig { prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix }; let backing = Database::open(&opts, path).unwrap_or_else(|e| { panic!("Error opening state db: {}", e); }); - let with_journal; if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => { with_journal = true; }, - Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, + Ok(Some(DB_VERSION)) => {}, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); - with_journal = prefer_journal; + backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); } - - let counters = if with_journal { - Some(Arc::new(RwLock::new(OptionOneDB::read_counters(&backing)))) - } else { - None - }; + + let counters = Some(Arc::new(RwLock::new(OptionOneDB::read_counters(&backing)))); OptionOneDB { overlay: MemoryDB::new(), backing: Arc::new(backing), @@ -108,34 +78,6 @@ impl OptionOneDB { Self::new(dir.to_str().unwrap()) } - /// Drain the overlay and place it into a batch for the DB. - fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { - let mut inserts = 0usize; - let mut deletes = 0usize; - for i in overlay.drain().into_iter() { - let (key, (value, rc)) = i; - if rc > 0 { - assert!(rc == 1); - batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); - inserts += 1; - } - if rc < 0 { - assert!(rc == -1); - deletes += 1; - } - } - trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); - inserts + deletes - } - - /// Just commit the overlay into the backing DB. - fn commit_without_counters(&mut self) -> Result { - let batch = DBTransaction::new(); - let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); - try!(self.backing.write(batch)); - Ok(ret as u32) - } - fn morph_key(key: &H256, index: u8) -> Bytes { let mut ret = key.bytes().to_owned(); ret.push(index); @@ -217,9 +159,106 @@ impl OptionOneDB { } } - /// Commit all recent insert operations and historical removals from the old era - /// to the backing database. - fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + fn payload(&self, key: &H256) -> Option { + self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) + } + + fn read_counters(db: &Database) -> HashMap { + let mut counters = HashMap::new(); + if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { + let mut era = decode::(&val); + loop { + let mut index = 0usize; + while let Some(rlp_data) = db.get({ + let mut r = RlpStream::new_list(3); + r.append(&era); + r.append(&index); + r.append(&&PADDING[..]); + &r.drain() + }).expect("Low-level database error.") { + trace!("read_counters: era={}, index={}", era, index); + let rlp = Rlp::new(&rlp_data); + let inserts: Vec = rlp.val_at(1); + Self::replay_keys(&inserts, db, &mut counters); + index += 1; + }; + if index == 0 || era == 0 { + break; + } + era -= 1; + } + } + trace!("Recovered {} counters", counters.len()); + counters + } +} + +impl HashDB for OptionOneDB { + fn keys(&self) -> HashMap { + let mut ret: HashMap = HashMap::new(); + for (key, _) in self.backing.iter() { + let h = H256::from_slice(key.deref()); + ret.insert(h, 1); + } + + for (key, refs) in self.overlay.keys().into_iter() { + let refs = *ret.get(&key).unwrap_or(&0) + refs; + ret.insert(key, refs); + } + ret + } + + fn lookup(&self, key: &H256) -> Option<&[u8]> { + let k = self.overlay.raw(key); + match k { + Some(&(ref d, rc)) if rc > 0 => Some(d), + _ => { + if let Some(x) = self.payload(key) { + Some(&self.overlay.denote(key, x).0) + } + else { + None + } + } + } + } + + fn exists(&self, key: &H256) -> bool { + self.lookup(key).is_some() + } + + fn insert(&mut self, value: &[u8]) -> H256 { + self.overlay.insert(value) + } + fn emplace(&mut self, key: H256, value: Bytes) { + self.overlay.emplace(key, value); + } + fn kill(&mut self, key: &H256) { + self.overlay.kill(key); + } +} + +impl JournalDB for OptionOneDB { + fn spawn(&self) -> Box { + Box::new(OptionOneDB { + overlay: MemoryDB::new(), + backing: self.backing.clone(), + counters: self.counters.clone(), + }) + } + + fn mem_used(&self) -> usize { + self.overlay.mem_used() + match self.counters { + Some(ref c) => c.read().unwrap().heap_size_of_children(), + None => 0 + } + } + + fn is_empty(&self) -> bool { + self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + } + + fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -337,114 +376,6 @@ impl OptionOneDB { // trace!("OptionOneDB::commit() deleted {} nodes", deletes); Ok(0) } - - fn payload(&self, key: &H256) -> Option { - self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) - } - - fn read_counters(db: &Database) -> HashMap { - let mut counters = HashMap::new(); - if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { - let mut era = decode::(&val); - loop { - let mut index = 0usize; - while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(3); - r.append(&era); - r.append(&index); - r.append(&&PADDING[..]); - &r.drain() - }).expect("Low-level database error.") { - trace!("read_counters: era={}, index={}", era, index); - let rlp = Rlp::new(&rlp_data); - let inserts: Vec = rlp.val_at(1); - Self::replay_keys(&inserts, db, &mut counters); - index += 1; - }; - if index == 0 || era == 0 { - break; - } - era -= 1; - } - } - trace!("Recovered {} counters", counters.len()); - counters - } -} - -impl HashDB for OptionOneDB { - fn keys(&self) -> HashMap { - let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iter() { - let h = H256::from_slice(key.deref()); - ret.insert(h, 1); - } - - for (key, refs) in self.overlay.keys().into_iter() { - let refs = *ret.get(&key).unwrap_or(&0) + refs; - ret.insert(key, refs); - } - ret - } - - fn lookup(&self, key: &H256) -> Option<&[u8]> { - let k = self.overlay.raw(key); - match k { - Some(&(ref d, rc)) if rc > 0 => Some(d), - _ => { - if let Some(x) = self.payload(key) { - Some(&self.overlay.denote(key, x).0) - } - else { - None - } - } - } - } - - fn exists(&self, key: &H256) -> bool { - self.lookup(key).is_some() - } - - fn insert(&mut self, value: &[u8]) -> H256 { - self.overlay.insert(value) - } - fn emplace(&mut self, key: H256, value: Bytes) { - self.overlay.emplace(key, value); - } - fn kill(&mut self, key: &H256) { - self.overlay.kill(key); - } -} - -impl JournalDB for OptionOneDB { - fn spawn(&self) -> Box { - Box::new(OptionOneDB { - overlay: MemoryDB::new(), - backing: self.backing.clone(), - counters: self.counters.clone(), - }) - } - - fn mem_used(&self) -> usize { - self.overlay.mem_used() + match self.counters { - Some(ref c) => c.read().unwrap().heap_size_of_children(), - None => 0 - } - } - - fn is_empty(&self) -> bool { - self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() - } - - fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { - let have_counters = self.counters.is_some(); - if have_counters { - self.commit_with_counters(now, id, end) - } else { - self.commit_without_counters() - } - } } #[cfg(test)] diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs new file mode 100644 index 000000000..25e132339 --- /dev/null +++ b/util/src/journaldb/traits.rs @@ -0,0 +1,37 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Disk-backed HashDB implementation. + +use common::*; +use hashdb::*; + +/// A HashDB which can manage a short-term journal potentially containing many forks of mutually +/// exclusive actions. +pub trait JournalDB : HashDB + Send + Sync { + /// Return a copy of ourself, in a box. + fn spawn(&self) -> Box; + + /// Returns heap memory size used + fn mem_used(&self) -> usize; + + /// Check if this database has any commits + fn is_empty(&self) -> bool; + + /// Commit all recent insert operations and canonical historical commits' removals from the + /// old era to the backing database, reverting any non-canonical historical commit's inserts. + fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result; +}