More performance optimizations (#1814)
* Buffered DB * Use identity hash for MemoryDB * Various tweaks * Delayed DB compression * Reduce last_hashes cloning * Keep state cache * Updating tests * Optimized to_big_int * Fixing build with stable * Safer code
This commit is contained in:
committed by
Gav Wood
parent
deceb5fd56
commit
7093651d70
@@ -20,7 +20,8 @@ use rustc_serialize::hex::FromHex;
|
||||
use std::{ops, fmt, cmp};
|
||||
use std::cmp::*;
|
||||
use std::ops::*;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::hash::{Hash, Hasher, BuildHasherDefault};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::str::FromStr;
|
||||
use math::log2;
|
||||
use error::UtilError;
|
||||
@@ -539,6 +540,38 @@ impl_hash!(H520, 65);
|
||||
impl_hash!(H1024, 128);
|
||||
impl_hash!(H2048, 256);
|
||||
|
||||
// Specialized HashMap and HashSet
|
||||
|
||||
/// Hasher that just takes 8 bytes of the provided value.
|
||||
pub struct PlainHasher(u64);
|
||||
|
||||
impl Default for PlainHasher {
|
||||
#[inline]
|
||||
fn default() -> PlainHasher {
|
||||
PlainHasher(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Hasher for PlainHasher {
|
||||
#[inline]
|
||||
fn finish(&self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write(&mut self, bytes: &[u8]) {
|
||||
debug_assert!(bytes.len() == 32);
|
||||
let mut prefix = [0u8; 8];
|
||||
prefix.clone_from_slice(&bytes[0..8]);
|
||||
self.0 = unsafe { ::std::mem::transmute(prefix) };
|
||||
}
|
||||
}
|
||||
|
||||
/// Specialized version of HashMap with H256 keys and fast hashing function.
|
||||
pub type H256FastMap<T> = HashMap<H256, T, BuildHasherDefault<PlainHasher>>;
|
||||
/// Specialized version of HashSet with H256 keys and fast hashing function.
|
||||
pub type H256FastSet = HashSet<H256, BuildHasherDefault<PlainHasher>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use hash::*;
|
||||
|
||||
@@ -126,7 +126,7 @@ impl OverlayRecentDB {
|
||||
}
|
||||
|
||||
fn payload(&self, key: &H256) -> Option<Bytes> {
|
||||
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
|
||||
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
|
||||
}
|
||||
|
||||
fn read_overlay(db: &Database, col: Option<u32>) -> JournalOverlay {
|
||||
@@ -239,9 +239,9 @@ impl JournalDB for OverlayRecentDB {
|
||||
k.append(&now);
|
||||
k.append(&index);
|
||||
k.append(&&PADDING[..]);
|
||||
try!(batch.put(self.column, &k.drain(), r.as_raw()));
|
||||
try!(batch.put_vec(self.column, &k.drain(), r.out()));
|
||||
if journal_overlay.latest_era.map_or(true, |e| now > e) {
|
||||
try!(batch.put(self.column, &LATEST_ERA_KEY, &encode(&now)));
|
||||
try!(batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec()));
|
||||
journal_overlay.latest_era = Some(now);
|
||||
}
|
||||
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
|
||||
@@ -280,7 +280,7 @@ impl JournalDB for OverlayRecentDB {
|
||||
}
|
||||
// apply canon inserts first
|
||||
for (k, v) in canon_insertions {
|
||||
try!(batch.put(self.column, &k, &v));
|
||||
try!(batch.put_vec(self.column, &k, v));
|
||||
}
|
||||
// update the overlay
|
||||
for k in overlay_deletions {
|
||||
|
||||
191
util/src/kvdb.rs
191
util/src/kvdb.rs
@@ -16,8 +16,11 @@
|
||||
|
||||
//! Key-Value store abstraction with `RocksDB` backend.
|
||||
|
||||
use common::*;
|
||||
use elastic_array::*;
|
||||
use std::default::Default;
|
||||
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBVector, DBIterator,
|
||||
use rlp::{UntrustedRlp, RlpType, View, Compressible};
|
||||
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
|
||||
Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column};
|
||||
|
||||
const DB_BACKGROUND_FLUSHES: i32 = 2;
|
||||
@@ -25,30 +28,89 @@ const DB_BACKGROUND_COMPACTIONS: i32 = 2;
|
||||
|
||||
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
|
||||
pub struct DBTransaction {
|
||||
batch: WriteBatch,
|
||||
cfs: Vec<Column>,
|
||||
ops: RwLock<Vec<DBOp>>,
|
||||
}
|
||||
|
||||
enum DBOp {
|
||||
Insert {
|
||||
col: Option<u32>,
|
||||
key: ElasticArray32<u8>,
|
||||
value: Bytes,
|
||||
},
|
||||
InsertCompressed {
|
||||
col: Option<u32>,
|
||||
key: ElasticArray32<u8>,
|
||||
value: Bytes,
|
||||
},
|
||||
Delete {
|
||||
col: Option<u32>,
|
||||
key: ElasticArray32<u8>,
|
||||
}
|
||||
}
|
||||
|
||||
impl DBTransaction {
|
||||
/// Create new transaction.
|
||||
pub fn new(db: &Database) -> DBTransaction {
|
||||
pub fn new(_db: &Database) -> DBTransaction {
|
||||
DBTransaction {
|
||||
batch: WriteBatch::new(),
|
||||
cfs: db.cfs.clone(),
|
||||
ops: RwLock::new(Vec::with_capacity(256)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
pub fn put(&self, col: Option<u32>, key: &[u8], value: &[u8]) -> Result<(), String> {
|
||||
col.map_or_else(|| self.batch.put(key, value), |c| self.batch.put_cf(self.cfs[c as usize], key, value))
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.write().push(DBOp::Insert {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value.to_vec(),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
pub fn put_vec(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.write().push(DBOp::Insert {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
|
||||
/// Value will be RLP-compressed on flush
|
||||
pub fn put_compressed(&self, col: Option<u32>, key: &[u8], value: Bytes) -> Result<(), String> {
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.write().push(DBOp::InsertCompressed {
|
||||
col: col,
|
||||
key: ekey,
|
||||
value: value,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete value by key.
|
||||
pub fn delete(&self, col: Option<u32>, key: &[u8]) -> Result<(), String> {
|
||||
col.map_or_else(|| self.batch.delete(key), |c| self.batch.delete_cf(self.cfs[c as usize], key))
|
||||
let mut ekey = ElasticArray32::new();
|
||||
ekey.append_slice(key);
|
||||
self.ops.write().push(DBOp::Delete {
|
||||
col: col,
|
||||
key: ekey,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct DBColumnOverlay {
|
||||
insertions: HashMap<ElasticArray32<u8>, Bytes>,
|
||||
compressed_insertions: HashMap<ElasticArray32<u8>, Bytes>,
|
||||
deletions: HashSet<ElasticArray32<u8>>,
|
||||
}
|
||||
|
||||
/// Compaction profile for the database settings
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct CompactionProfile {
|
||||
@@ -118,7 +180,7 @@ impl Default for DatabaseConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Database iterator
|
||||
/// Database iterator for flushed data only
|
||||
pub struct DatabaseIterator {
|
||||
iter: DBIterator,
|
||||
}
|
||||
@@ -136,6 +198,7 @@ pub struct Database {
|
||||
db: DB,
|
||||
write_opts: WriteOptions,
|
||||
cfs: Vec<Column>,
|
||||
overlay: RwLock<Vec<DBColumnOverlay>>,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
@@ -209,7 +272,16 @@ impl Database {
|
||||
},
|
||||
Err(s) => { return Err(s); }
|
||||
};
|
||||
Ok(Database { db: db, write_opts: write_opts, cfs: cfs })
|
||||
Ok(Database {
|
||||
db: db,
|
||||
write_opts: write_opts,
|
||||
overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| DBColumnOverlay {
|
||||
insertions: HashMap::new(),
|
||||
compressed_insertions: HashMap::new(),
|
||||
deletions: HashSet::new(),
|
||||
}).collect()),
|
||||
cfs: cfs,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates new transaction for this database.
|
||||
@@ -217,14 +289,107 @@ impl Database {
|
||||
DBTransaction::new(self)
|
||||
}
|
||||
|
||||
|
||||
fn to_overly_column(col: Option<u32>) -> usize {
|
||||
col.map_or(0, |c| (c + 1) as usize)
|
||||
}
|
||||
|
||||
/// Commit transaction to database.
|
||||
pub fn write_buffered(&self, tr: DBTransaction) -> Result<(), String> {
|
||||
let mut overlay = self.overlay.write();
|
||||
let ops = mem::replace(&mut *tr.ops.write(), Vec::new());
|
||||
for op in ops {
|
||||
match op {
|
||||
DBOp::Insert { col, key, value } => {
|
||||
let c = Self::to_overly_column(col);
|
||||
overlay[c].deletions.remove(&key);
|
||||
overlay[c].compressed_insertions.remove(&key);
|
||||
overlay[c].insertions.insert(key, value);
|
||||
},
|
||||
DBOp::InsertCompressed { col, key, value } => {
|
||||
let c = Self::to_overly_column(col);
|
||||
overlay[c].deletions.remove(&key);
|
||||
overlay[c].insertions.remove(&key);
|
||||
overlay[c].compressed_insertions.insert(key, value);
|
||||
},
|
||||
DBOp::Delete { col, key } => {
|
||||
let c = Self::to_overly_column(col);
|
||||
overlay[c].insertions.remove(&key);
|
||||
overlay[c].compressed_insertions.remove(&key);
|
||||
overlay[c].deletions.insert(key);
|
||||
},
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commit buffered changes to database.
|
||||
pub fn flush(&self) -> Result<(), String> {
|
||||
let batch = WriteBatch::new();
|
||||
let mut overlay = self.overlay.write();
|
||||
|
||||
let mut c = 0;
|
||||
for column in overlay.iter_mut() {
|
||||
let insertions = mem::replace(&mut column.insertions, HashMap::new());
|
||||
let compressed_insertions = mem::replace(&mut column.compressed_insertions, HashMap::new());
|
||||
let deletions = mem::replace(&mut column.deletions, HashSet::new());
|
||||
for d in deletions.into_iter() {
|
||||
if c > 0 {
|
||||
try!(batch.delete_cf(self.cfs[c - 1], &d));
|
||||
} else {
|
||||
try!(batch.delete(&d));
|
||||
}
|
||||
}
|
||||
for (key, value) in insertions.into_iter() {
|
||||
if c > 0 {
|
||||
try!(batch.put_cf(self.cfs[c - 1], &key, &value));
|
||||
} else {
|
||||
try!(batch.put(&key, &value));
|
||||
}
|
||||
}
|
||||
for (key, value) in compressed_insertions.into_iter() {
|
||||
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
|
||||
if c > 0 {
|
||||
try!(batch.put_cf(self.cfs[c - 1], &key, &compressed));
|
||||
} else {
|
||||
try!(batch.put(&key, &compressed));
|
||||
}
|
||||
}
|
||||
c += 1;
|
||||
}
|
||||
self.db.write_opt(batch, &self.write_opts)
|
||||
}
|
||||
|
||||
|
||||
/// Commit transaction to database.
|
||||
pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
|
||||
self.db.write_opt(tr.batch, &self.write_opts)
|
||||
let batch = WriteBatch::new();
|
||||
let ops = mem::replace(&mut *tr.ops.write(), Vec::new());
|
||||
for op in ops {
|
||||
match op {
|
||||
DBOp::Insert { col, key, value } => {
|
||||
try!(col.map_or_else(|| batch.put(&key, &value), |c| batch.put_cf(self.cfs[c as usize], &key, &value)))
|
||||
},
|
||||
DBOp::InsertCompressed { col, key, value } => {
|
||||
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
|
||||
try!(col.map_or_else(|| batch.put(&key, &compressed), |c| batch.put_cf(self.cfs[c as usize], &key, &compressed)))
|
||||
},
|
||||
DBOp::Delete { col, key } => {
|
||||
try!(col.map_or_else(|| batch.delete(&key), |c| batch.delete_cf(self.cfs[c as usize], &key)))
|
||||
},
|
||||
}
|
||||
}
|
||||
self.db.write_opt(batch, &self.write_opts)
|
||||
}
|
||||
|
||||
/// Get value by key.
|
||||
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBVector>, String> {
|
||||
col.map_or_else(|| self.db.get(key), |c| self.db.get_cf(self.cfs[c as usize], key))
|
||||
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
|
||||
let overlay = &self.overlay.read()[Self::to_overly_column(col)];
|
||||
overlay.insertions.get(key).or_else(|| overlay.compressed_insertions.get(key)).map_or_else(||
|
||||
col.map_or_else(
|
||||
|| self.db.get(key).map(|r| r.map(|v| v.to_vec())),
|
||||
|c| self.db.get_cf(self.cfs[c as usize], key).map(|r| r.map(|v| v.to_vec()))),
|
||||
|value| Ok(Some(value.clone())))
|
||||
}
|
||||
|
||||
/// Get value by partial key. Prefix size should match configured prefix size.
|
||||
|
||||
@@ -73,7 +73,7 @@ use std::collections::hash_map::Entry;
|
||||
/// ```
|
||||
#[derive(Default, Clone, PartialEq)]
|
||||
pub struct MemoryDB {
|
||||
data: HashMap<H256, (Bytes, i32)>,
|
||||
data: H256FastMap<(Bytes, i32)>,
|
||||
aux: HashMap<Bytes, Bytes>,
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ impl MemoryDB {
|
||||
/// Create a new instance of the memory DB.
|
||||
pub fn new() -> MemoryDB {
|
||||
MemoryDB {
|
||||
data: HashMap::new(),
|
||||
data: H256FastMap::default(),
|
||||
aux: HashMap::new(),
|
||||
}
|
||||
}
|
||||
@@ -116,8 +116,8 @@ impl MemoryDB {
|
||||
}
|
||||
|
||||
/// Return the internal map of hashes to data, clearing the current state.
|
||||
pub fn drain(&mut self) -> HashMap<H256, (Bytes, i32)> {
|
||||
mem::replace(&mut self.data, HashMap::new())
|
||||
pub fn drain(&mut self) -> H256FastMap<(Bytes, i32)> {
|
||||
mem::replace(&mut self.data, H256FastMap::default())
|
||||
}
|
||||
|
||||
/// Return the internal map of auxiliary data, clearing the current state.
|
||||
@@ -144,7 +144,7 @@ impl MemoryDB {
|
||||
pub fn denote(&self, key: &H256, value: Bytes) -> (&[u8], i32) {
|
||||
if self.raw(key) == None {
|
||||
unsafe {
|
||||
let p = &self.data as *const HashMap<H256, (Bytes, i32)> as *mut HashMap<H256, (Bytes, i32)>;
|
||||
let p = &self.data as *const H256FastMap<(Bytes, i32)> as *mut H256FastMap<(Bytes, i32)>;
|
||||
(*p).insert(key.clone(), (value, 0));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user