JournalDB can now operate in "archive" mode.

Gav Wood 2016-03-04 20:19:36 +01:00
parent 00b5fcebe3
commit ba67b67ff3
2 changed files with 70 additions and 28 deletions
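In practice, "archive" mode just means a JournalDB constructed without a journal: the new `from_prefs` constructor takes a `prefer_journal` flag, and passing `false` gives a database that keeps no reference counters and never prunes old state. A minimal construction sketch, with illustrative paths (both calls appear in the diff below):

let archive = JournalDB::from_prefs("/tmp/state-archive", false); // no journal, nothing is ever pruned
let pruned  = JournalDB::new("/tmp/state-pruned");                // equivalent to from_prefs(path, true)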

@@ -25,7 +25,10 @@ use kvdb::{Database, DBTransaction, DatabaseConfig};
 use std::env;
 /// Implementation of the HashDB trait for a disk-backed database with a memory overlay
-/// and latent-removal semantics.
+/// and, possibly, latent-removal semantics.
+///
+/// If `counters` is `None`, then it behaves exactly like OverlayDB. If not it behaves
+/// differently:
 ///
 /// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to
 /// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect
@@ -34,7 +37,7 @@ use std::env;
 pub struct JournalDB {
 	overlay: MemoryDB,
 	backing: Arc<Database>,
-	counters: Arc<RwLock<HashMap<H256, i32>>>,
+	counters: Option<Arc<RwLock<HashMap<H256, i32>>>>,
 }
 impl Clone for JournalDB {
@@ -51,7 +54,8 @@ impl Clone for JournalDB {
 const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
 const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
-const DB_VERSION: u32 = 3;
+const DB_VERSION : u32 = 3;
+const DB_VERSION_NO_JOURNAL : u32 = 3 + 256;
 const PADDING : [u8; 10] = [ 0u8; 10 ];
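The extra constant gives the database two distinct on-disk version tags: 3 (`DB_VERSION`) for a journaled database and 3 + 256 = 259 (`DB_VERSION_NO_JOURNAL`) for a journal-free archive one, so an existing database is always reopened in the mode it was created with. A hypothetical helper, not part of the commit, spelling out the mapping the constructor below relies on:

fn journal_mode(version: u32) -> Option<bool> {
	match version {
		3   => Some(true),   // DB_VERSION: journaled, pruning possible
		259 => Some(false),  // DB_VERSION_NO_JOURNAL = 3 + 256: archive mode
		_   => None,         // anything else: from_prefs panics as incompatible
	}
}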
@@ -59,25 +63,38 @@ impl JournalDB {
 	/// Create a new instance from file
 	pub fn new(path: &str) -> JournalDB {
+		Self::from_prefs(path, true)
+	}
+	/// Create a new instance from file
+	pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB {
 		let opts = DatabaseConfig {
 			prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix
 		};
 		let backing = Database::open(&opts, path).unwrap_or_else(|e| {
 			panic!("Error opening state db: {}", e);
 		});
+		let with_journal;
 		if !backing.is_empty() {
 			match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
-				Ok(Some(DB_VERSION)) => {},
+				Ok(Some(DB_VERSION)) => { with_journal = true; },
+				Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; },
 				v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v)
 			}
 		} else {
-			backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
+			backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database");
+			with_journal = prefer_journal;
 		}
-		let counters = JournalDB::read_counters(&backing);
+		let counters = if with_journal {
+			Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing))))
+		} else {
+			None
+		};
 		JournalDB {
 			overlay: MemoryDB::new(),
 			backing: Arc::new(backing),
-			counters: Arc::new(RwLock::new(counters)),
+			counters: counters,
 		}
 	}
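Note that `prefer_journal` only decides the mode of a freshly created database; for an existing one, `with_journal` comes from the stored version tag alone. A small sketch of that behaviour, with an illustrative path:

// Suppose this directory already holds a database created by JournalDB::new(),
// i.e. tagged with DB_VERSION (journaled).
let db = JournalDB::from_prefs("/tmp/existing-state", false);
// with_journal is read back as true, counters are rebuilt via read_counters(),
// and commits keep going through the journaled path despite the false preference.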
@@ -94,9 +111,48 @@ impl JournalDB {
 		self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
 	}
+	/// Commit all recent insert operations.
+	pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
+		let have_counters = self.counters.is_some();
+		if have_counters {
+			self.commit_with_counters(now, id, end)
+		} else {
+			self.commit_without_counters()
+		}
+	}
+	/// Drain the overlay and place it into a batch for the DB.
+	fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> (usize, usize) {
+		let mut ret = 0usize;
+		let mut deletes = 0usize;
+		for i in overlay.drain().into_iter() {
+			let (key, (value, rc)) = i;
+			if rc > 0 {
+				assert!(rc == 1);
+				batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
+				ret += 1;
+			}
+			if rc < 0 {
+				assert!(rc == -1);
+				ret += 1;
+				deletes += 1;
+			}
+		}
+		(ret, deletes)
+	}
+	/// Just commit the overlay into the backing DB.
+	fn commit_without_counters(&mut self) -> Result<u32, UtilError> {
+		let batch = DBTransaction::new();
+		let (ret, _) = Self::batch_overlay_insertions(&mut self.overlay, &batch);
+		try!(self.backing.write(batch));
+		Ok(ret as u32)
+	}
 	/// Commit all recent insert operations and historical removals from the old era
 	/// to the backing database.
-	pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
+	fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 		// journal format:
 		// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
 		// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
@@ -122,8 +178,8 @@ impl JournalDB {
 		// record new commit's details.
 		debug!("commit: #{} ({}), end era: {:?}", now, id, end);
+		let mut counters = self.counters.as_ref().unwrap().write().unwrap();
 		let batch = DBTransaction::new();
-		let mut counters = self.counters.write().unwrap();
 		{
 			let mut index = 0usize;
 			let mut last;
@@ -196,25 +252,11 @@ impl JournalDB {
 		}
 		// Commit overlay insertions
-		let mut ret = 0u32;
-		let mut deletes = 0usize;
-		for i in self.overlay.drain().into_iter() {
-			let (key, (value, rc)) = i;
-			if rc > 0 {
-				assert!(rc == 1);
-				batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
-				ret += 1;
-			}
-			if rc < 0 {
-				assert!(rc == -1);
-				ret += 1;
-				deletes += 1;
-			}
-		}
+		let (ret, deletes) = Self::batch_overlay_insertions(&mut self.overlay, &batch);
 		try!(self.backing.write(batch));
 		debug!("commit: Deleted {} nodes", deletes);
-		Ok(ret)
+		Ok(ret as u32)
 	}
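Taken together, `commit()` is now a thin dispatcher: with counters present it follows the existing journaled path above, and without them it only drains the overlay. A minimal archive-mode sketch, with illustrative path and data, assuming the crate's `HashDB` and `Hashable` traits are in scope:

let mut db = JournalDB::from_prefs("/tmp/state-archive", false);
db.insert(b"some trie node");                          // rc +1 in the memory overlay
let written = db.commit(1, &b"1".sha3(), None).unwrap();
// On this path now, id and end are unused: commit_without_counters() drains the
// overlay into a single DBTransaction and writes it; entries drained with a negative
// rc are counted as deletes, but nothing is ever removed from the backing database.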

@@ -146,7 +146,7 @@ impl OverlayDB {
 		})
 	}
-	/// Get the refs and value of the given key.
+	/// Put the refs and value of the given key, possibly deleting it from the db.
 	fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
 		if payload.1 > 0 {
 			let mut s = RlpStream::new_list(2);
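For context on the OverlayDB side: `put_payload` stores each node as a two-item RLP list of its reference count and its bytes, and, per the corrected doc comment, deletes the key outright once the count reaches zero. A standalone sketch of that payload encoding (`encode_payload` is hypothetical, and the append order past the visible `new_list(2)` call is an assumption):

fn encode_payload(value: &Bytes, refs: u32) -> Bytes {
	let mut s = RlpStream::new_list(2);
	s.append(&refs);   // reference count
	s.append(value);   // node bytes
	s.out()
}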