From 68d606b5f0ce3835c8962cacd52a134c64d4d2c3 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 18 Feb 2016 03:46:24 +0100 Subject: [PATCH] rocksdb abstraction layer --- Cargo.lock | 22 +++++++++++--- ethcore/Cargo.toml | 1 - ethcore/src/blockchain.rs | 15 +++++----- ethcore/src/client.rs | 28 ++---------------- ethcore/src/extras.rs | 5 ++-- ethcore/src/lib.rs | 1 - ethcore/src/tests/helpers.rs | 7 ++--- util/Cargo.toml | 2 +- util/src/journaldb.rs | 56 ++++++++++++++++++++---------------- util/src/lib.rs | 2 ++ util/src/overlaydb.rs | 14 ++++----- 11 files changed, 72 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e9f57900..1ee6e116c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,7 +176,6 @@ dependencies = [ "lazy_static 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "rocksdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rust-crypto 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", @@ -217,7 +216,7 @@ dependencies = [ "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rocksdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rocksdb 0.3.0 (git+https://github.com/arkpar/rust-rocksdb.git)", "rust-crypto 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", @@ -397,6 +396,15 @@ name = "libc" version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "librocksdb-sys" +version = "0.1.0" +source = "git+https://github.com/arkpar/rust-rocksdb.git#e7f79d31e467c405a12db629daf5a86f81ed3e60" +dependencies = [ + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "pkg-config 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "linked-hash-map" version = "0.0.8" @@ -525,6 +533,11 @@ name = "odds" version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pkg-config" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quasi" version = "0.6.0" @@ -573,9 +586,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/arkpar/rust-rocksdb.git#e7f79d31e467c405a12db629daf5a86f81ed3e60" dependencies = [ - "libc 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "librocksdb-sys 0.1.0 (git+https://github.com/arkpar/rust-rocksdb.git)", ] [[package]] diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 3d4d27520..97d93f3b8 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -10,7 +10,6 @@ authors = ["Ethcore "] log = "0.3" env_logger = "0.3" rustc-serialize = "0.3" -rocksdb = "0.3" heapsize = "0.2.0" rust-crypto = "0.2.34" time = "0.1" diff --git a/ethcore/src/blockchain.rs b/ethcore/src/blockchain.rs index 9240ff800..8518ef121 100644 --- a/ethcore/src/blockchain.rs +++ b/ethcore/src/blockchain.rs @@ -17,7 +17,6 @@ //! Blockchain database. use util::*; -use rocksdb::{DB, WriteBatch, Writable}; use header::*; use extras::*; use transaction::*; @@ -162,8 +161,8 @@ pub struct BlockChain { block_logs: RwLock>, blocks_blooms: RwLock>, - extras_db: DB, - blocks_db: DB, + extras_db: Database, + blocks_db: Database, cache_man: RwLock, } @@ -250,12 +249,12 @@ impl BlockChain { // open extras db let mut extras_path = path.to_path_buf(); extras_path.push("extras"); - let extras_db = DB::open_default(extras_path.to_str().unwrap()).unwrap(); + let extras_db = Database::open_default(extras_path.to_str().unwrap()).unwrap(); // open blocks db let mut blocks_path = path.to_path_buf(); blocks_path.push("blocks"); - let blocks_db = DB::open_default(blocks_path.to_str().unwrap()).unwrap(); + let blocks_db = Database::open_default(blocks_path.to_str().unwrap()).unwrap(); let mut cache_man = CacheManager{cache_usage: VecDeque::new(), in_use: HashSet::new()}; (0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new())); @@ -294,7 +293,7 @@ impl BlockChain { bc.blocks_db.put(&hash, genesis).unwrap(); - let batch = WriteBatch::new(); + let batch = DBTransaction::new(); batch.put_extras(&hash, &details); batch.put_extras(&header.number(), &hash); batch.put(b"best", &hash).unwrap(); @@ -457,7 +456,7 @@ impl BlockChain { /// Transforms block into WriteBatch that may be written into database /// Additionally, if it's new best block it returns new best block object. - fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option, BlockDetails) { + fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (DBTransaction, Option, BlockDetails) { // create views onto rlp let block = BlockView::new(bytes); let header = block.header_view(); @@ -478,7 +477,7 @@ impl BlockChain { }; // prepare the batch - let batch = WriteBatch::new(); + let batch = DBTransaction::new(); // insert new block details batch.put_extras(&hash, &details); diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 09f7417e8..762adc113 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -18,7 +18,6 @@ use util::*; use util::panics::*; -use rocksdb::{Options, DB, DBCompactionStyle}; use blockchain::{BlockChain, BlockProvider, CacheSize}; use views::BlockView; use error::*; @@ -179,7 +178,7 @@ pub struct Client { } const HISTORY: u64 = 1000; -const CLIENT_DB_VER_STR: &'static str = "2.1"; +const CLIENT_DB_VER_STR: &'static str = "3.0"; impl Client { /// Create a new client with given spec and DB path. @@ -191,34 +190,11 @@ impl Client { let path = dir.as_path(); let gb = spec.genesis_block(); let chain = Arc::new(RwLock::new(BlockChain::new(&gb, path))); - let mut opts = Options::new(); - opts.set_max_open_files(256); - opts.create_if_missing(true); - opts.set_use_fsync(false); - opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); - /* - opts.set_bytes_per_sync(8388608); - opts.set_disable_data_sync(false); - opts.set_block_cache_size_mb(1024); - opts.set_table_cache_num_shard_bits(6); - opts.set_max_write_buffer_number(32); - opts.set_write_buffer_size(536870912); - opts.set_target_file_size_base(1073741824); - opts.set_min_write_buffer_number_to_merge(4); - opts.set_level_zero_stop_writes_trigger(2000); - opts.set_level_zero_slowdown_writes_trigger(0); - opts.set_compaction_style(DBUniversalCompaction); - opts.set_max_background_compactions(4); - opts.set_max_background_flushes(4); - opts.set_filter_deletes(false); - opts.set_disable_auto_compactions(false);*/ - let mut state_path = path.to_path_buf(); state_path.push("state"); - let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); let engine = Arc::new(try!(spec.to_engine())); - let mut state_db = JournalDB::new_with_arc(db.clone()); + let mut state_db = JournalDB::new(state_path.to_str().unwrap()); if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } diff --git a/ethcore/src/extras.rs b/ethcore/src/extras.rs index b65d4ed7a..3188378fc 100644 --- a/ethcore/src/extras.rs +++ b/ethcore/src/extras.rs @@ -18,7 +18,6 @@ use util::*; use header::BlockNumber; -use rocksdb::{DB, Writable}; /// Represents index of extra data in database #[derive(Copy, Debug, Hash, Eq, PartialEq, Clone)] @@ -56,7 +55,7 @@ pub trait ExtrasReadable { K: ExtrasSliceConvertable; } -impl ExtrasWritable for W where W: Writable { +impl ExtrasWritable for DBTransaction { fn put_extras(&self, hash: &K, value: &T) where T: ExtrasIndexable + Encodable, K: ExtrasSliceConvertable { @@ -65,7 +64,7 @@ impl ExtrasWritable for W where W: Writable { } } -impl ExtrasReadable for DB { +impl ExtrasReadable for Database { fn get_extras(&self, hash: &K) -> Option where T: ExtrasIndexable + Decodable, K: ExtrasSliceConvertable { diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 4cca74319..1b4d4e6ca 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -86,7 +86,6 @@ #[macro_use] extern crate ethcore_util as util; #[macro_use] extern crate lazy_static; extern crate rustc_serialize; -extern crate rocksdb; extern crate heapsize; extern crate crypto; extern crate time; diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 93e3e0a0d..550fc8937 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -22,7 +22,6 @@ use spec::*; use std::fs::{remove_dir_all}; use blockchain::{BlockChain}; use state::*; -use rocksdb::*; use evm::{Schedule, Factory}; use engine::*; use ethereum; @@ -258,8 +257,7 @@ pub fn generate_dummy_empty_blockchain() -> GuardedTempResult { pub fn get_temp_journal_db() -> GuardedTempResult { let temp = RandomTempPath::new(); - let db = DB::open_default(temp.as_str()).unwrap(); - let journal_db = JournalDB::new(db); + let journal_db = JournalDB::new(temp.as_str()); GuardedTempResult { _temp: temp, result: Some(journal_db) @@ -276,8 +274,7 @@ pub fn get_temp_state() -> GuardedTempResult { } pub fn get_temp_journal_db_in(path: &Path) -> JournalDB { - let db = DB::open_default(path.to_str().unwrap()).unwrap(); - JournalDB::new(db) + JournalDB::new(path.to_str().unwrap()) } pub fn get_temp_state_in(path: &Path) -> State { diff --git a/util/Cargo.toml b/util/Cargo.toml index 9ead8ccf6..67ac2a0bb 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -15,7 +15,7 @@ mio = "0.5.0" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" -rocksdb = "0.3" +rocksdb = { git = "https://github.com/arkpar/rust-rocksdb.git" } lazy_static = "0.1" eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" } rust-crypto = "0.2.34" diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 7b810639b..8d31f50eb 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] use std::env; @@ -33,7 +33,7 @@ use std::env; /// the removals actually take effect. pub struct JournalDB { overlay: MemoryDB, - backing: Arc, + backing: Arc, counters: Arc>>, } @@ -47,21 +47,25 @@ impl Clone for JournalDB { } } -const LATEST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; -const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; +// all keys must be at least 12 bytes +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const DB_VERSION: u32 = 2; +const DB_VERSION: u32 = 3; + +const PADDING : [u8; 10] = [ 0u8; 10 ]; impl JournalDB { - /// Create a new instance given a `backing` database. - pub fn new(backing: DB) -> JournalDB { - let db = Arc::new(backing); - JournalDB::new_with_arc(db) - } - /// Create a new instance given a shared `backing` database. - pub fn new_with_arc(backing: Arc) -> JournalDB { - if backing.iterator(IteratorMode::Start).next().is_some() { + /// Create a new instance from file + pub fn new(path: &str) -> JournalDB { + let opts = DatabaseConfig { + prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix + }; + let backing = Database::open(opts, path).unwrap_or_else(|e| { + panic!("Error opening state db: {}", e); + }); + if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { Ok(Some(DB_VERSION)) => {}, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) @@ -72,7 +76,7 @@ impl JournalDB { let counters = JournalDB::read_counters(&backing); JournalDB { overlay: MemoryDB::new(), - backing: backing, + backing: Arc::new(backing), counters: Arc::new(RwLock::new(counters)), } } @@ -82,7 +86,7 @@ impl JournalDB { pub fn new_temp() -> JournalDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) + Self::new(dir.to_str().unwrap()) } /// Check if this database has any commits @@ -117,16 +121,17 @@ impl JournalDB { // and the key is safe to delete. // record new commit's details. - let batch = WriteBatch::new(); + let batch = DBTransaction::new(); let mut counters = self.counters.write().unwrap(); { let mut index = 0usize; let mut last; while try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { @@ -154,9 +159,10 @@ impl JournalDB { let mut to_remove: Vec = Vec::new(); let mut canon_inserts: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&end_era); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })) { @@ -226,16 +232,17 @@ impl JournalDB { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &DB) -> HashMap { + fn read_counters(db: &Database) -> HashMap { let mut res = HashMap::new(); if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { let mut era = decode::(&val); loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&era); r.append(&index); + r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); @@ -259,7 +266,7 @@ impl JournalDB { impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iterator(IteratorMode::Start) { + for (key, _) in self.backing.iter() { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -429,12 +436,11 @@ mod tests { #[test] fn reopen() { - use rocksdb::DB; let mut dir = ::std::env::temp_dir(); dir.push(H32::random().hex()); let foo = { - let mut jdb = JournalDB::new(DB::open_default(dir.to_str().unwrap()).unwrap()); + let mut jdb = JournalDB::new(dir.to_str().unwrap()); // history is 1 let foo = jdb.insert(b"foo"); jdb.commit(0, &b"0".sha3(), None).unwrap(); @@ -442,13 +448,13 @@ mod tests { }; { - let mut jdb = JournalDB::new(DB::open_default(dir.to_str().unwrap()).unwrap()); + let mut jdb = JournalDB::new(dir.to_str().unwrap()); jdb.remove(&foo); jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); } { - let mut jdb = JournalDB::new(DB::open_default(dir.to_str().unwrap()).unwrap()); + let mut jdb = JournalDB::new(dir.to_str().unwrap()); assert!(jdb.exists(&foo)); jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); assert!(!jdb.exists(&foo)); diff --git a/util/src/lib.rs b/util/src/lib.rs index d4f972800..41d0ebe00 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -128,6 +128,7 @@ pub mod hashdb; pub mod memorydb; pub mod overlaydb; pub mod journaldb; +pub mod kvdb; mod math; pub mod chainfilter; pub mod crypto; @@ -162,6 +163,7 @@ pub use semantic_version::*; pub use network::*; pub use io::*; pub use log::*; +pub use kvdb::*; #[cfg(test)] mod tests; diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index f8e9c3eee..f4ed2d5d6 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -26,7 +26,7 @@ use std::ops::*; use std::sync::*; use std::env; use std::collections::HashMap; -use rocksdb::{DB, Writable, IteratorMode}; +use kvdb::{Database}; /// Implementation of the HashDB trait for a disk-backed database with a memory overlay. /// @@ -38,15 +38,15 @@ use rocksdb::{DB, Writable, IteratorMode}; /// queries have an immediate effect in terms of these functions. pub struct OverlayDB { overlay: MemoryDB, - backing: Arc, + backing: Arc, } impl OverlayDB { /// Create a new instance of OverlayDB given a `backing` database. - pub fn new(backing: DB) -> OverlayDB { Self::new_with_arc(Arc::new(backing)) } + pub fn new(backing: Database) -> OverlayDB { Self::new_with_arc(Arc::new(backing)) } /// Create a new instance of OverlayDB given a `backing` database. - pub fn new_with_arc(backing: Arc) -> OverlayDB { + pub fn new_with_arc(backing: Arc) -> OverlayDB { OverlayDB{ overlay: MemoryDB::new(), backing: backing } } @@ -54,7 +54,7 @@ impl OverlayDB { pub fn new_temp() -> OverlayDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) + Self::new(Database::open_default(dir.to_str().unwrap()).unwrap()) } /// Commit all memory operations to the backing database. @@ -164,7 +164,7 @@ impl OverlayDB { impl HashDB for OverlayDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iterator(IteratorMode::Start) { + for (key, _) in self.backing.iter() { let h = H256::from_slice(key.deref()); let r = self.payload(&h).unwrap().1; ret.insert(h, r as i32); @@ -318,7 +318,7 @@ fn overlaydb_complex() { fn playpen() { use std::fs; { - let db: DB = DB::open_default("/tmp/test").unwrap(); + let db: Database = Database::open_default("/tmp/test").unwrap(); db.put(b"test", b"test2").unwrap(); match db.get(b"test") { Ok(Some(value)) => println!("Got value {:?}", value.deref()),