structured rlp encoding in journaldb (#8047)

* structured rlp encoding in journaldb

* removed redundant code
Marek Kotewicz 2018-03-15 11:14:38 +01:00 committed by Andrew Jones
parent 4d1cb01da0
commit 21cb08586b
7 changed files with 283 additions and 146 deletions
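
The change in one sentence: journal keys and values that were previously assembled inline with `RlpStream` at every call site become typed structs implementing the rlp crate's `Encodable`/`Decodable` traits. A minimal sketch of the pattern, assuming the parity `rlp` crate API of this era (`encode` returns an elastic byte array; `DatabaseKey` here mirrors the one added in util.rs below): because `rlp_append` emits the same three-element list (era, index, PADDING) that the old inline code built, the encoded keys should stay byte-identical, so no database migration is implied.

	extern crate rlp;

	use rlp::{encode, Encodable, RlpStream};

	const PADDING: [u8; 10] = [0u8; 10];

	struct DatabaseKey {
		era: u64,
		index: usize,
	}

	impl Encodable for DatabaseKey {
		fn rlp_append(&self, s: &mut RlpStream) {
			s.begin_list(3);
			s.append(&self.era);
			s.append(&self.index);
			s.append(&&PADDING[..]);
		}
	}

	fn main() {
		// Old style: build the journal key by hand at every call site.
		let mut r = RlpStream::new_list(3);
		r.append(&1u64);
		r.append(&0usize);
		r.append(&&PADDING[..]);
		let old_key = r.drain();

		// New style: one struct, one Encodable impl, reused everywhere.
		let new_key = encode(&DatabaseKey { era: 1, index: 0 });

		// Same bytes either way, so existing databases keep working.
		assert_eq!(&*old_key, &*new_key);
	}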

View File: archivedb.rs

@@ -19,7 +19,7 @@
 use std::collections::HashMap;
 use std::collections::hash_map::Entry;
 use std::sync::Arc;
-use rlp::*;
+use rlp::{encode, decode};
 use hashdb::*;
 use super::memorydb::*;
 use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
@@ -46,7 +46,8 @@ pub struct ArchiveDB {
 impl ArchiveDB {
 	/// Create a new instance from a key-value db.
 	pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> ArchiveDB {
-		let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
+		let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.")
+			.map(|val| decode::<u64>(&val));
 		ArchiveDB {
 			overlay: MemoryDB::new(),
 			backing: backing,

View File: earlymergedb.rs

@@ -21,7 +21,7 @@ use std::collections::hash_map::Entry;
 use std::sync::Arc;
 use parking_lot::RwLock;
 use heapsize::HeapSizeOf;
-use rlp::*;
+use rlp::{encode, decode};
 use hashdb::*;
 use memorydb::*;
 use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
@@ -30,6 +30,7 @@ use kvdb::{KeyValueDB, DBTransaction};
 use ethereum_types::H256;
 use error::{BaseDataError, UtilError};
 use bytes::Bytes;
+use util::{DatabaseKey, DatabaseValueView, DatabaseValueRef};
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 struct RefInfo {
@@ -111,8 +112,6 @@ pub struct EarlyMergeDB {
 	column: Option<u32>,
 }
 
-const PADDING : [u8; 10] = [ 0u8; 10 ];
-
 impl EarlyMergeDB {
 	/// Create a new instance from file
 	pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> EarlyMergeDB {
@@ -267,20 +266,17 @@ impl EarlyMergeDB {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
 			loop {
-				let mut index = 0usize;
-				while let Some(rlp_data) = db.get(col, {
-					let mut r = RlpStream::new_list(3);
-					r.append(&era);
-					r.append(&index);
-					r.append(&&PADDING[..]);
-					&r.drain()
-				}).expect("Low-level database error.") {
-					let rlp = Rlp::new(&rlp_data);
-					let inserts: Vec<H256> = rlp.list_at(1);
-					Self::replay_keys(&inserts, db, col, &mut refs);
-					index += 1;
-				};
-				if index == 0 || era == 0 {
+				//let mut index = 0usize;
+				let mut db_key = DatabaseKey {
+					era,
+					index: 0usize,
+				};
+				while let Some(rlp_data) = db.get(col, &encode(&db_key)).expect("Low-level database error.") {
+					let inserts = DatabaseValueView::new(&rlp_data).inserts().expect("rlp read from db; qed");
+					Self::replay_keys(&inserts, db, col, &mut refs);
+					db_key.index += 1;
+				};
+				if db_key.index == 0 || era == 0 {
 					break;
 				}
 				era -= 1;
@@ -373,18 +369,17 @@ impl JournalDB for EarlyMergeDB {
 		};
 
 		{
-			let mut index = 0usize;
+			let mut db_key = DatabaseKey {
+				era: now,
+				index: 0usize,
+			};
 			let mut last;
 
 			while self.backing.get(self.column, {
-				let mut r = RlpStream::new_list(3);
-				r.append(&now);
-				r.append(&index);
-				r.append(&&PADDING[..]);
-				last = r.drain();
+				last = encode(&db_key);
 				&last
 			})?.is_some() {
-				index += 1;
+				db_key.index += 1;
 			}
 
 			let drained = self.overlay.drain();
@@ -403,28 +398,25 @@ impl JournalDB for EarlyMergeDB {
 			// TODO: check all removes are in the db.
 
-			let mut r = RlpStream::new_list(3);
-			r.append(id);
-
 			// Process the new inserts.
 			// We use the inserts for three things. For each:
 			// - we place into the backing DB or increment the counter if already in;
 			// - we note in the backing db that it was already in;
 			// - we write the key into our journal for this block;
 
-			r.begin_list(inserts.len());
-			for &(k, _) in &inserts {
-				r.append(&k);
-			}
-			r.append_list(&removes);
-
 			Self::insert_keys(&inserts, &*self.backing, self.column, &mut refs, batch);
 
 			let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
+			let value_ref = DatabaseValueRef {
+				id,
+				inserts: &ins,
+				deletes: &removes,
+			};
 
 			trace!(target: "jdb.ops", " Deletes: {:?}", removes);
 			trace!(target: "jdb.ops", " Inserts: {:?}", ins);
 
-			batch.put(self.column, &last, r.as_raw());
+			batch.put(self.column, &last, &encode(&value_ref));
 			if self.latest_era.map_or(true, |e| now > e) {
 				batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
 				self.latest_era = Some(now);
@@ -438,23 +430,22 @@ impl JournalDB for EarlyMergeDB {
 		let mut refs = self.refs.as_ref().unwrap().write();
 
 		// apply old commits' details
-		let mut index = 0usize;
+		let mut db_key = DatabaseKey {
+			era: end_era,
+			index: 0usize,
+		};
 		let mut last;
-		while let Some(rlp_data) = self.backing.get(self.column, {
-			let mut r = RlpStream::new_list(3);
-			r.append(&end_era);
-			r.append(&index);
-			r.append(&&PADDING[..]);
-			last = r.drain();
-			&last
-		})? {
-			let rlp = Rlp::new(&rlp_data);
-			let inserts: Vec<H256> = rlp.list_at(1);
-
-			if canon_id == &rlp.val_at::<H256>(0) {
+		while let Some(rlp_data) = {
+			last = encode(&db_key);
+			self.backing.get(self.column, &last)
+		}? {
+			let view = DatabaseValueView::new(&rlp_data);
+			let inserts = view.inserts().expect("rlp read from db; qed");
+
+			if canon_id == &view.id().expect("rlp read from db; qed") {
 				// Collect keys to be removed. Canon block - remove the (enacted) deletes.
-				let deletes: Vec<H256> = rlp.list_at(2);
+				let deletes = view.deletes().expect("rlp read from db; qed");
 				trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
 				Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive);
@@ -488,10 +479,10 @@ impl JournalDB for EarlyMergeDB {
 			}
 
 			batch.delete(self.column, &last);
-			index += 1;
+			db_key.index += 1;
 		}
 
-		trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
+		trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, db_key.index, canon_id);
 		trace!(target: "jdb", "OK: {:?}", &*refs);
 
 		Ok(0)
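
A note on the loop shape above: journal records for an era live under consecutive (era, index) keys, so both the old and the new code probe index = 0, 1, 2, ... until a get misses, and `last` holds the final encoded key because the same bytes are reused afterwards, as the slot to write in journal_under and as the key to delete in mark_canonical. A toy sketch of the probe, with a HashMap standing in for the backing column and a stand-in string encoding in place of the real RLP key (names here are illustrative, not from the commit):

	use std::collections::HashMap;

	struct DatabaseKey {
		era: u64,
		index: usize,
	}

	// Toy encoding for the sketch; the real key is the RLP list (era, index, PADDING).
	fn encode(key: &DatabaseKey) -> Vec<u8> {
		format!("{}:{}", key.era, key.index).into_bytes()
	}

	/// First free journal slot for `era`, mirroring the probe in journal_under.
	fn first_free_slot(backing: &HashMap<Vec<u8>, Vec<u8>>, era: u64) -> (usize, Vec<u8>) {
		let mut db_key = DatabaseKey { era, index: 0 };
		let mut last = encode(&db_key);
		while backing.contains_key(&last) {
			db_key.index += 1;
			last = encode(&db_key);
		}
		// `last` is the key the new journal record will be written under.
		(db_key.index, last)
	}

	fn main() {
		let mut backing = HashMap::new();
		backing.insert(encode(&DatabaseKey { era: 5, index: 0 }), b"record".to_vec());
		let (index, _key) = first_free_slot(&backing, 5);
		assert_eq!(index, 1); // slot 0 is taken, the next record goes to index 1
	}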

View File: mod.rs

@@ -46,6 +46,7 @@ mod archivedb;
 mod earlymergedb;
 mod overlayrecentdb;
 mod refcounteddb;
+mod util;
 
 pub mod overlaydb;

View File: overlaydb.rs

@@ -21,7 +21,7 @@ use std::collections::HashMap;
 use std::collections::hash_map::Entry;
 use error::{Result, BaseDataError};
 use ethereum_types::H256;
-use rlp::*;
+use rlp::{UntrustedRlp, RlpStream, Encodable, DecoderError, Decodable, encode, decode};
 use hashdb::*;
 use memorydb::*;
 use kvdb::{KeyValueDB, DBTransaction};
@@ -41,6 +41,39 @@ pub struct OverlayDB {
 	column: Option<u32>,
 }
 
+struct Payload {
+	count: u32,
+	value: DBValue,
+}
+
+impl Payload {
+	fn new(count: u32, value: DBValue) -> Self {
+		Payload {
+			count,
+			value,
+		}
+	}
+}
+
+impl Encodable for Payload {
+	fn rlp_append(&self, s: &mut RlpStream) {
+		s.begin_list(2);
+		s.append(&self.count);
+		s.append(&&*self.value);
+	}
+}
+
+impl Decodable for Payload {
+	fn decode(rlp: &UntrustedRlp) -> ::std::result::Result<Self, DecoderError> {
+		let payload = Payload {
+			count: rlp.val_at(0)?,
+			value: DBValue::from_slice(rlp.at(1)?.data()?),
+		};
+
+		Ok(payload)
+	}
+}
+
 impl OverlayDB {
 	/// Create a new instance of OverlayDB given a `backing` database.
 	pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayDB {
@@ -71,18 +104,19 @@ impl OverlayDB {
 				if rc != 0 {
 					match self.payload(&key) {
 						Some(x) => {
-							let (back_value, back_rc) = x;
-							let total_rc: i32 = back_rc as i32 + rc;
+							let total_rc: i32 = x.count as i32 + rc;
 							if total_rc < 0 {
 								return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
 							}
-							deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0};
+							let payload = Payload::new(total_rc as u32, x.value);
+							deletes += if self.put_payload_in_batch(batch, &key, &payload) {1} else {0};
 						}
 						None => {
 							if rc < 0 {
 								return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
 							}
-							self.put_payload_in_batch(batch, &key, (value, rc as u32));
+							let payload = Payload::new(rc as u32, value);
+							self.put_payload_in_batch(batch, &key, &payload);
 						}
 					};
 					ret += 1;
@@ -100,22 +134,16 @@ impl OverlayDB {
 	pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(key).map_or(0, |(_, refs)| refs) }
 
 	/// Get the refs and value of the given key.
-	fn payload(&self, key: &H256) -> Option<(DBValue, u32)> {
+	fn payload(&self, key: &H256) -> Option<Payload> {
 		self.backing.get(self.column, key)
 			.expect("Low-level database error. Some issue with your hard disk?")
-			.map(|d| {
-				let r = Rlp::new(&d);
-				(DBValue::from_slice(r.at(1).data()), r.at(0).as_val())
-			})
+			.map(|d| decode(&d))
 	}
 
 	/// Put the refs and value of the given key, possibly deleting it from the db.
-	fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: (DBValue, u32)) -> bool {
-		if payload.1 > 0 {
-			let mut s = RlpStream::new_list(2);
-			s.append(&payload.1);
-			s.append(&&*payload.0);
-			batch.put(self.column, key, s.as_raw());
+	fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: &Payload) -> bool {
+		if payload.count > 0 {
+			batch.put(self.column, key, &encode(payload));
 			false
 		} else {
 			batch.delete(self.column, key);
@@ -129,7 +157,7 @@ impl HashDB for OverlayDB {
 		let mut ret: HashMap<H256, i32> = self.backing.iter(self.column)
 			.map(|(key, _)| {
 				let h = H256::from_slice(&*key);
-				let r = self.payload(&h).unwrap().1;
+				let r = self.payload(&h).unwrap().count;
 				(h, r as i32)
 			})
 			.collect();
@@ -161,9 +189,8 @@ impl HashDB for OverlayDB {
 		};
 		match self.payload(key) {
 			Some(x) => {
-				let (d, rc) = x;
-				if rc as i32 + memrc > 0 {
-					Some(d)
+				if x.count as i32 + memrc > 0 {
+					Some(x.value)
 				}
 				else {
 					None
@@ -185,8 +212,7 @@ impl HashDB for OverlayDB {
 		let memrc = k.map_or(0, |(_, rc)| rc);
 		match self.payload(key) {
 			Some(x) => {
-				let (_, rc) = x;
-				rc as i32 + memrc > 0
+				x.count as i32 + memrc > 0
 			}
 			// Replace above match arm with this once https://github.com/rust-lang/rust/issues/15287 is done.
 			//Some((d, rc)) if rc + memrc > 0 => true,
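
The Payload type above replaces the anonymous (DBValue, u32) tuple whose fields were packed and unpacked by hand at each call site. A round-trip sketch of the same shape, with Vec<u8> standing in for DBValue and assuming the same era-specific rlp API (decode panics on malformed input here, matching how payload() trusts its own writes):

	extern crate rlp;

	use rlp::{decode, encode, Decodable, DecoderError, Encodable, RlpStream, UntrustedRlp};

	// Vec<u8> stands in for hashdb's DBValue.
	struct Payload {
		count: u32,
		value: Vec<u8>,
	}

	impl Encodable for Payload {
		fn rlp_append(&self, s: &mut RlpStream) {
			s.begin_list(2);
			s.append(&self.count);
			s.append(&&*self.value);
		}
	}

	impl Decodable for Payload {
		fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
			Ok(Payload {
				count: rlp.val_at(0)?,
				value: rlp.at(1)?.data()?.to_vec(),
			})
		}
	}

	fn main() {
		let payload = Payload { count: 3, value: b"node data".to_vec() };
		let bytes = encode(&payload);
		let back: Payload = decode(&bytes);
		assert_eq!(back.count, 3);
		assert_eq!(back.value, b"node data");
	}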

View File: overlayrecentdb.rs

@@ -21,7 +21,7 @@ use std::collections::hash_map::Entry;
 use std::sync::Arc;
 use parking_lot::RwLock;
 use heapsize::HeapSizeOf;
-use rlp::*;
+use rlp::{UntrustedRlp, RlpStream, encode, decode, DecoderError, Decodable, Encodable};
 use hashdb::*;
 use memorydb::*;
 use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
@@ -31,6 +31,7 @@ use ethereum_types::H256;
 use plain_hasher::H256FastMap;
 use error::{BaseDataError, UtilError};
 use bytes::Bytes;
+use util::DatabaseKey;
 
 /// Implementation of the `JournalDB` trait for a disk-backed database with a memory overlay
 /// and, possibly, latent-removal semantics.
@@ -70,6 +71,52 @@ pub struct OverlayRecentDB {
 	column: Option<u32>,
 }
 
+struct DatabaseValue {
+	id: H256,
+	inserts: Vec<(H256, DBValue)>,
+	deletes: Vec<H256>,
+}
+
+impl Decodable for DatabaseValue {
+	fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
+		let id = rlp.val_at(0)?;
+		let inserts = rlp.at(1)?.iter().map(|r| {
+			let k = r.val_at(0)?;
+			let v = DBValue::from_slice(r.at(1)?.data()?);
+			Ok((k, v))
+		}).collect::<Result<Vec<_>, _>>()?;
+		let deletes = rlp.list_at(2)?;
+
+		let value = DatabaseValue {
+			id,
+			inserts,
+			deletes,
+		};
+
+		Ok(value)
+	}
+}
+
+struct DatabaseValueRef<'a> {
+	id: &'a H256,
+	inserts: &'a [(H256, DBValue)],
+	deletes: &'a [H256],
+}
+
+impl<'a> Encodable for DatabaseValueRef<'a> {
+	fn rlp_append(&self, s: &mut RlpStream) {
+		s.begin_list(3);
+		s.append(self.id);
+		s.begin_list(self.inserts.len());
+		for kv in self.inserts {
+			s.begin_list(2);
+			s.append(&kv.0);
+			s.append(&&*kv.1);
+		}
+		s.append_list(self.deletes);
+	}
+}
+
 #[derive(PartialEq)]
 struct JournalOverlay {
 	backing_overlay: MemoryDB, // Nodes added in the history period
@@ -104,8 +151,6 @@ impl Clone for OverlayRecentDB {
 	}
 }
 
-const PADDING : [u8; 10] = [ 0u8; 10 ];
-
 impl OverlayRecentDB {
 	/// Create a new instance.
 	pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayRecentDB {
@@ -144,43 +189,34 @@ impl OverlayRecentDB {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
 			loop {
-				let mut index = 0usize;
-				while let Some(rlp_data) = db.get(col, {
-					let mut r = RlpStream::new_list(3);
-					r.append(&era);
-					r.append(&index);
-					r.append(&&PADDING[..]);
-					&r.drain()
-				}).expect("Low-level database error.") {
-					trace!("read_overlay: era={}, index={}", era, index);
-					let rlp = Rlp::new(&rlp_data);
-					let id: H256 = rlp.val_at(0);
-					let insertions = rlp.at(1);
-					let deletions: Vec<H256> = rlp.list_at(2);
+				let mut db_key = DatabaseKey {
+					era,
+					index: 0usize,
+				};
+				while let Some(rlp_data) = db.get(col, &encode(&db_key)).expect("Low-level database error.") {
+					trace!("read_overlay: era={}, index={}", era, db_key.index);
+					let value = decode::<DatabaseValue>(&rlp_data);
+					count += value.inserts.len();
 					let mut inserted_keys = Vec::new();
-					for r in insertions.iter() {
-						let k: H256 = r.val_at(0);
-						let v = r.at(1).data();
+					for (k, v) in value.inserts {
 						let short_key = to_short_key(&k);

 						if !overlay.contains(&short_key) {
 							cumulative_size += v.len();
 						}

-						overlay.emplace(short_key, DBValue::from_slice(v));
+						overlay.emplace(short_key, v);
 						inserted_keys.push(k);
-						count += 1;
 					}
 					journal.entry(era).or_insert_with(Vec::new).push(JournalEntry {
-						id: id,
+						id: value.id,
 						insertions: inserted_keys,
-						deletions: deletions,
+						deletions: value.deletes,
 					});
-					index += 1;
+					db_key.index += 1;
 					earliest_era = Some(era);
 				};
-				if index == 0 || era == 0 {
+				if db_key.index == 0 || era == 0 {
 					break;
 				}
 				era -= 1;
@@ -196,8 +232,6 @@ impl OverlayRecentDB {
 			cumulative_size: cumulative_size,
 		}
 	}
-
 }
-
 
 #[inline]
@@ -256,22 +290,24 @@ impl JournalDB for OverlayRecentDB {
 		// flush previous changes
 		journal_overlay.pending_overlay.clear();
 
-		let mut r = RlpStream::new_list(3);
 		let mut tx = self.transaction_overlay.drain();
 		let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
 		let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
 		let ops = inserted_keys.len() + removed_keys.len();
 
 		// Increase counter for each inserted key no matter if the block is canonical or not.
-		let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
-
-		r.append(id);
-		r.begin_list(inserted_keys.len());
+		let insertions: Vec<_> = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None }).collect();
+		let encoded_value = {
+			let value_ref = DatabaseValueRef {
+				id,
+				inserts: &insertions,
+				deletes: &removed_keys,
+			};
+			encode(&value_ref)
+		};
 
 		for (k, v) in insertions {
-			r.begin_list(2);
-			r.append(&k);
-			r.append(&&*v);
 			let short_key = to_short_key(&k);
 
 			if !journal_overlay.backing_overlay.contains(&short_key) {
 				journal_overlay.cumulative_size += v.len();
@@ -279,14 +315,14 @@ impl JournalDB for OverlayRecentDB {
 			journal_overlay.backing_overlay.emplace(short_key, v);
 		}
 
-		r.append_list(&removed_keys);
-
-		let mut k = RlpStream::new_list(3);
 		let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
-		k.append(&now);
-		k.append(&index);
-		k.append(&&PADDING[..]);
-		batch.put_vec(self.column, &k.drain(), r.out());
+		let db_key = DatabaseKey {
+			era: now,
+			index,
+		};
+		batch.put_vec(self.column, &encode(&db_key), encoded_value.into_vec());
 		if journal_overlay.latest_era.map_or(true, |e| now > e) {
 			trace!(target: "journaldb", "Set latest era to {}", now);
 			batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).into_vec());
@@ -317,11 +353,11 @@ impl JournalDB for OverlayRecentDB {
 			let mut index = 0usize;
 			for mut journal in records.drain(..) {
 				//delete the record from the db
-				let mut r = RlpStream::new_list(3);
-				r.append(&end_era);
-				r.append(&index);
-				r.append(&&PADDING[..]);
-				batch.delete(self.column, &r.drain());
+				let db_key = DatabaseKey {
+					era: end_era,
+					index,
+				};
+				batch.delete(self.column, &encode(&db_key));
 				trace!(target: "journaldb", "Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
 				{
 					if *canon_id == journal.id {
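
One design point worth spelling out: OverlayRecentDB journals full (key, value) pairs, so it gets a borrowing encoder (DatabaseValueRef) for the write path and an owning decoder (DatabaseValue) for the read path; nothing is cloned just to serialize. A round-trip sketch of that split with u64 and Vec<u8> standing in for H256 and DBValue (toy types, same assumed rlp API):

	extern crate rlp;

	use rlp::{decode, encode, Decodable, DecoderError, Encodable, RlpStream, UntrustedRlp};

	// Borrowing encoder for the write path.
	struct RecordRef<'a> {
		id: &'a u64,
		inserts: &'a [(u64, Vec<u8>)],
		deletes: &'a [u64],
	}

	// Owning decoder for the read path.
	struct Record {
		id: u64,
		inserts: Vec<(u64, Vec<u8>)>,
		deletes: Vec<u64>,
	}

	impl<'a> Encodable for RecordRef<'a> {
		fn rlp_append(&self, s: &mut RlpStream) {
			s.begin_list(3);
			s.append(self.id);
			// Each insert becomes a nested two-element (key, value) list.
			s.begin_list(self.inserts.len());
			for kv in self.inserts {
				s.begin_list(2);
				s.append(&kv.0);
				s.append(&&*kv.1);
			}
			s.append_list(self.deletes);
		}
	}

	impl Decodable for Record {
		fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
			Ok(Record {
				id: rlp.val_at(0)?,
				inserts: rlp.at(1)?.iter()
					.map(|r| Ok((r.val_at(0)?, r.at(1)?.data()?.to_vec())))
					.collect::<Result<Vec<_>, DecoderError>>()?,
				deletes: rlp.list_at(2)?,
			})
		}
	}

	fn main() {
		let inserts = vec![(7u64, b"leaf".to_vec())];
		let deletes = vec![9u64];
		// Write path: borrow, encode once, no clones.
		let bytes = encode(&RecordRef { id: &1, inserts: &inserts, deletes: &deletes });
		// Read path: decode into owned data.
		let rec: Record = decode(&bytes);
		assert_eq!(rec.id, 1);
		assert_eq!(rec.inserts, inserts);
		assert_eq!(rec.deletes, deletes);
	}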

View File: refcounteddb.rs

@@ -19,7 +19,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 use heapsize::HeapSizeOf;
-use rlp::*;
+use rlp::{encode, decode};
 use hashdb::*;
 use overlaydb::OverlayDB;
 use memorydb::MemoryDB;
@@ -29,6 +29,7 @@ use kvdb::{KeyValueDB, DBTransaction};
 use ethereum_types::H256;
 use error::UtilError;
 use bytes::Bytes;
+use util::{DatabaseKey, DatabaseValueView, DatabaseValueRef};
 
 /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
 /// and latent-removal semantics.
@@ -59,12 +60,11 @@ pub struct RefCountedDB {
 	column: Option<u32>,
 }
 
-const PADDING : [u8; 10] = [ 0u8; 10 ];
-
 impl RefCountedDB {
 	/// Create a new instance given a `backing` database.
 	pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> RefCountedDB {
-		let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
+		let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.")
+			.map(|val| decode::<u64>(&val));
 
 		RefCountedDB {
 			forward: OverlayDB::new(backing.clone(), col),
@@ -118,29 +118,32 @@ impl JournalDB for RefCountedDB {
 	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> Result<u32, UtilError> {
 		// record new commit's details.
-		let mut index = 0usize;
+		let mut db_key = DatabaseKey {
+			era: now,
+			index: 0usize,
+		};
 		let mut last;
 
 		while self.backing.get(self.column, {
-			let mut r = RlpStream::new_list(3);
-			r.append(&now);
-			r.append(&index);
-			r.append(&&PADDING[..]);
-			last = r.drain();
+			last = encode(&db_key);
 			&last
 		})?.is_some() {
-			index += 1;
+			db_key.index += 1;
 		}
 
-		let mut r = RlpStream::new_list(3);
-		r.append(id);
-		r.append_list(&self.inserts);
-		r.append_list(&self.removes);
-		batch.put(self.column, &last, r.as_raw());
+		{
+			let value_ref = DatabaseValueRef {
+				id,
+				inserts: &self.inserts,
+				deletes: &self.removes,
+			};
+			batch.put(self.column, &last, &encode(&value_ref));
+		}
 
 		let ops = self.inserts.len() + self.removes.len();
 
-		trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
+		trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, db_key.index, id, self.inserts, self.removes);
 
 		self.inserts.clear();
 		self.removes.clear();
@@ -155,27 +158,30 @@ impl JournalDB for RefCountedDB {
 	fn mark_canonical(&mut self, batch: &mut DBTransaction, end_era: u64, canon_id: &H256) -> Result<u32, UtilError> {
 		// apply old commits' details
-		let mut index = 0usize;
+		let mut db_key = DatabaseKey {
+			era: end_era,
+			index: 0usize,
+		};
 		let mut last;
 		while let Some(rlp_data) = {
 			self.backing.get(self.column, {
-				let mut r = RlpStream::new_list(3);
-				r.append(&end_era);
-				r.append(&index);
-				r.append(&&PADDING[..]);
-				last = r.drain();
+				last = encode(&db_key);
 				&last
 			})?
 		} {
-			let rlp = Rlp::new(&rlp_data);
-			let our_id: H256 = rlp.val_at(0);
-			let to_remove: Vec<H256> = rlp.list_at(if *canon_id == our_id {2} else {1});
-			trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
+			let view = DatabaseValueView::new(&rlp_data);
+			let our_id = view.id().expect("rlp read from db; qed");
+			let to_remove = if canon_id == &our_id {
+				view.deletes()
+			} else {
+				view.inserts()
+			}.expect("rlp read from db; qed");
+			trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, db_key.index, our_id, canon_id, to_remove);
 			for i in &to_remove {
 				self.forward.remove(i);
 			}
 			batch.delete(self.column, &last);
-			index += 1;
+			db_key.index += 1;
 		}
 
 		let r = self.forward.commit_to_batch(batch)?;
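
The branch in mark_canonical above encodes the whole pruning rule: when the record being expired is the canonical block, its deletes are enacted for good; when it is a non-canonical sibling, its inserts are rolled back instead. A compact restatement with u8 stand-ins for the H256 ids and keys (illustrative only):

	// u8 stands in for the H256 block ids and keys.
	struct Record {
		id: u8,
		inserts: Vec<u8>,
		deletes: Vec<u8>,
	}

	/// Which keys to drop when a record falls out of the history window.
	fn keys_to_remove(record: &Record, canon_id: u8) -> &[u8] {
		if record.id == canon_id {
			// Canonical block: its deletes are now final.
			&record.deletes
		} else {
			// Non-canonical sibling: roll back its inserts.
			&record.inserts
		}
	}

	fn main() {
		let canon = Record { id: 1, inserts: vec![10, 11], deletes: vec![12] };
		let fork = Record { id: 2, inserts: vec![20], deletes: vec![21] };
		assert_eq!(keys_to_remove(&canon, 1), &[12]);
		assert_eq!(keys_to_remove(&fork, 1), &[20]);
	}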

View File: util.rs (new file)

@@ -0,0 +1,76 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ethereum_types::H256;
use rlp::{RlpStream, Encodable, UntrustedRlp, DecoderError};

const PADDING : [u8; 10] = [ 0u8; 10 ];

pub struct DatabaseKey {
	pub era: u64,
	pub index: usize,
}

impl Encodable for DatabaseKey {
	fn rlp_append(&self, s: &mut RlpStream) {
		s.begin_list(3);
		s.append(&self.era);
		s.append(&self.index);
		s.append(&&PADDING[..]);
	}
}

pub struct DatabaseValueView<'a> {
	rlp: UntrustedRlp<'a>,
}

impl<'a> DatabaseValueView<'a> {
	pub fn new(data: &'a [u8]) -> Self {
		DatabaseValueView {
			rlp: UntrustedRlp::new(data),
		}
	}

	#[inline]
	pub fn id(&self) -> Result<H256, DecoderError> {
		self.rlp.val_at(0)
	}

	#[inline]
	pub fn inserts(&self) -> Result<Vec<H256>, DecoderError> {
		self.rlp.list_at(1)
	}

	#[inline]
	pub fn deletes(&self) -> Result<Vec<H256>, DecoderError> {
		self.rlp.list_at(2)
	}
}

pub struct DatabaseValueRef<'a> {
	pub id: &'a H256,
	pub inserts: &'a [H256],
	pub deletes: &'a [H256],
}

impl<'a> Encodable for DatabaseValueRef<'a> {
	fn rlp_append(&self, s: &mut RlpStream) {
		s.begin_list(3);
		s.append(self.id);
		s.append_list(self.inserts);
		s.append_list(self.deletes);
	}
}
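
A sketch of a unit test this module could carry (hypothetical, not part of the commit) that ties the three types together: journal_under encodes through the borrowing DatabaseValueRef, and mark_canonical reads fields back on demand through DatabaseValueView without decoding the whole record; H256::from(u64) is assumed from the ethereum-types crate of this era.

	#[cfg(test)]
	mod tests {
		use super::{DatabaseValueRef, DatabaseValueView};
		use ethereum_types::H256;
		use rlp::encode;

		#[test]
		fn value_roundtrip() {
			// H256::from(u64) is an assumption about the era's ethereum-types API.
			let id = H256::from(42);
			let inserts = vec![H256::from(1), H256::from(2)];
			let deletes = vec![H256::from(3)];

			// Write side: borrow everything, encode once.
			let record = encode(&DatabaseValueRef { id: &id, inserts: &inserts, deletes: &deletes });

			// Read side: each accessor decodes only its own field.
			let view = DatabaseValueView::new(&record);
			assert_eq!(view.id().unwrap(), id);
			assert_eq!(view.inserts().unwrap(), inserts);
			assert_eq!(view.deletes().unwrap(), deletes);
		}
	}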