From eb526b7769c062447a6cc99bae7c3073bfec3885 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 12 Oct 2017 15:36:27 +0200 Subject: [PATCH] separated kvdb into 3 crates: kvdb, kvdb-memorydb && kvdb-rocksdb, #6693 --- Cargo.lock | 36 +- Cargo.toml | 2 +- ethcore/Cargo.toml | 2 + ethcore/light/Cargo.toml | 2 + ethcore/light/src/client/header_chain.rs | 3 +- ethcore/light/src/client/mod.rs | 5 +- ethcore/light/src/client/service.rs | 2 +- ethcore/light/src/lib.rs | 2 + ethcore/node_filter/Cargo.toml | 6 +- ethcore/node_filter/src/lib.rs | 13 +- ethcore/src/blockchain/blockchain.rs | 3 +- ethcore/src/client/config.rs | 2 +- ethcore/src/client/evm_test_client.rs | 7 +- ethcore/src/client/test_client.rs | 2 +- ethcore/src/lib.rs | 2 + ethcore/src/migrations/state/v7.rs | 2 +- ethcore/src/migrations/v10.rs | 3 +- ethcore/src/migrations/v9.rs | 2 +- ethcore/src/service.rs | 3 +- ethcore/src/snapshot/service.rs | 4 +- .../src/snapshot/tests/proof_of_authority.rs | 6 +- ethcore/src/snapshot/tests/proof_of_work.rs | 9 +- ethcore/src/snapshot/tests/service.rs | 2 +- ethcore/src/snapshot/tests/state.rs | 2 +- ethcore/src/spec/spec.rs | 4 +- ethcore/src/tests/client.rs | 2 +- ethcore/src/tests/helpers.rs | 2 +- ethcore/src/tests/trace.rs | 2 +- ethcore/src/trace/db.rs | 3 +- ethcore/src/tx_filter.rs | 2 +- local-store/Cargo.toml | 3 + local-store/src/lib.rs | 8 +- parity/helpers.rs | 2 +- parity/main.rs | 2 +- parity/migration.rs | 4 +- rpc/Cargo.toml | 2 +- rpc/src/lib.rs | 4 +- rpc/src/v1/tests/eth.rs | 2 +- secret_store/Cargo.toml | 1 + secret_store/src/key_storage.rs | 4 +- secret_store/src/lib.rs | 1 + sync/Cargo.toml | 7 +- sync/src/lib.rs | 1 + sync/src/tests/helpers.rs | 2 +- util/Cargo.toml | 4 + util/kvdb-memorydb/Cargo.toml | 9 + util/kvdb-memorydb/src/lib.rs | 124 +++ util/kvdb-rocksdb/Cargo.toml | 15 + util/kvdb-rocksdb/src/lib.rs | 805 ++++++++++++++++ util/kvdb/Cargo.toml | 10 +- util/kvdb/src/lib.rs | 892 +----------------- util/migration/Cargo.toml | 1 + util/migration/src/lib.rs | 7 +- util/migration/src/tests.rs | 4 +- util/src/journaldb/archivedb.rs | 4 +- util/src/journaldb/earlymergedb.rs | 6 +- util/src/journaldb/overlayrecentdb.rs | 4 +- util/src/journaldb/refcounteddb.rs | 2 +- util/src/lib.rs | 5 + util/src/overlaydb.rs | 2 +- 60 files changed, 1106 insertions(+), 968 deletions(-) create mode 100644 util/kvdb-memorydb/Cargo.toml create mode 100644 util/kvdb-memorydb/src/lib.rs create mode 100644 util/kvdb-rocksdb/Cargo.toml create mode 100644 util/kvdb-rocksdb/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0ba403990..ee95bb81f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,6 +601,8 @@ dependencies = [ "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", + "kvdb-rocksdb 0.1.0", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -758,6 +760,8 @@ dependencies = [ "heapsize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", + "kvdb-rocksdb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "memorydb 0.1.0", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -844,6 +848,7 @@ dependencies = [ "hash 
0.1.0", "hyper 0.10.13 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", + "kvdb-rocksdb 0.1.0", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "native-contracts 0.1.0", @@ -899,6 +904,8 @@ dependencies = [ "hashdb 0.1.0", "heapsize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", + "kvdb-rocksdb 0.1.0", "libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1024,6 +1031,7 @@ dependencies = [ "heapsize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1514,10 +1522,26 @@ version = "0.1.0" dependencies = [ "elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-bigint 0.1.3", "ethcore-bytes 0.1.0", +] + +[[package]] +name = "kvdb-memorydb" +version = "0.1.0" +dependencies = [ + "kvdb 0.1.0", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rlp 0.2.0", +] + +[[package]] +name = "kvdb-rocksdb" +version = "0.1.0" +dependencies = [ + "elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-bigint 0.1.3", "ethcore-devtools 1.8.0", - "hashdb 0.1.0", + "kvdb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1670,6 +1694,7 @@ version = "0.1.0" dependencies = [ "ethcore-devtools 1.8.0", "kvdb 0.1.0", + "kvdb-rocksdb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", ] @@ -1859,7 +1884,7 @@ dependencies = [ "ethcore-network 1.8.0", "ethcore-util 1.8.0", "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", - "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "native-contracts 0.1.0", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2090,7 +2115,7 @@ dependencies = [ "ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)", "isatty 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.8)", - "kvdb 0.1.0", + "kvdb-rocksdb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "migration 0.1.0", "node-filter 1.8.0", @@ -2237,6 +2262,7 @@ dependencies = [ "ethcore-util 1.8.0", "ethkey 0.2.0", "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.2.0", "serde 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2294,7 +2320,7 @@ dependencies = [ "jsonrpc-macros 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.8)", "jsonrpc-pubsub 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.8)", "jsonrpc-ws-server 8.0.0 
(git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.8)", - "kvdb 0.1.0", + "kvdb-memorydb 0.1.0", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", "multihash 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index cf208330d..7ff604f5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ path = { path = "util/path" } panic_hook = { path = "panic_hook" } hash = { path = "util/hash" } migration = { path = "util/migration" } -kvdb = { path = "util/kvdb" } +kvdb-rocksdb = { path = "util/kvdb-rocksdb" } parity-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.103", optional = true} diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index e7d63703d..a12083b56 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -56,6 +56,8 @@ rand = "0.3" rlp = { path = "../util/rlp" } rlp_derive = { path = "../util/rlp_derive" } kvdb = { path = "../util/kvdb" } +kvdb-rocksdb = { path = "../util/kvdb-rocksdb" } +kvdb-memorydb = { path = "../util/kvdb-memorydb" } util-error = { path = "../util/error" } snappy = { path = "../util/snappy" } migration = { path = "../util/migration" } diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 734eb7432..69a58270c 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -40,6 +40,8 @@ stats = { path = "../../util/stats" } hash = { path = "../../util/hash" } triehash = { path = "../../util/triehash" } kvdb = { path = "../../util/kvdb" } +kvdb-rocksdb = { path = "../../util/kvdb-rocksdb" } +kvdb-memorydb = { path = "../../util/kvdb-memorydb" } [features] default = [] diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index c31fd4787..bf0c9f01d 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -728,7 +728,8 @@ mod tests { use ethcore::header::Header; use ethcore::spec::Spec; use cache::Cache; - use kvdb::{in_memory, KeyValueDB}; + use kvdb::KeyValueDB; + use kvdb_memorydb::in_memory; use time::Duration; use parking_lot::Mutex; diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index fb0ec3917..4584a3237 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -36,7 +36,8 @@ use bigint::prelude::U256; use bigint::hash::H256; use futures::{IntoFuture, Future}; -use kvdb::{KeyValueDB, CompactionProfile}; +use kvdb::KeyValueDB; +use kvdb_rocksdb::CompactionProfile; use self::fetch::ChainDataFetcher; use self::header_chain::{AncestryIter, HeaderChain}; @@ -214,7 +215,7 @@ impl Client { io_channel: IoChannel, cache: Arc> ) -> Self { - let db = ::kvdb::in_memory(0); + let db = ::kvdb_memorydb::in_memory(0); Client::new( config, diff --git a/ethcore/light/src/client/service.rs b/ethcore/light/src/client/service.rs index f20d0ad90..2ab3e530a 100644 --- a/ethcore/light/src/client/service.rs +++ b/ethcore/light/src/client/service.rs @@ -25,7 +25,7 @@ use ethcore::db; use ethcore::service::ClientIoMessage; use ethcore::spec::Spec; use io::{IoContext, IoError, IoHandler, IoService}; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use cache::Cache; use parking_lot::Mutex; diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index edf655e4d..9c860a273 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -92,6 +92,8 @@ extern crate vm; extern crate hash; extern crate triehash; extern crate kvdb; +extern crate 
kvdb_memorydb; +extern crate kvdb_rocksdb; #[cfg(feature = "ipc")] extern crate ethcore_ipc as ipc; diff --git a/ethcore/node_filter/Cargo.toml b/ethcore/node_filter/Cargo.toml index c95e83091..dc8797d8e 100644 --- a/ethcore/node_filter/Cargo.toml +++ b/ethcore/node_filter/Cargo.toml @@ -11,10 +11,12 @@ ethcore = { path = ".."} ethcore-util = { path = "../../util" } ethcore-bigint = { path = "../../util/bigint" } ethcore-bytes = { path = "../../util/bytes" } -ethcore-io = { path = "../../util/io" } ethcore-network = { path = "../../util/network" } -kvdb = { path = "../../util/kvdb" } native-contracts = { path = "../native_contracts" } futures = "0.1" log = "0.3" parking_lot = "0.4" + +[dev-dependencies] +kvdb-memorydb = { path = "../../util/kvdb-memorydb" } +ethcore-io = { path = "../../util/io" } diff --git a/ethcore/node_filter/src/lib.rs b/ethcore/node_filter/src/lib.rs index e92dba61d..b0db01908 100644 --- a/ethcore/node_filter/src/lib.rs +++ b/ethcore/node_filter/src/lib.rs @@ -24,9 +24,14 @@ extern crate ethcore_network as network; extern crate native_contracts; extern crate futures; extern crate parking_lot; -extern crate kvdb; -#[cfg(test)] extern crate ethcore_io as io; -#[macro_use] extern crate log; + +#[macro_use] +extern crate log; + +#[cfg(test)] +extern crate kvdb_memorydb; +#[cfg(test)] +extern crate ethcore_io as io; use std::sync::Weak; use std::collections::HashMap; @@ -135,7 +140,7 @@ mod test { let contract_addr = Address::from_str("0000000000000000000000000000000000000005").unwrap(); let data = include_bytes!("../res/node_filter.json"); let spec = Spec::load(&::std::env::temp_dir(), &data[..]).unwrap(); - let client_db = Arc::new(::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))); + let client_db = Arc::new(::kvdb_memorydb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))); let client = Client::new( ClientConfig::default(), diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index a55cf669f..4e152a485 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -1479,7 +1479,8 @@ mod tests { use std::sync::Arc; use rustc_hex::FromHex; use hash::keccak; - use kvdb::{in_memory, KeyValueDB}; + use kvdb::KeyValueDB; + use kvdb_memorydb::in_memory; use bigint::hash::*; use receipt::{Receipt, TransactionOutcome}; use blockchain::{BlockProvider, BlockChain, Config, ImportRoute}; diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index e629732d5..14efb3ec4 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -21,7 +21,7 @@ use std::fmt::{Display, Formatter, Error as FmtError}; use mode::Mode as IpcMode; use verification::{VerifierType, QueueConfig}; use util::journaldb; -use kvdb::CompactionProfile; +use kvdb_rocksdb::CompactionProfile; pub use std::time::Duration; pub use blockchain::Config as BlockChainConfig; diff --git a/ethcore/src/client/evm_test_client.rs b/ethcore/src/client/evm_test_client.rs index ff03554f9..f7bf7a75b 100644 --- a/ethcore/src/client/evm_test_client.rs +++ b/ethcore/src/client/evm_test_client.rs @@ -21,8 +21,7 @@ use std::sync::Arc; use bigint::prelude::U256; use bigint::hash::H256; use util::journaldb; -use trie; -use bytes; +use {trie, kvdb_memorydb, bytes}; use kvdb::{self, KeyValueDB}; use {state, state_db, client, executive, trace, transaction, db, spec, pod_state}; use factory::Factories; @@ -128,7 +127,7 @@ impl<'a> EvmTestClient<'a> { } fn state_from_spec(spec: &'a spec::Spec, factories: &Factories) -> Result, 
EvmTestError> { - let db = Arc::new(kvdb::in_memory(db::NUM_COLUMNS.expect("We use column-based DB; qed"))); + let db = Arc::new(kvdb_memorydb::in_memory(db::NUM_COLUMNS.expect("We use column-based DB; qed"))); let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, db::COL_STATE); let mut state_db = state_db::StateDB::new(journal_db, 5 * 1024 * 1024); state_db = spec.ensure_db_good(state_db, factories)?; @@ -150,7 +149,7 @@ impl<'a> EvmTestClient<'a> { } fn state_from_pod(spec: &'a spec::Spec, factories: &Factories, pod_state: pod_state::PodState) -> Result, EvmTestError> { - let db = Arc::new(kvdb::in_memory(db::NUM_COLUMNS.expect("We use column-based DB; qed"))); + let db = Arc::new(kvdb_memorydb::in_memory(db::NUM_COLUMNS.expect("We use column-based DB; qed"))); let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, db::COL_STATE); let state_db = state_db::StateDB::new(journal_db, 5 * 1024 * 1024); let mut state = state::State::new( diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 82f969320..4ee3c420c 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -27,7 +27,7 @@ use bigint::prelude::U256; use bigint::hash::H256; use parking_lot::RwLock; use util::*; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use bytes::Bytes; use rlp::*; use ethkey::{Generator, Random}; diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index a67eb879a..c480beabe 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -113,6 +113,8 @@ extern crate ansi_term; extern crate semantic_version; extern crate unexpected; extern crate kvdb; +extern crate kvdb_rocksdb; +extern crate kvdb_memorydb; extern crate util_error; extern crate snappy; extern crate migration; diff --git a/ethcore/src/migrations/state/v7.rs b/ethcore/src/migrations/state/v7.rs index cbf517d1d..c15935117 100644 --- a/ethcore/src/migrations/state/v7.rs +++ b/ethcore/src/migrations/state/v7.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use bigint::hash::H256; use util::Address; use bytes::Bytes; -use kvdb::Database; +use kvdb_rocksdb::Database; use migration::{Batch, Config, Error, Migration, SimpleMigration, Progress}; use hash::keccak; use std::sync::Arc; diff --git a/ethcore/src/migrations/v10.rs b/ethcore/src/migrations/v10.rs index 155d6f4c0..3a236e719 100644 --- a/ethcore/src/migrations/v10.rs +++ b/ethcore/src/migrations/v10.rs @@ -26,7 +26,8 @@ use migration::{Error, Migration, Progress, Batch, Config}; use util::journaldb; use bigint::hash::H256; use trie::Trie; -use kvdb::{Database, DBTransaction}; +use kvdb::DBTransaction; +use kvdb_rocksdb::Database; /// Account bloom upgrade routine. If bloom already present, does nothing. /// If database empty (no best block), does nothing. diff --git a/ethcore/src/migrations/v9.rs b/ethcore/src/migrations/v9.rs index 7c28054fa..39637dc4e 100644 --- a/ethcore/src/migrations/v9.rs +++ b/ethcore/src/migrations/v9.rs @@ -18,7 +18,7 @@ //! This migration consolidates all databases into single one using Column Families. 
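A note on the pattern repeated throughout these hunks: migrations and any other code that opens a concrete database now import `Database`, `DatabaseConfig` and `CompactionProfile` from the new `kvdb-rocksdb` crate, while code that only reads and writes keys keeps depending on the `kvdb` trait crate alone. A minimal sketch of the resulting split, not taken from the patch itself; the `count_keys` helper and the path are invented for illustration:

```rust
// Sketch of the new crate boundaries (hypothetical consumer).
extern crate kvdb;          // trait + transaction types only
extern crate kvdb_rocksdb;  // concrete RocksDB-backed implementation

use std::sync::Arc;
use kvdb::KeyValueDB;
use kvdb_rocksdb::{Database, DatabaseConfig};

// Backend-agnostic code depends only on the trait...
fn count_keys(db: &KeyValueDB) -> usize {
	db.iter(None).count()
}

fn main() {
	// ...while binaries and migrations name the concrete backend.
	let config = DatabaseConfig::with_columns(None);
	let db = Arc::new(Database::open(&config, "/tmp/example-db").unwrap());
	println!("keys: {}", count_keys(&*db));
}
```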
use rlp::{Rlp, RlpStream}; -use kvdb::Database; +use kvdb_rocksdb::Database; use migration::{Batch, Config, Error, Migration, Progress}; use std::sync::Arc; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 90f6810aa..8d67166be 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -19,7 +19,8 @@ use std::sync::Arc; use std::path::Path; use bigint::hash::H256; -use kvdb::{Database, DatabaseConfig, KeyValueDB}; +use kvdb::KeyValueDB; +use kvdb_rocksdb::{Database, DatabaseConfig}; use bytes::Bytes; use io::*; use spec::Spec; diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index a53824b1f..ae6a34cfa 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -40,7 +40,7 @@ use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use util_error::UtilError; use bytes::Bytes; use util::journaldb::Algorithm; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use snappy; /// Helper for removing directories in case of error. @@ -682,7 +682,7 @@ mod tests { #[test] fn cannot_finish_with_invalid_chunks() { use bigint::hash::H256; - use kvdb::DatabaseConfig; + use kvdb_rocksdb::DatabaseConfig; let spec = get_test_spec(); let dir = RandomTempPath::new(); diff --git a/ethcore/src/snapshot/tests/proof_of_authority.rs b/ethcore/src/snapshot/tests/proof_of_authority.rs index 9634fd531..fcc4d3872 100644 --- a/ethcore/src/snapshot/tests/proof_of_authority.rs +++ b/ethcore/src/snapshot/tests/proof_of_authority.rs @@ -31,7 +31,7 @@ use tests::helpers; use transaction::{Transaction, Action, SignedTransaction}; use util::Address; -use kvdb; +use kvdb_memorydb; const PASS: &'static str = ""; const TRANSITION_BLOCK_1: usize = 2; // block at which the contract becomes activated. @@ -238,7 +238,7 @@ fn fixed_to_contract_only() { assert_eq!(client.chain_info().best_block_number, 11); let reader = snapshot_helpers::snap(&*client); - let new_db = kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)); + let new_db = kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)); let spec = spec_fixed_to_contract(); // ensure fresh engine's step matches. @@ -270,7 +270,7 @@ fn fixed_to_contract_to_contract() { assert_eq!(client.chain_info().best_block_number, 16); let reader = snapshot_helpers::snap(&*client); - let new_db = kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)); + let new_db = kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)); let spec = spec_fixed_to_contract(); for _ in 0..16 { spec.engine.step() } diff --git a/ethcore/src/snapshot/tests/proof_of_work.rs b/ethcore/src/snapshot/tests/proof_of_work.rs index 8002e4362..357d42a26 100644 --- a/ethcore/src/snapshot/tests/proof_of_work.rs +++ b/ethcore/src/snapshot/tests/proof_of_work.rs @@ -26,7 +26,8 @@ use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use parking_lot::Mutex; use snappy; -use kvdb::{self, KeyValueDB, DBTransaction}; +use kvdb::{KeyValueDB, DBTransaction}; +use kvdb_memorydb; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -43,7 +44,7 @@ fn chunk_and_restore(amount: u64) { let mut snapshot_path = new_path.as_path().to_owned(); snapshot_path.push("SNAP"); - let old_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); + let old_db = Arc::new(kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); let bc = BlockChain::new(Default::default(), &genesis, old_db.clone()); // build the blockchain. 
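The test-side counterpart is `kvdb_memorydb::in_memory`, which replaces the old `kvdb::in_memory` throughout these snapshot tests. A short sketch of what the new backend provides, not taken from the patch; the column count and keys are arbitrary, and it assumes the `KeyValueDB` trait's provided `transaction()`/`write()` helpers:

```rust
extern crate kvdb;
extern crate kvdb_memorydb;

use std::sync::Arc;
use kvdb::KeyValueDB;

fn main() {
	// One unnamed default column (None) plus columns Some(0) and Some(1).
	let db: Arc<KeyValueDB> = Arc::new(kvdb_memorydb::in_memory(2));

	let mut tx = db.transaction();
	tx.put(Some(0), b"key", b"value");
	db.write(tx).unwrap();

	assert_eq!(&*db.get(Some(0), b"key").unwrap().unwrap(), b"value");
}
```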
@@ -80,7 +81,7 @@ fn chunk_and_restore(amount: u64) { writer.into_inner().finish(manifest.clone()).unwrap(); // restore it. - let new_db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); + let new_db = Arc::new(kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone()); let mut rebuilder = SNAPSHOT_MODE.rebuilder(new_chain, new_db.clone(), &manifest).unwrap(); @@ -127,7 +128,7 @@ fn checks_flag() { let chunk = stream.out(); - let db = Arc::new(kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); + let db = Arc::new(kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); let engine = ::spec::Spec::new_test().engine; let chain = BlockChain::new(Default::default(), &genesis, db.clone()); diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index d391883a9..ccaf819b0 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -27,7 +27,7 @@ use tests::helpers::generate_dummy_client_with_spec_and_data; use devtools::RandomTempPath; use io::IoChannel; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; struct NoopDBRestore; diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index 175ae4eb8..9f9b434df 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -27,7 +27,7 @@ use error::Error; use rand::{XorShiftRng, SeedableRng}; use bigint::hash::H256; use util::journaldb::{self, Algorithm}; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use memorydb::MemoryDB; use parking_lot::Mutex; use devtools::RandomTempPath; diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index e643e1210..571d3d6dc 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -668,13 +668,13 @@ impl Spec { pub fn genesis_epoch_data(&self) -> Result, String> { use transaction::{Action, Transaction}; use util::journaldb; - use kvdb; + use kvdb_memorydb; let genesis = self.genesis_header(); let factories = Default::default(); let mut db = journaldb::new( - Arc::new(kvdb::in_memory(0)), + Arc::new(kvdb_memorydb::in_memory(0)), journaldb::Algorithm::Archive, None, ); diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 06005e66d..69e48a9df 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -27,7 +27,7 @@ use tests::helpers::*; use types::filter::Filter; use bigint::prelude::U256; use util::*; -use kvdb::{Database, DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use devtools::*; use miner::Miner; use spec::Spec; diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 52a0dedc6..ab1a6c88c 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -232,7 +232,7 @@ pub fn get_test_client_with_blocks(blocks: Vec) -> Arc { } fn new_db() -> Arc<::kvdb::KeyValueDB> { - Arc::new(::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))) + Arc::new(::kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))) } pub fn generate_dummy_blockchain(block_number: u32) -> BlockChain { diff --git a/ethcore/src/tests/trace.rs b/ethcore/src/tests/trace.rs index 270d29597..57a745a1a 100644 --- a/ethcore/src/tests/trace.rs +++ b/ethcore/src/tests/trace.rs @@ -27,7 +27,7 @@ use client::*; use tests::helpers::*; use devtools::RandomTempPath; use client::{BlockChainClient, Client, ClientConfig}; -use kvdb::{Database, 
DatabaseConfig}; +use kvdb_rocksdb::{Database, DatabaseConfig}; use std::sync::Arc; use header::Header; use miner::Miner; diff --git a/ethcore/src/trace/db.rs b/ethcore/src/trace/db.rs index 087acdcc9..fed982f93 100644 --- a/ethcore/src/trace/db.rs +++ b/ethcore/src/trace/db.rs @@ -416,7 +416,8 @@ mod tests { use bigint::prelude::U256; use bigint::hash::H256; use util::Address; - use kvdb::{DBTransaction, in_memory, KeyValueDB}; + use kvdb::{DBTransaction, KeyValueDB}; + use kvdb_memorydb::in_memory; use header::BlockNumber; use trace::{Config, TraceDB, Database as TraceDatabase, DatabaseExtras, ImportRequest}; use trace::{Filter, LocalizedTrace, AddressesFilter, TraceError}; diff --git a/ethcore/src/tx_filter.rs b/ethcore/src/tx_filter.rs index ac40b8ab6..28f2611db 100644 --- a/ethcore/src/tx_filter.rs +++ b/ethcore/src/tx_filter.rs @@ -178,7 +178,7 @@ mod test { "#; let spec = Spec::load(&::std::env::temp_dir(), spec_data.as_bytes()).unwrap(); - let client_db = Arc::new(::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); + let client_db = Arc::new(::kvdb_memorydb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); let client = Client::new( ClientConfig::default(), diff --git a/local-store/Cargo.toml b/local-store/Cargo.toml index db9830c40..e4b97f7a4 100644 --- a/local-store/Cargo.toml +++ b/local-store/Cargo.toml @@ -14,4 +14,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" log = "0.3" + +[dev-dependencies] ethkey = { path = "../ethkey" } +kvdb-memorydb = { path = "../util/kvdb-memorydb" } diff --git a/local-store/src/lib.rs b/local-store/src/lib.rs index 61fe54976..2666e5a9e 100644 --- a/local-store/src/lib.rs +++ b/local-store/src/lib.rs @@ -44,6 +44,8 @@ extern crate log; #[cfg(test)] extern crate ethkey; +#[cfg(test)] +extern crate kvdb_memorydb; const LOCAL_TRANSACTIONS_KEY: &'static [u8] = &*b"LOCAL_TXS"; @@ -243,7 +245,7 @@ mod tests { #[test] fn twice_empty() { - let db = Arc::new(::kvdb::in_memory(0)); + let db = Arc::new(::kvdb_memorydb::in_memory(0)); { let store = super::create(db.clone(), None, Dummy(vec![])); @@ -272,7 +274,7 @@ mod tests { PendingTransaction::new(signed, condition) }).collect(); - let db = Arc::new(::kvdb::in_memory(0)); + let db = Arc::new(::kvdb_memorydb::in_memory(0)); { // nothing written yet, will write pending. @@ -311,7 +313,7 @@ mod tests { PendingTransaction::new(signed, None) }); - let db = Arc::new(::kvdb::in_memory(0)); + let db = Arc::new(::kvdb_memorydb::in_memory(0)); { // nothing written, will write bad. 
let store = super::create(db.clone(), None, Dummy(transactions.clone())); diff --git a/parity/helpers.rs b/parity/helpers.rs index da54f6763..1319f45d3 100644 --- a/parity/helpers.rs +++ b/parity/helpers.rs @@ -21,7 +21,7 @@ use std::fs::File; use bigint::prelude::U256; use bigint::hash::clean_0x; use util::Address; -use kvdb::CompactionProfile; +use kvdb_rocksdb::CompactionProfile; use util::journaldb::Algorithm; use ethcore::client::{Mode, BlockId, VMType, DatabaseCompactionProfile, ClientConfig, VerifierType}; use ethcore::miner::{PendingSet, GasLimit, PrioritizationStrategy}; diff --git a/parity/main.rs b/parity/main.rs index 97ffbca5f..144af116a 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -62,7 +62,7 @@ extern crate ethcore_bigint as bigint; extern crate ethcore_bytes as bytes; extern crate ethcore_network as network; extern crate migration as migr; -extern crate kvdb; +extern crate kvdb_rocksdb; extern crate ethkey; extern crate ethsync; extern crate node_health; diff --git a/parity/migration.rs b/parity/migration.rs index 508491a0a..1e8cf2693 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -22,7 +22,7 @@ use std::fmt::{Display, Formatter, Error as FmtError}; use std::sync::Arc; use util::journaldb::Algorithm; use migr::{Manager as MigrationManager, Config as MigrationConfig, Error as MigrationError, Migration}; -use kvdb::{CompactionProfile, Database, DatabaseConfig}; +use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig}; use ethcore::migrations; use ethcore::db; use ethcore::migrations::Extract; @@ -283,7 +283,7 @@ mod legacy { use std::path::{Path, PathBuf}; use util::journaldb::Algorithm; use migr::{Manager as MigrationManager}; - use kvdb::CompactionProfile; + use kvdb_rocksdb::CompactionProfile; use ethcore::migrations; /// Blocks database path. 
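From here on the Cargo manifests follow one rule: `kvdb` stays a regular dependency wherever the `KeyValueDB` trait appears in the crate's API, `kvdb-rocksdb` is pulled in only by code that opens real databases (the parity binary, migrations, secret_store), and `kvdb-memorydb` moves under `[dev-dependencies]` with a `#[cfg(test)] extern crate`, so the in-memory backend is compiled for tests only. A sketch of that gating, with a hypothetical `Store` type standing in for the rpc/sync/local-store crates:

```rust
// Crate-root sketch of the gating pattern this PR applies (hypothetical crate).
extern crate kvdb;          // always: trait + DBTransaction

#[cfg(test)]
extern crate kvdb_memorydb; // dev-dependency, tests only

use std::sync::Arc;
use kvdb::KeyValueDB;

/// Library code accepts any backend through the trait object.
pub struct Store {
	db: Arc<KeyValueDB>,
}

impl Store {
	pub fn new(db: Arc<KeyValueDB>) -> Store {
		Store { db: db }
	}

	pub fn get(&self, key: &[u8]) -> Option<kvdb::DBValue> {
		self.db.get(None, key).ok().and_then(|v| v)
	}
}

#[cfg(test)]
mod tests {
	use std::sync::Arc;
	use super::Store;

	#[test]
	fn works_on_memory_backend() {
		// Tests construct the store over the in-memory implementation.
		let db = Arc::new(::kvdb_memorydb::in_memory(0));
		let _store = Store::new(db);
	}
}
```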
diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 59f5a117c..ceb57639c 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -57,7 +57,6 @@ rlp = { path = "../util/rlp" } stats = { path = "../util/stats" } vm = { path = "../ethcore/vm" } hash = { path = "../util/hash" } -kvdb = { path = "../util/kvdb" } hardware-wallet = { path = "../hw" } clippy = { version = "0.0.103", optional = true} @@ -66,6 +65,7 @@ pretty_assertions = "0.1" [dev-dependencies] macros = { path = "../util/macros" } ethcore-network = { path = "../util/network" } +kvdb-memorydb = { path = "../util/kvdb-memorydb" } [features] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"] diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 2e6d6148f..a4572835f 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -65,7 +65,6 @@ extern crate rlp; extern crate stats; extern crate hash; extern crate hardware_wallet; -extern crate kvdb; #[macro_use] extern crate log; @@ -85,6 +84,9 @@ extern crate pretty_assertions; #[macro_use] extern crate macros; +#[cfg(test)] +extern crate kvdb_memorydb; + pub extern crate jsonrpc_ws_server as ws; mod authcodes; diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index a2b23f52e..2674447a3 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -33,7 +33,7 @@ use io::IoChannel; use bigint::prelude::U256; use bigint::hash::H256; use util::Address; -use kvdb::in_memory; +use kvdb_memorydb::in_memory; use jsonrpc_core::IoHandler; use v1::impls::{EthClient, SigningUnsafeClient}; diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index 3e98c62e7..ca9070440 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -33,6 +33,7 @@ ethcore-devtools = { path = "../devtools" } ethcore-util = { path = "../util" } ethcore-bigint = { path = "../util/bigint" } kvdb = { path = "../util/kvdb" } +kvdb-rocksdb = { path = "../util/kvdb-rocksdb" } hash = { path = "../util/hash" } ethcore-ipc = { path = "../ipc/rpc" } ethcore-ipc-nano = { path = "../ipc/nano" } diff --git a/secret_store/src/key_storage.rs b/secret_store/src/key_storage.rs index ca408dfc1..87f1ec084 100644 --- a/secret_store/src/key_storage.rs +++ b/secret_store/src/key_storage.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use std::collections::BTreeMap; use serde_json; use ethkey::{Secret, Public}; -use kvdb::{Database, DatabaseIterator}; +use kvdb_rocksdb::{Database, DatabaseIterator}; use types::all::{Error, ServiceConfiguration, ServerKeyId, NodeId}; use serialization::{SerializablePublic, SerializableSecret}; @@ -293,7 +293,7 @@ pub mod tests { use serde_json; use devtools::RandomTempPath; use ethkey::{Random, Generator, Public, Secret}; - use kvdb::Database; + use kvdb_rocksdb::Database; use types::all::{Error, NodeAddress, ServiceConfiguration, ClusterConfiguration, ServerKeyId}; use super::{DB_META_KEY_VERSION, CURRENT_VERSION, KeyStorage, PersistentKeyStorage, DocumentKeyShare, SerializableDocumentKeyShareV0, SerializableDocumentKeyShareV1, diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index 6aa3bd708..73bf92627 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -49,6 +49,7 @@ extern crate ethkey; extern crate native_contracts; extern crate hash; extern crate kvdb; +extern crate kvdb_rocksdb; mod key_server_cluster; mod types; diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 9260f99ce..8f1e08093 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -34,11 +34,14 @@ ethcore-ipc = { path = "../ipc/rpc" } semver = "0.6" smallvec = { version = "0.4", 
features = ["heapsizeof"] } ethcore-ipc-nano = { path = "../ipc/nano" } -ethcore-devtools = { path = "../devtools" } -ethkey = { path = "../ethkey" } parking_lot = "0.4" ipnetwork = "0.12.6" +[dev-dependencies] +ethkey = { path = "../ethkey" } +kvdb-memorydb = { path = "../util/kvdb-memorydb" } +ethcore-devtools = { path = "../devtools" } + [features] default = [] dev = ["clippy", "ethcore/dev", "ethcore-util/dev"] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 33b1d021f..1bcc744a7 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -48,6 +48,7 @@ extern crate ethcore_light as light; #[cfg(test)] extern crate ethcore_devtools as devtools; #[cfg(test)] extern crate ethkey; +#[cfg(test)] extern crate kvdb_memorydb; #[macro_use] extern crate macros; diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 1fe4fde46..c5b318cd7 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -291,7 +291,7 @@ impl TestNet> { let client = EthcoreClient::new( ClientConfig::default(), &spec, - Arc::new(::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), + Arc::new(::kvdb_memorydb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), Arc::new(Miner::with_spec_and_accounts(&spec, accounts)), IoChannel::disconnected(), ).unwrap(); diff --git a/util/Cargo.toml b/util/Cargo.toml index d9faa1786..1877c7466 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -35,6 +35,10 @@ memorydb = { path = "memorydb" } util-error = { path = "error" } kvdb = { path = "kvdb" } +[dev-dependencies] +kvdb-rocksdb = { path = "kvdb-rocksdb" } +kvdb-memorydb = { path = "kvdb-memorydb" } + [features] default = [] dev = ["clippy"] diff --git a/util/kvdb-memorydb/Cargo.toml b/util/kvdb-memorydb/Cargo.toml new file mode 100644 index 000000000..e980fb158 --- /dev/null +++ b/util/kvdb-memorydb/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "kvdb-memorydb" +version = "0.1.0" +authors = ["debris "] + +[dependencies] +parking_lot = "0.4" +rlp = { path = "../rlp" } +kvdb = { path = "../kvdb" } diff --git a/util/kvdb-memorydb/src/lib.rs b/util/kvdb-memorydb/src/lib.rs new file mode 100644 index 000000000..8b6d6c0a4 --- /dev/null +++ b/util/kvdb-memorydb/src/lib.rs @@ -0,0 +1,124 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate parking_lot; +extern crate kvdb; +extern crate rlp; + +use std::collections::{BTreeMap, HashMap}; +use parking_lot::RwLock; +use kvdb::{DBValue, Error, DBTransaction, KeyValueDB, DBOp}; +use rlp::{RlpType, UntrustedRlp, Compressible}; + +/// A key-value database fulfilling the `KeyValueDB` trait, living in memory. +/// This is generally intended for tests and is not particularly optimized. +#[derive(Default)] +pub struct InMemory { + columns: RwLock, BTreeMap, DBValue>>>, +} + +/// Create an in-memory database with the given number of columns. 
+/// Columns will be indexable by 0..`num_cols`
+pub fn in_memory(num_cols: u32) -> InMemory {
+	let mut cols = HashMap::new();
+	cols.insert(None, BTreeMap::new());
+
+	for idx in 0..num_cols {
+		cols.insert(Some(idx), BTreeMap::new());
+	}
+
+	InMemory {
+		columns: RwLock::new(cols)
+	}
+}
+
+impl KeyValueDB for InMemory {
+	fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
+		let columns = self.columns.read();
+		match columns.get(&col) {
+			None => Err(format!("No such column family: {:?}", col)),
+			Some(map) => Ok(map.get(key).cloned()),
+		}
+	}
+
+	fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
+		let columns = self.columns.read();
+		match columns.get(&col) {
+			None => None,
+			Some(map) =>
+				map.iter()
+					.find(|&(ref k, _)| k.starts_with(prefix))
+					.map(|(_, v)| v.to_vec().into_boxed_slice())
+		}
+	}
+
+	fn write_buffered(&self, transaction: DBTransaction) {
+		let mut columns = self.columns.write();
+		let ops = transaction.ops;
+		for op in ops {
+			match op {
+				DBOp::Insert { col, key, value } => {
+					if let Some(mut col) = columns.get_mut(&col) {
+						col.insert(key.into_vec(), value);
+					}
+				},
+				DBOp::InsertCompressed { col, key, value } => {
+					if let Some(mut col) = columns.get_mut(&col) {
+						let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
+						let mut value = DBValue::new();
+						value.append_slice(&compressed);
+						col.insert(key.into_vec(), value);
+					}
+				},
+				DBOp::Delete { col, key } => {
+					if let Some(mut col) = columns.get_mut(&col) {
+						col.remove(&*key);
+					}
+				},
+			}
+		}
+	}
+
+	fn flush(&self) -> Result<(), String> { Ok(()) }
+	fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
+		match self.columns.read().get(&col) {
+			Some(map) => Box::new( // TODO: worth optimizing at all?
+				map.clone()
+					.into_iter()
+					.map(|(k, v)| (k.into_boxed_slice(), v.into_vec().into_boxed_slice()))
+			),
+			None => Box::new(None.into_iter()),
+		}
+	}
+
+	fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
+		-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
+	{
+		match self.columns.read().get(&col) {
+			Some(map) => Box::new(
+				map.clone()
+					.into_iter()
+					.skip_while(move |&(ref k, _)| !k.starts_with(prefix))
+					.map(|(k, v)| (k.into_boxed_slice(), v.into_vec().into_boxed_slice()))
+			),
+			None => Box::new(None.into_iter()),
+		}
+	}
+
+	fn restore(&self, _new_db: &str) -> Result<(), Error> {
+		Err("Attempted to restore in-memory database".into())
+	}
+}
diff --git a/util/kvdb-rocksdb/Cargo.toml b/util/kvdb-rocksdb/Cargo.toml
new file mode 100644
index 000000000..1fafbdae9
--- /dev/null
+++ b/util/kvdb-rocksdb/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "kvdb-rocksdb"
+version = "0.1.0"
+authors = ["debris "]
+
+[dependencies]
+log = "0.3"
+elastic-array = "0.9"
+ethcore-bigint = { path = "../bigint" }
+ethcore-devtools = { path = "../../devtools" }
+parking_lot = "0.4"
+regex = "0.2"
+rlp = { path = "../rlp" }
+rocksdb = { git = "https://github.com/paritytech/rust-rocksdb" }
+kvdb = { path = "../kvdb" }
diff --git a/util/kvdb-rocksdb/src/lib.rs b/util/kvdb-rocksdb/src/lib.rs
new file mode 100644
index 000000000..f17be8fb3
--- /dev/null
+++ b/util/kvdb-rocksdb/src/lib.rs
@@ -0,0 +1,805 @@
+// Copyright 2015-2017 Parity Technologies (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see <http://www.gnu.org/licenses/>.
+
+#[macro_use]
+extern crate log;
+
+extern crate elastic_array;
+extern crate parking_lot;
+extern crate regex;
+extern crate rocksdb;
+
+extern crate ethcore_bigint as bigint;
+extern crate ethcore_devtools as devtools;
+extern crate kvdb;
+extern crate rlp;
+
+use std::collections::HashMap;
+use std::marker::PhantomData;
+use std::path::{PathBuf, Path};
+use std::{mem, fs, io};
+
+use parking_lot::{Mutex, MutexGuard, RwLock};
+use rocksdb::{
+	DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
+	Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column, ReadOptions
+};
+
+use elastic_array::ElasticArray32;
+use rlp::{UntrustedRlp, RlpType, Compressible};
+use kvdb::{KeyValueDB, DBTransaction, DBValue, Error, DBOp};
+
+#[cfg(target_os = "linux")]
+use regex::Regex;
+#[cfg(target_os = "linux")]
+use std::process::Command;
+#[cfg(target_os = "linux")]
+use std::fs::File;
+
+const DB_BACKGROUND_FLUSHES: i32 = 2;
+const DB_BACKGROUND_COMPACTIONS: i32 = 2;
+const DB_WRITE_BUFFER_SIZE: usize = 2048 * 1000;
+
+enum KeyState {
+	Insert(DBValue),
+	InsertCompressed(DBValue),
+	Delete,
+}
+
+/// Compaction profile for the database settings
+#[derive(Clone, Copy, PartialEq, Debug)]
+pub struct CompactionProfile {
+	/// L0-L1 target file size
+	pub initial_file_size: u64,
+	/// L2-LN target file size multiplier
+	pub file_size_multiplier: i32,
+	/// rate limiter for background flushes and compactions, bytes/sec, if any
+	pub write_rate_limit: Option<u64>,
+}
+
+impl Default for CompactionProfile {
+	/// Default profile suitable for most storage
+	fn default() -> CompactionProfile {
+		CompactionProfile::ssd()
+	}
+}
+
+/// Given output of df command return Linux rotational flag file path.
+#[cfg(target_os = "linux")]
+pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
+	use std::str;
+	str::from_utf8(df_out.as_slice())
+		.ok()
+		// Get the drive name.
+		.and_then(|df_str| Regex::new(r"/dev/(sd[:alpha:]{1,2})")
+			.ok()
+			.and_then(|re| re.captures(df_str))
+			.and_then(|captures| captures.get(1)))
+		// Generate path e.g. /sys/block/sda/queue/rotational
+		.map(|drive_path| {
+			let mut p = PathBuf::from("/sys/block");
+			p.push(drive_path.as_str());
+			p.push("queue/rotational");
+			p
+		})
+}
+
+impl CompactionProfile {
+	/// Attempt to determine the best profile automatically, only Linux for now.
+	#[cfg(target_os = "linux")]
+	pub fn auto(db_path: &Path) -> CompactionProfile {
+		use std::io::Read;
+		let hdd_check_file = db_path
+			.to_str()
+			.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
+			.and_then(|df_res| match df_res.status.success() {
+				true => Some(df_res.stdout),
+				false => None,
+			})
+			.and_then(rotational_from_df_output);
+		// Read out the file and match compaction profile.
+		if let Some(hdd_check) = hdd_check_file {
+			if let Ok(mut file) = File::open(hdd_check.as_path()) {
+				let mut buffer = [0; 1];
+				if file.read_exact(&mut buffer).is_ok() {
+					// 0 means not rotational.
+					if buffer == [48] { return Self::ssd(); }
+					// 1 means rotational.
+					if buffer == [49] { return Self::hdd(); }
+				}
+			}
+		}
+		// Fallback if drive type was not determined.
+		Self::default()
+	}
+
+	/// Just default for other platforms.
+	#[cfg(not(target_os = "linux"))]
+	pub fn auto(_db_path: &Path) -> CompactionProfile {
+		Self::default()
+	}
+
+	/// Default profile suitable for SSD storage
+	pub fn ssd() -> CompactionProfile {
+		CompactionProfile {
+			initial_file_size: 32 * 1024 * 1024,
+			file_size_multiplier: 2,
+			write_rate_limit: None,
+		}
+	}
+
+	/// Slow HDD compaction profile
+	pub fn hdd() -> CompactionProfile {
+		CompactionProfile {
+			initial_file_size: 192 * 1024 * 1024,
+			file_size_multiplier: 1,
+			write_rate_limit: Some(8 * 1024 * 1024),
+		}
+	}
+}
+
+/// Database configuration
+#[derive(Clone)]
+pub struct DatabaseConfig {
+	/// Max number of open files.
+	pub max_open_files: i32,
+	/// Cache sizes (in MiB) for specific columns.
+	pub cache_sizes: HashMap<Option<u32>, usize>,
+	/// Compaction profile
+	pub compaction: CompactionProfile,
+	/// Set number of columns
+	pub columns: Option<u32>,
+	/// Should we keep WAL enabled?
+	pub wal: bool,
+}
+
+impl DatabaseConfig {
+	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
+	/// Note that cache sizes must be explicitly set.
+	pub fn with_columns(columns: Option<u32>) -> Self {
+		let mut config = Self::default();
+		config.columns = columns;
+		config
+	}
+
+	/// Set the column cache size in MiB.
+	pub fn set_cache(&mut self, col: Option<u32>, size: usize) {
+		self.cache_sizes.insert(col, size);
+	}
+}
+
+impl Default for DatabaseConfig {
+	fn default() -> DatabaseConfig {
+		DatabaseConfig {
+			cache_sizes: HashMap::new(),
+			max_open_files: 512,
+			compaction: CompactionProfile::default(),
+			columns: None,
+			wal: true,
+		}
+	}
+}
+
+/// Database iterator (for flushed data only)
+// The compromise of holding only a virtual borrow vs. holding a lock on the
+// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
+//
+pub struct DatabaseIterator<'a> {
+	iter: DBIterator,
+	_marker: PhantomData<&'a Database>,
+}
+
+impl<'a> Iterator for DatabaseIterator<'a> {
+	type Item = (Box<[u8]>, Box<[u8]>);
+
+	fn next(&mut self) -> Option<Self::Item> {
+		self.iter.next()
+	}
+}
+
+struct DBAndColumns {
+	db: DB,
+	cfs: Vec<Column>,
+}
+
+// get column family configuration from database config.
+fn col_config(col: u32, config: &DatabaseConfig) -> Options {
+	// default cache size for columns not specified.
+	const DEFAULT_CACHE: usize = 2;
+
+	let mut opts = Options::new();
+	opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
+	opts.set_target_file_size_base(config.compaction.initial_file_size);
+	opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
+	opts.set_db_write_buffer_size(DB_WRITE_BUFFER_SIZE);
+
+	let col_opt = config.columns.map(|_| col);
+
+	{
+		let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
+		let mut block_opts = BlockBasedOptions::new();
+		// all goes to read cache.
+		block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
+		opts.set_block_based_table_factory(&block_opts);
+	}
+
+	opts
+}
+
+/// Key-Value database.
+pub struct Database {
+	db: RwLock<Option<DBAndColumns>>,
+	config: DatabaseConfig,
+	write_opts: WriteOptions,
+	read_opts: ReadOptions,
+	path: String,
+	// Dirty values added with `write_buffered`. Cleaned on `flush`.
+	overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
+	// Values currently being flushed. Cleared when `flush` completes.
+	flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
+	// Prevents concurrent flushes.
+	// Value indicates if a flush is in progress.
+	flushing_lock: Mutex<bool>,
+}
+
+impl Database {
+	/// Open database with default settings.
+	pub fn open_default(path: &str) -> Result<Database, String> {
+		Database::open(&DatabaseConfig::default(), path)
+	}
+
+	/// Open database file. Creates if it does not exist.
+	pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
+		let mut opts = Options::new();
+		if let Some(rate_limit) = config.compaction.write_rate_limit {
+			opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
+		}
+		opts.set_parsed_options(&format!("max_total_wal_size={}", 64 * 1024 * 1024))?;
+		opts.set_parsed_options("verify_checksums_in_compaction=0")?;
+		opts.set_parsed_options("keep_log_file_num=1")?;
+		opts.set_max_open_files(config.max_open_files);
+		opts.create_if_missing(true);
+		opts.set_use_fsync(false);
+		opts.set_db_write_buffer_size(DB_WRITE_BUFFER_SIZE);
+
+		opts.set_max_background_flushes(DB_BACKGROUND_FLUSHES);
+		opts.set_max_background_compactions(DB_BACKGROUND_COMPACTIONS);
+
+		// compaction settings
+		opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
+		opts.set_target_file_size_base(config.compaction.initial_file_size);
+		opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
+
+		let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
+		let cfnames: Vec<_> = (0..config.columns.unwrap_or(0)).map(|c| format!("col{}", c)).collect();
+		let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
+
+		for col in 0 .. config.columns.unwrap_or(0) {
+			cf_options.push(col_config(col, &config));
+		}
+
+		let mut write_opts = WriteOptions::new();
+		if !config.wal {
+			write_opts.disable_wal(true);
+		}
+		let mut read_opts = ReadOptions::new();
+		read_opts.set_verify_checksums(false);
+
+		let mut cfs: Vec<Column> = Vec::new();
+		let db = match config.columns {
+			Some(columns) => {
+				match DB::open_cf(&opts, path, &cfnames, &cf_options) {
+					Ok(db) => {
+						cfs = cfnames.iter().map(|n| db.cf_handle(n)
+							.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
+						assert!(cfs.len() == columns as usize);
+						Ok(db)
+					}
+					Err(_) => {
+						// retry and create CFs
+						match DB::open_cf(&opts, path, &[], &[]) {
+							Ok(mut db) => {
+								cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<Result<_, _>>()?;
+								Ok(db)
+							},
+							err @ Err(_) => err,
+						}
+					}
+				}
+			},
+			None => DB::open(&opts, path)
+		};
+
+		let db = match db {
+			Ok(db) => db,
+			Err(ref s) if s.starts_with("Corruption:") => {
+				info!("{}", s);
+				info!("Attempting DB repair for {}", path);
+				DB::repair(&opts, path)?;
+
+				match cfnames.is_empty() {
+					true => DB::open(&opts, path)?,
+					false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
+				}
+			},
+			Err(s) => { return Err(s); }
+		};
+		let num_cols = cfs.len();
+		Ok(Database {
+			db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })),
+			config: config.clone(),
+			write_opts: write_opts,
+			overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
+			flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
+			flushing_lock: Mutex::new(false),
+			path: path.to_owned(),
+			read_opts: read_opts,
+		})
+	}
+
+	/// Helper to create new transaction for this database.
+	pub fn transaction(&self) -> DBTransaction {
+		DBTransaction::new()
+	}
+
+	fn to_overlay_column(col: Option<u32>) -> usize {
+		col.map_or(0, |c| (c + 1) as usize)
+	}
+
+	/// Commit transaction to database.
+	pub fn write_buffered(&self, tr: DBTransaction) {
+		let mut overlay = self.overlay.write();
+		let ops = tr.ops;
+		for op in ops {
+			match op {
+				DBOp::Insert { col, key, value } => {
+					let c = Self::to_overlay_column(col);
+					overlay[c].insert(key, KeyState::Insert(value));
+				},
+				DBOp::InsertCompressed { col, key, value } => {
+					let c = Self::to_overlay_column(col);
+					overlay[c].insert(key, KeyState::InsertCompressed(value));
+				},
+				DBOp::Delete { col, key } => {
+					let c = Self::to_overlay_column(col);
+					overlay[c].insert(key, KeyState::Delete);
+				},
+			}
+		}
+	}
+
+	/// Commit buffered changes to database. Must be called under `flush_lock`
+	fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<(), String> {
+		match *self.db.read() {
+			Some(DBAndColumns { ref db, ref cfs }) => {
+				let batch = WriteBatch::new();
+				mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
+				{
+					for (c, column) in self.flushing.read().iter().enumerate() {
+						for (ref key, ref state) in column.iter() {
+							match **state {
+								KeyState::Delete => {
+									if c > 0 {
+										batch.delete_cf(cfs[c - 1], &key)?;
+									} else {
+										batch.delete(&key)?;
+									}
+								},
+								KeyState::Insert(ref value) => {
+									if c > 0 {
+										batch.put_cf(cfs[c - 1], &key, value)?;
+									} else {
+										batch.put(&key, &value)?;
+									}
+								},
+								KeyState::InsertCompressed(ref value) => {
+									let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
+									if c > 0 {
+										batch.put_cf(cfs[c - 1], &key, &compressed)?;
+									} else {
+										batch.put(&key, &compressed)?;
+									}
+								}
+							}
+						}
+					}
+				}
+				db.write_opt(batch, &self.write_opts)?;
+				for column in self.flushing.write().iter_mut() {
+					column.clear();
+					column.shrink_to_fit();
+				}
+				Ok(())
+			},
+			None => Err("Database is closed".to_owned())
+		}
+	}
+
+	/// Commit buffered changes to database.
+	pub fn flush(&self) -> Result<(), String> {
+		let mut lock = self.flushing_lock.lock();
+		// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
+		// The value inside the lock is used to detect that.
+		if *lock {
+			// This can only happen if another flushing thread is terminated unexpectedly.
+			return Err("Database write failure. Running low on memory perhaps?".to_owned());
+		}
+		*lock = true;
+		let result = self.write_flushing_with_lock(&mut lock);
+		*lock = false;
+		result
+	}
+
+	/// Commit transaction to database.
+	pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
+		match *self.db.read() {
+			Some(DBAndColumns { ref db, ref cfs }) => {
+				let batch = WriteBatch::new();
+				let ops = tr.ops;
+				for op in ops {
+					match op {
+						DBOp::Insert { col, key, value } => {
+							col.map_or_else(|| batch.put(&key, &value), |c| batch.put_cf(cfs[c as usize], &key, &value))?
+						},
+						DBOp::InsertCompressed { col, key, value } => {
+							let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
+							col.map_or_else(|| batch.put(&key, &compressed), |c| batch.put_cf(cfs[c as usize], &key, &compressed))?
+						},
+						DBOp::Delete { col, key } => {
+							col.map_or_else(|| batch.delete(&key), |c| batch.delete_cf(cfs[c as usize], &key))?
+						},
+					}
+				}
+				db.write_opt(batch, &self.write_opts)
+			},
+			None => Err("Database is closed".to_owned())
+		}
+	}
+
+	/// Get value by key.
+	pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
+		match *self.db.read() {
+			Some(DBAndColumns { ref db, ref cfs }) => {
+				let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
+				match overlay.get(key) {
+					Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
+					Some(&KeyState::Delete) => Ok(None),
+					None => {
+						let flushing = &self.flushing.read()[Self::to_overlay_column(col)];
+						match flushing.get(key) {
+							Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
+							Some(&KeyState::Delete) => Ok(None),
+							None => {
+								col.map_or_else(
+									|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
+									|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
+							},
+						}
+					},
+				}
+			},
+			None => Ok(None),
+		}
+	}
+
+	/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
+	// TODO: support prefix seek for unflushed data
+	pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
+		self.iter_from_prefix(col, prefix).and_then(|mut iter| {
+			match iter.next() {
+				// TODO: use prefix_same_as_start read option (not available in the C API currently)
+				Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None },
+				_ => None
+			}
+		})
+	}
+
+	/// Get database iterator for flushed data.
+	pub fn iter(&self, col: Option<u32>) -> Option<DatabaseIterator> {
+		//TODO: iterate over overlay
+		match *self.db.read() {
+			Some(DBAndColumns { ref db, ref cfs }) => {
+				let iter = col.map_or_else(
+					|| db.iterator_opt(IteratorMode::Start, &self.read_opts),
+					|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
+						.expect("iterator params are valid; qed")
+				);
+
+				Some(DatabaseIterator {
+					iter: iter,
+					_marker: PhantomData,
+				})
+			},
+			None => None,
+		}
+	}
+
+	fn iter_from_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<DatabaseIterator> {
+		match *self.db.read() {
+			Some(DBAndColumns { ref db, ref cfs }) => {
+				let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts),
+					|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts)
+						.expect("iterator params are valid; qed"));
+
+				Some(DatabaseIterator {
+					iter: iter,
+					_marker: PhantomData,
+				})
+			},
+			None => None,
+		}
+	}
+
+	/// Close the database
+	fn close(&self) {
+		*self.db.write() = None;
+		self.overlay.write().clear();
+		self.flushing.write().clear();
+	}
+
+	/// Restore the database from a copy at given path.
+	pub fn restore(&self, new_db: &str) -> Result<(), Error> {
+		self.close();
+
+		let mut backup_db = PathBuf::from(&self.path);
+		backup_db.pop();
+		backup_db.push("backup_db");
+
+		let existed = match fs::rename(&self.path, &backup_db) {
+			Ok(_) => true,
+			Err(e) => if let io::ErrorKind::NotFound = e.kind() {
+				false
+			} else {
+				return Err(e.into());
+			}
+		};
+
+		match fs::rename(&new_db, &self.path) {
+			Ok(_) => {
+				// clean up the backup.
+				if existed {
+					fs::remove_dir_all(&backup_db)?;
+				}
+			}
+			Err(e) => {
+				// restore the backup.
+				if existed {
+					fs::rename(&backup_db, &self.path)?;
+				}
+				return Err(e.into())
+			}
+		}
+
+		// reopen the database and steal handles into self
+		let db = Self::open(&self.config, &self.path)?;
+		*self.db.write() = mem::replace(&mut *db.db.write(), None);
+		*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
+		*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
+		Ok(())
+	}
+
+	/// The number of non-default column families.
+	pub fn num_columns(&self) -> u32 {
+		self.db.read().as_ref()
+			.and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } )
+			.map(|n| n as u32)
+			.unwrap_or(0)
+	}
+
+	/// Drop a column family.
+	pub fn drop_column(&self) -> Result<(), String> {
+		match *self.db.write() {
+			Some(DBAndColumns { ref mut db, ref mut cfs }) => {
+				if let Some(col) = cfs.pop() {
+					let name = format!("col{}", cfs.len());
+					drop(col);
+					db.drop_cf(&name)?;
+				}
+				Ok(())
+			},
+			None => Ok(()),
+		}
+	}
+
+	/// Add a column family.
+	pub fn add_column(&self) -> Result<(), String> {
+		match *self.db.write() {
+			Some(DBAndColumns { ref mut db, ref mut cfs }) => {
+				let col = cfs.len() as u32;
+				let name = format!("col{}", col);
+				cfs.push(db.create_cf(&name, &col_config(col, &self.config))?);
+				Ok(())
+			},
+			None => Ok(()),
+		}
+	}
+}
+
+// duplicate declaration of methods here to avoid trait import in certain existing cases
+// at time of addition.
+impl KeyValueDB for Database {
+	fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
+		Database::get(self, col, key)
+	}
+
+	fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
+		Database::get_by_prefix(self, col, prefix)
+	}
+
+	fn write_buffered(&self, transaction: DBTransaction) {
+		Database::write_buffered(self, transaction)
+	}
+
+	fn write(&self, transaction: DBTransaction) -> Result<(), String> {
+		Database::write(self, transaction)
+	}
+
+	fn flush(&self) -> Result<(), String> {
+		Database::flush(self)
+	}
+
+	fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
+		let unboxed = Database::iter(self, col);
+		Box::new(unboxed.into_iter().flat_map(|inner| inner))
+	}
+
+	fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
+		-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
+	{
+		let unboxed = Database::iter_from_prefix(self, col, prefix);
+		Box::new(unboxed.into_iter().flat_map(|inner| inner))
+	}
+
+	fn restore(&self, new_db: &str) -> Result<(), Error> {
+		Database::restore(self, new_db)
+	}
+}
+
+impl Drop for Database {
+	fn drop(&mut self) {
+		// write all buffered changes if we can.
+		let _ = self.flush();
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use bigint::hash::H256;
+	use super::*;
+	use devtools::*;
+	use std::str::FromStr;
+
+	fn test_db(config: &DatabaseConfig) {
+		let path = RandomTempPath::create_dir();
+		let db = Database::open(config, path.as_path().to_str().unwrap()).unwrap();
+		let key1 = H256::from_str("02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
+		let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
+		let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
+
+		let mut batch = db.transaction();
+		batch.put(None, &key1, b"cat");
+		batch.put(None, &key2, b"dog");
+		db.write(batch).unwrap();
+
+		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"cat");
+
+		let contents: Vec<_> = db.iter(None).into_iter().flat_map(|inner| inner).collect();
+		assert_eq!(contents.len(), 2);
+		assert_eq!(&*contents[0].0, &*key1);
+		assert_eq!(&*contents[0].1, b"cat");
+		assert_eq!(&*contents[1].0, &*key2);
+		assert_eq!(&*contents[1].1, b"dog");
+
+		let mut batch = db.transaction();
+		batch.delete(None, &key1);
+		db.write(batch).unwrap();
+
+		assert!(db.get(None, &key1).unwrap().is_none());
+
+		let mut batch = db.transaction();
+		batch.put(None, &key1, b"cat");
+		db.write(batch).unwrap();
+
+		let mut transaction = db.transaction();
+		transaction.put(None, &key3, b"elephant");
+		transaction.delete(None, &key1);
+		db.write(transaction).unwrap();
+		assert!(db.get(None, &key1).unwrap().is_none());
+		assert_eq!(&*db.get(None, &key3).unwrap().unwrap(), b"elephant");
+
+		assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant");
+		assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog");
+
+		let mut transaction = db.transaction();
+		transaction.put(None, &key1, b"horse");
+		transaction.delete(None, &key3);
+		db.write_buffered(transaction);
+		assert!(db.get(None, &key3).unwrap().is_none());
+		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
+
+		db.flush().unwrap();
+		assert!(db.get(None, &key3).unwrap().is_none());
+		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
+	}
+
+	#[test]
+	fn kvdb() {
+		let path = RandomTempPath::create_dir();
+		let _ = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
+		test_db(&DatabaseConfig::default());
+	}
+
+	#[test]
+	#[cfg(target_os = "linux")]
+	fn df_to_rotational() {
+		use std::path::PathBuf;
+		// Example df output.
+		let example_df = vec![70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107, 115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101, 37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32, 32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57, 52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10];
+		let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
+		assert_eq!(rotational_from_df_output(example_df), expected_output);
+	}
+
+	#[test]
+	fn add_columns() {
+		let config = DatabaseConfig::default();
+		let config_5 = DatabaseConfig::with_columns(Some(5));
+
+		let path = RandomTempPath::create_dir();
+
+		// open empty, add 5.
+		{
+			let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
+			assert_eq!(db.num_columns(), 0);
+
+			for i in 0..5 {
+				db.add_column().unwrap();
+				assert_eq!(db.num_columns(), i + 1);
+			}
+		}
+
+		// reopen as 5.
+		{
+			let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
+			assert_eq!(db.num_columns(), 5);
+		}
+	}
+
+	#[test]
+	fn drop_columns() {
+		let config = DatabaseConfig::default();
+		let config_5 = DatabaseConfig::with_columns(Some(5));
+
+		let path = RandomTempPath::create_dir();
+
+		// open 5, remove all.
+		{
+			let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
+			assert_eq!(db.num_columns(), 5);
+
+			for i in (0..5).rev() {
+				db.drop_column().unwrap();
+				assert_eq!(db.num_columns(), i);
+			}
+		}
+
+		// reopen as 0.
+		{
+			let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
+			assert_eq!(db.num_columns(), 0);
+		}
+	}
+}
diff --git a/util/kvdb/Cargo.toml b/util/kvdb/Cargo.toml
index 16c3cb86d..c3418a714 100644
--- a/util/kvdb/Cargo.toml
+++ b/util/kvdb/Cargo.toml
@@ -4,14 +4,6 @@ version = "0.1.0"
 authors = ["Parity Technologies <admin@parity.io>"]
 
 [dependencies]
-log = "0.3"
-ethcore-bytes = { path = "../bytes" }
-ethcore-bigint = { path = "../bigint" }
-ethcore-devtools = { path = "../../devtools" }
 elastic-array = "0.9"
-hashdb = { path = "../hashdb" }
-parking_lot = "0.4"
-regex = "0.2"
-rlp = { path = "../rlp" }
-rocksdb = { git = "https://github.com/paritytech/rust-rocksdb" }
 error-chain = "0.11.0-rc.2"
+ethcore-bytes = { path = "../bytes" }
diff --git a/util/kvdb/src/lib.rs b/util/kvdb/src/lib.rs
index d10d663ba..bff09baeb 100644
--- a/util/kvdb/src/lib.rs
+++ b/util/kvdb/src/lib.rs
@@ -16,48 +16,20 @@
 
 //! Key-Value store abstraction with `RocksDB` backend.
 
-#[macro_use]
-extern crate log;
 #[macro_use]
 extern crate error_chain;
-
-extern crate ethcore_bytes as bytes;
-extern crate ethcore_bigint as bigint;
-extern crate ethcore_devtools as devtools;
 extern crate elastic_array;
-extern crate hashdb;
-extern crate parking_lot;
-extern crate rlp;
-extern crate rocksdb;
-extern crate regex;
+extern crate ethcore_bytes as bytes;
 
-use std::{mem, fs, io};
-use std::collections::{HashMap, BTreeMap};
-use std::marker::PhantomData;
-use std::path::{PathBuf, Path};
-use parking_lot::{Mutex, MutexGuard, RwLock};
-
-use elastic_array::*;
-use hashdb::DBValue;
-use rlp::{UntrustedRlp, RlpType, Compressible};
-use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
-	Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column, ReadOptions};
+use std::io;
+use elastic_array::{ElasticArray128, ElasticArray32};
 use bytes::Bytes;
 
-#[cfg(target_os = "linux")]
-use regex::Regex;
-#[cfg(target_os = "linux")]
-use std::process::Command;
-#[cfg(target_os = "linux")]
-use std::fs::File;
-
-const DB_BACKGROUND_FLUSHES: i32 = 2;
-const DB_BACKGROUND_COMPACTIONS: i32 = 2;
-const DB_WRITE_BUFFER_SIZE: usize = 2048 * 1000;
-
 /// Required length of prefixes.
 pub const PREFIX_LEN: usize = 12;
 
+pub type DBValue = ElasticArray128<u8>;
+
 error_chain! {
 	types {
 		Error, ErrorKind, ResultExt;
 	}
@@ -71,11 +43,11 @@ error_chain! {
 
 /// Write transaction. Batches a sequence of put/delete operations for efficiency.
 #[derive(Default, Clone, PartialEq)]
 pub struct DBTransaction {
-	ops: Vec<DBOp>,
+	pub ops: Vec<DBOp>,
 }
 
 #[derive(Clone, PartialEq)]
-enum DBOp {
+pub enum DBOp {
 	Insert {
 		col: Option<u32>,
 		key: ElasticArray32<u8>,
@@ -150,12 +122,6 @@ impl DBTransaction {
 	}
 }
 
-enum KeyState {
-	Insert(DBValue),
-	InsertCompressed(DBValue),
-	Delete,
-}
-
 /// Generic key-value database.
 ///
 /// This makes a distinction between "buffered" and "flushed" values. Values which have been
@@ -206,847 +172,3 @@ pub trait KeyValueDB: Sync + Send {
 	/// Attempt to replace this database with a new one located at the given path.
 	fn restore(&self, new_db: &str) -> Result<(), Error>;
 }
-
-/// A key-value database fulfilling the `KeyValueDB` trait, living in memory.
-/// This is generally intended for tests and is not particularly optimized.
-pub struct InMemory {
-	columns: RwLock<HashMap<Option<u32>, BTreeMap<Vec<u8>, DBValue>>>,
-}
-
-/// Create an in-memory database with the given number of columns.
-/// Columns will be indexable by 0..`num_cols`
-pub fn in_memory(num_cols: u32) -> InMemory {
-	let mut cols = HashMap::new();
-	cols.insert(None, BTreeMap::new());
-
-	for idx in 0..num_cols {
-		cols.insert(Some(idx), BTreeMap::new());
-	}
-
-	InMemory {
-		columns: RwLock::new(cols)
-	}
-}
-
-impl KeyValueDB for InMemory {
-	fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
-		let columns = self.columns.read();
-		match columns.get(&col) {
-			None => Err(format!("No such column family: {:?}", col)),
-			Some(map) => Ok(map.get(key).cloned()),
-		}
-	}
-
-	fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
-		let columns = self.columns.read();
-		match columns.get(&col) {
-			None => None,
-			Some(map) =>
-				map.iter()
-					.find(|&(ref k ,_)| k.starts_with(prefix))
-					.map(|(_, v)| v.to_vec().into_boxed_slice())
-		}
-	}
-
-	fn write_buffered(&self, transaction: DBTransaction) {
-		let mut columns = self.columns.write();
-		let ops = transaction.ops;
-		for op in ops {
-			match op {
-				DBOp::Insert { col, key, value } => {
-					if let Some(mut col) = columns.get_mut(&col) {
-						col.insert(key.into_vec(), value);
-					}
-				},
-				DBOp::InsertCompressed { col, key, value } => {
-					if let Some(mut col) = columns.get_mut(&col) {
-						let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
-						let mut value = DBValue::new();
-						value.append_slice(&compressed);
-						col.insert(key.into_vec(), value);
-					}
-				},
-				DBOp::Delete { col, key } => {
-					if let Some(mut col) = columns.get_mut(&col) {
-						col.remove(&*key);
-					}
-				},
-			}
-		}
-	}
-
-	fn flush(&self) -> Result<(), String> { Ok(()) }
-	fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
-		match self.columns.read().get(&col) {
-			Some(map) => Box::new( // TODO: worth optimizing at all?
-				map.clone()
-					.into_iter()
-					.map(|(k, v)| (k.into_boxed_slice(), v.into_vec().into_boxed_slice()))
-			),
-			None => Box::new(None.into_iter()),
-		}
-	}
-
-	fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
-		-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
-	{
-		match self.columns.read().get(&col) {
-			Some(map) => Box::new(
-				map.clone()
-					.into_iter()
-					.skip_while(move |&(ref k, _)| !k.starts_with(prefix))
-					.map(|(k, v)| (k.into_boxed_slice(), v.into_vec().into_boxed_slice()))
-			),
-			None => Box::new(None.into_iter()),
-		}
-	}
-
-	fn restore(&self, _new_db: &str) -> Result<(), Error> {
-		Err("Attempted to restore in-memory database".into())
-	}
-}
-
-/// Compaction profile for the database settings
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub struct CompactionProfile {
-	/// L0-L1 target file size
-	pub initial_file_size: u64,
-	/// L2-LN target file size multiplier
-	pub file_size_multiplier: i32,
-	/// rate limiter for background flushes and compactions, bytes/sec, if any
-	pub write_rate_limit: Option<u64>,
-}
-
-impl Default for CompactionProfile {
-	/// Default profile suitable for most storage
-	fn default() -> CompactionProfile {
-		CompactionProfile::ssd()
-	}
-}
-
-/// Given output of df command return Linux rotational flag file path.
-#[cfg(target_os = "linux")]
-pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
-	use std::str;
-	str::from_utf8(df_out.as_slice())
-		.ok()
-		// Get the drive name.
-		.and_then(|df_str| Regex::new(r"/dev/(sd[:alpha:]{1,2})")
-			.ok()
-			.and_then(|re| re.captures(df_str))
-			.and_then(|captures| captures.get(1)))
-		// Generate path e.g. /sys/block/sda/queue/rotational
-		.map(|drive_path| {
-			let mut p = PathBuf::from("/sys/block");
-			p.push(drive_path.as_str());
-			p.push("queue/rotational");
-			p
-		})
-}
-
-impl CompactionProfile {
-	/// Attempt to determine the best profile automatically, only Linux for now.
-	#[cfg(target_os = "linux")]
-	pub fn auto(db_path: &Path) -> CompactionProfile {
-		use std::io::Read;
-		let hdd_check_file = db_path
-			.to_str()
-			.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
-			.and_then(|df_res| match df_res.status.success() {
-				true => Some(df_res.stdout),
-				false => None,
-			})
-			.and_then(rotational_from_df_output);
-		// Read out the file and match compaction profile.
-		if let Some(hdd_check) = hdd_check_file {
-			if let Ok(mut file) = File::open(hdd_check.as_path()) {
-				let mut buffer = [0; 1];
-				if file.read_exact(&mut buffer).is_ok() {
-					// 0 means not rotational.
-					if buffer == [48] { return Self::ssd(); }
-					// 1 means rotational.
-					if buffer == [49] { return Self::hdd(); }
-				}
-			}
-		}
-		// Fallback if drive type was not determined.
-		Self::default()
-	}
-
-	/// Just default for other platforms.
-	#[cfg(not(target_os = "linux"))]
-	pub fn auto(_db_path: &Path) -> CompactionProfile {
-		Self::default()
-	}
-
-	/// Default profile suitable for SSD storage
-	pub fn ssd() -> CompactionProfile {
-		CompactionProfile {
-			initial_file_size: 32 * 1024 * 1024,
-			file_size_multiplier: 2,
-			write_rate_limit: None,
-		}
-	}
-
-	/// Slow HDD compaction profile
-	pub fn hdd() -> CompactionProfile {
-		CompactionProfile {
-			initial_file_size: 192 * 1024 * 1024,
-			file_size_multiplier: 1,
-			write_rate_limit: Some(8 * 1024 * 1024),
-		}
-	}
-}
-
-/// Database configuration
-#[derive(Clone)]
-pub struct DatabaseConfig {
-	/// Max number of open files.
-	pub max_open_files: i32,
-	/// Cache sizes (in MiB) for specific columns.
-	pub cache_sizes: HashMap<Option<u32>, usize>,
-	/// Compaction profile
-	pub compaction: CompactionProfile,
-	/// Set number of columns
-	pub columns: Option<u32>,
-	/// Should we keep WAL enabled?
-	pub wal: bool,
-}
-
-impl DatabaseConfig {
-	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
-	/// Note that cache sizes must be explicitly set.
-	pub fn with_columns(columns: Option<u32>) -> Self {
-		let mut config = Self::default();
-		config.columns = columns;
-		config
-	}
-
-	/// Set the column cache size in MiB.
-	pub fn set_cache(&mut self, col: Option<u32>, size: usize) {
-		self.cache_sizes.insert(col, size);
-	}
-}
-
-impl Default for DatabaseConfig {
-	fn default() -> DatabaseConfig {
-		DatabaseConfig {
-			cache_sizes: HashMap::new(),
-			max_open_files: 512,
-			compaction: CompactionProfile::default(),
-			columns: None,
-			wal: true,
-		}
-	}
-}
-
-/// Database iterator (for flushed data only)
-// The compromise of holding only a virtual borrow vs. holding a lock on the
-// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
-//
-pub struct DatabaseIterator<'a> {
-	iter: DBIterator,
-	_marker: PhantomData<&'a Database>,
-}
-
-impl<'a> Iterator for DatabaseIterator<'a> {
-	type Item = (Box<[u8]>, Box<[u8]>);
-
-	fn next(&mut self) -> Option<Self::Item> {
-		self.iter.next()
-	}
-}
-
-struct DBAndColumns {
-	db: DB,
-	cfs: Vec<Column>,
-}
-
-// get column family configuration from database config.
-fn col_config(col: u32, config: &DatabaseConfig) -> Options {
-	// default cache size for columns not specified.
-	const DEFAULT_CACHE: usize = 2;
-
-	let mut opts = Options::new();
-	opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
-	opts.set_target_file_size_base(config.compaction.initial_file_size);
-	opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
-	opts.set_db_write_buffer_size(DB_WRITE_BUFFER_SIZE);
-
-	let col_opt = config.columns.map(|_| col);
-
-	{
-		let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
-		let mut block_opts = BlockBasedOptions::new();
-		// all goes to read cache.
-		block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
-		opts.set_block_based_table_factory(&block_opts);
-	}
-
-	opts
-}
-
-/// Key-Value database.
-pub struct Database {
-	db: RwLock<Option<DBAndColumns>>,
-	config: DatabaseConfig,
-	write_opts: WriteOptions,
-	read_opts: ReadOptions,
-	path: String,
-	// Dirty values added with `write_buffered`. Cleaned on `flush`.
-	overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
-	// Values currently being flushed. Cleared when `flush` completes.
-	flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
-	// Prevents concurrent flushes.
-	// Value indicates if a flush is in progress.
-	flushing_lock: Mutex<bool>,
-}
-
-impl Database {
-	/// Open database with default settings.
-	pub fn open_default(path: &str) -> Result<Database, String> {
-		Database::open(&DatabaseConfig::default(), path)
-	}
-
-	/// Open database file. Creates if it does not exist.
-	pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
-		let mut opts = Options::new();
-		if let Some(rate_limit) = config.compaction.write_rate_limit {
-			opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
-		}
-		opts.set_parsed_options(&format!("max_total_wal_size={}", 64 * 1024 * 1024))?;
-		opts.set_parsed_options("verify_checksums_in_compaction=0")?;
-		opts.set_parsed_options("keep_log_file_num=1")?;
-		opts.set_max_open_files(config.max_open_files);
-		opts.create_if_missing(true);
-		opts.set_use_fsync(false);
-		opts.set_db_write_buffer_size(DB_WRITE_BUFFER_SIZE);
-
-		opts.set_max_background_flushes(DB_BACKGROUND_FLUSHES);
-		opts.set_max_background_compactions(DB_BACKGROUND_COMPACTIONS);
-
-		// compaction settings
-		opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
-		opts.set_target_file_size_base(config.compaction.initial_file_size);
-		opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
-
-		let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
-		let cfnames: Vec<_> = (0..config.columns.unwrap_or(0)).map(|c| format!("col{}", c)).collect();
-		let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
-
-		for col in 0 .. config.columns.unwrap_or(0) {
-			cf_options.push(col_config(col, &config));
-		}
-
-		let mut write_opts = WriteOptions::new();
-		if !config.wal {
-			write_opts.disable_wal(true);
-		}
-		let mut read_opts = ReadOptions::new();
-		read_opts.set_verify_checksums(false);
-
-		let mut cfs: Vec<Column> = Vec::new();
-		let db = match config.columns {
-			Some(columns) => {
-				match DB::open_cf(&opts, path, &cfnames, &cf_options) {
-					Ok(db) => {
-						cfs = cfnames.iter().map(|n| db.cf_handle(n)
-							.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
-						assert!(cfs.len() == columns as usize);
-						Ok(db)
-					}
-					Err(_) => {
-						// retry and create CFs
-						match DB::open_cf(&opts, path, &[], &[]) {
-							Ok(mut db) => {
-								cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<Result<_, _>>()?;
-								Ok(db)
-							},
-							err @ Err(_) => err,
-						}
-					}
-				}
-			},
-			None => DB::open(&opts, path)
-		};
-
-		let db = match db {
-			Ok(db) => db,
-			Err(ref s) if s.starts_with("Corruption:") => {
-				info!("{}", s);
-				info!("Attempting DB repair for {}", path);
-				DB::repair(&opts, path)?;
-
-				match cfnames.is_empty() {
-					true => DB::open(&opts, path)?,
-					false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
-				}
-			},
-			Err(s) => { return Err(s); }
-		};
-		let num_cols = cfs.len();
-		Ok(Database {
-			db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })),
-			config: config.clone(),
-			write_opts: write_opts,
-			overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
-			flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
-			flushing_lock: Mutex::new((false)),
-			path: path.to_owned(),
-			read_opts: read_opts,
-		})
-	}
-
-	/// Helper to create new transaction for this database.
-	pub fn transaction(&self) -> DBTransaction {
-		DBTransaction::new()
-	}
-
-
-	fn to_overlay_column(col: Option<u32>) -> usize {
-		col.map_or(0, |c| (c + 1) as usize)
-	}
-
-	/// Commit transaction to database.
-	pub fn write_buffered(&self, tr: DBTransaction) {
-		let mut overlay = self.overlay.write();
-		let ops = tr.ops;
-		for op in ops {
-			match op {
-				DBOp::Insert { col, key, value } => {
-					let c = Self::to_overlay_column(col);
-					overlay[c].insert(key, KeyState::Insert(value));
-				},
-				DBOp::InsertCompressed { col, key, value } => {
-					let c = Self::to_overlay_column(col);
-					overlay[c].insert(key, KeyState::InsertCompressed(value));
-				},
-				DBOp::Delete { col, key } => {
-					let c = Self::to_overlay_column(col);
-					overlay[c].insert(key, KeyState::Delete);
-				},
-			}
-		};
-	}
-
-	/// Commit buffered changes to database. Must be called under `flush_lock`
-	fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<(), String> {
-		match *self.db.read() {
-			Some(DBAndColumns { ref db, ref cfs }) => {
-				let batch = WriteBatch::new();
-				mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
-				{
-					for (c, column) in self.flushing.read().iter().enumerate() {
-						for (ref key, ref state) in column.iter() {
-							match **state {
-								KeyState::Delete => {
-									if c > 0 {
-										batch.delete_cf(cfs[c - 1], &key)?;
-									} else {
-										batch.delete(&key)?;
-									}
-								},
-								KeyState::Insert(ref value) => {
-									if c > 0 {
-										batch.put_cf(cfs[c - 1], &key, value)?;
-									} else {
-										batch.put(&key, &value)?;
-									}
-								},
-								KeyState::InsertCompressed(ref value) => {
-									let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
-									if c > 0 {
-										batch.put_cf(cfs[c - 1], &key, &compressed)?;
-									} else {
-										batch.put(&key, &value)?;
-									}
-								}
-							}
-						}
-					}
-				}
-				db.write_opt(batch, &self.write_opts)?;
-				for column in self.flushing.write().iter_mut() {
-					column.clear();
-					column.shrink_to_fit();
-				}
-				Ok(())
-			},
-			None => Err("Database is closed".to_owned())
-		}
-	}
-
-	/// Commit buffered changes to database.
-	pub fn flush(&self) -> Result<(), String> {
-		let mut lock = self.flushing_lock.lock();
-		// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
-		// The value inside the lock is used to detect that.
-		if *lock {
-			// This can only happen if another flushing thread is terminated unexpectedly.
-			return Err("Database write failure. Running low on memory perhaps?".to_owned());
-		}
-		*lock = true;
-		let result = self.write_flushing_with_lock(&mut lock);
-		*lock = false;
-		result
-	}
-
-	/// Commit transaction to database.
-	pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
-		match *self.db.read() {
-			Some(DBAndColumns { ref db, ref cfs }) => {
-				let batch = WriteBatch::new();
-				let ops = tr.ops;
-				for op in ops {
-					match op {
-						DBOp::Insert { col, key, value } => {
-							col.map_or_else(|| batch.put(&key, &value), |c| batch.put_cf(cfs[c as usize], &key, &value))?
-						},
-						DBOp::InsertCompressed { col, key, value } => {
-							let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
-							col.map_or_else(|| batch.put(&key, &compressed), |c| batch.put_cf(cfs[c as usize], &key, &compressed))?
-						},
-						DBOp::Delete { col, key } => {
-							col.map_or_else(|| batch.delete(&key), |c| batch.delete_cf(cfs[c as usize], &key))?
-						},
-					}
-				}
-				db.write_opt(batch, &self.write_opts)
-			},
-			None => Err("Database is closed".to_owned())
-		}
-	}
-
-	/// Get value by key.
-	pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
-		match *self.db.read() {
-			Some(DBAndColumns { ref db, ref cfs }) => {
-				let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
-				match overlay.get(key) {
-					Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
-					Some(&KeyState::Delete) => Ok(None),
-					None => {
-						let flushing = &self.flushing.read()[Self::to_overlay_column(col)];
-						match flushing.get(key) {
-							Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
-							Some(&KeyState::Delete) => Ok(None),
-							None => {
-								col.map_or_else(
-									|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
-									|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
-							},
-						}
-					},
-				}
-			},
-			None => Ok(None),
-		}
-	}
-
-	/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
-	// TODO: support prefix seek for unflushed data
-	pub fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
-		self.iter_from_prefix(col, prefix).and_then(|mut iter| {
-			match iter.next() {
-				// TODO: use prefix_same_as_start read option (not availabele in C API currently)
-				Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None },
-				_ => None
-			}
-		})
-	}
-
-	/// Get database iterator for flushed data.
-	pub fn iter(&self, col: Option<u32>) -> Option<DatabaseIterator> {
-		//TODO: iterate over overlay
-		match *self.db.read() {
-			Some(DBAndColumns { ref db, ref cfs }) => {
-				let iter = col.map_or_else(
-					|| db.iterator_opt(IteratorMode::Start, &self.read_opts),
-					|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
-						.expect("iterator params are valid; qed")
-				);
-
-				Some(DatabaseIterator {
-					iter: iter,
-					_marker: PhantomData,
-				})
-			},
-			None => None,
-		}
-	}
-
-	fn iter_from_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<DatabaseIterator> {
-		match *self.db.read() {
-			Some(DBAndColumns { ref db, ref cfs }) => {
-				let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts),
-					|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts)
-						.expect("iterator params are valid; qed"));
-
-				Some(DatabaseIterator {
-					iter: iter,
-					_marker: PhantomData,
-				})
-			},
-			None => None,
-		}
-	}
-
-	/// Close the database
-	fn close(&self) {
-		*self.db.write() = None;
-		self.overlay.write().clear();
-		self.flushing.write().clear();
-	}
-
-	/// Restore the database from a copy at given path.
-	pub fn restore(&self, new_db: &str) -> Result<(), Error> {
-		self.close();
-
-		let mut backup_db = PathBuf::from(&self.path);
-		backup_db.pop();
-		backup_db.push("backup_db");
-
-		let existed = match fs::rename(&self.path, &backup_db) {
-			Ok(_) => true,
-			Err(e) => if let io::ErrorKind::NotFound = e.kind() {
-				false
-			} else {
-				return Err(e.into());
-			}
-		};
-
-		match fs::rename(&new_db, &self.path) {
-			Ok(_) => {
-				// clean up the backup.
-				if existed {
-					fs::remove_dir_all(&backup_db)?;
-				}
-			}
-			Err(e) => {
-				// restore the backup.
-				if existed {
-					fs::rename(&backup_db, &self.path)?;
-				}
-				return Err(e.into())
-			}
-		}
-
-		// reopen the database and steal handles into self
-		let db = Self::open(&self.config, &self.path)?;
-		*self.db.write() = mem::replace(&mut *db.db.write(), None);
-		*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
-		*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
-		Ok(())
-	}
-
-	/// The number of non-default column families.
-	pub fn num_columns(&self) -> u32 {
-		self.db.read().as_ref()
-			.and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } )
-			.map(|n| n as u32)
-			.unwrap_or(0)
-	}
-
-	/// Drop a column family.
-	pub fn drop_column(&self) -> Result<(), String> {
-		match *self.db.write() {
-			Some(DBAndColumns { ref mut db, ref mut cfs }) => {
-				if let Some(col) = cfs.pop() {
-					let name = format!("col{}", cfs.len());
-					drop(col);
-					db.drop_cf(&name)?;
-				}
-				Ok(())
-			},
-			None => Ok(()),
-		}
-	}
-
-	/// Add a column family.
-	pub fn add_column(&self) -> Result<(), String> {
-		match *self.db.write() {
-			Some(DBAndColumns { ref mut db, ref mut cfs }) => {
-				let col = cfs.len() as u32;
-				let name = format!("col{}", col);
-				cfs.push(db.create_cf(&name, &col_config(col, &self.config))?);
-				Ok(())
-			},
-			None => Ok(()),
-		}
-	}
-}
-
-// duplicate declaration of methods here to avoid trait import in certain existing cases
-// at time of addition.
-impl KeyValueDB for Database {
-	fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
-		Database::get(self, col, key)
-	}
-
-	fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
-		Database::get_by_prefix(self, col, prefix)
-	}
-
-	fn write_buffered(&self, transaction: DBTransaction) {
-		Database::write_buffered(self, transaction)
-	}
-
-	fn write(&self, transaction: DBTransaction) -> Result<(), String> {
-		Database::write(self, transaction)
-	}
-
-	fn flush(&self) -> Result<(), String> {
-		Database::flush(self)
-	}
-
-	fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
-		let unboxed = Database::iter(self, col);
-		Box::new(unboxed.into_iter().flat_map(|inner| inner))
-	}
-
-	fn iter_from_prefix<'a>(&'a self, col: Option<u32>, prefix: &'a [u8])
-		-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>
-	{
-		let unboxed = Database::iter_from_prefix(self, col, prefix);
-		Box::new(unboxed.into_iter().flat_map(|inner| inner))
-	}
-
-	fn restore(&self, new_db: &str) -> Result<(), Error> {
-		Database::restore(self, new_db)
-	}
-}
-
-impl Drop for Database {
-	fn drop(&mut self) {
-		// write all buffered changes if we can.
-		let _ = self.flush();
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	use bigint::hash::H256;
-	use super::*;
-	use devtools::*;
-	use std::str::FromStr;
-
-	fn test_db(config: &DatabaseConfig) {
-		let path = RandomTempPath::create_dir();
-		let db = Database::open(config, path.as_path().to_str().unwrap()).unwrap();
-		let key1 = H256::from_str("02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
-		let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
-		let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap();
-
-		let mut batch = db.transaction();
-		batch.put(None, &key1, b"cat");
-		batch.put(None, &key2, b"dog");
-		db.write(batch).unwrap();
-
-		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"cat");
-
-		let contents: Vec<_> = db.iter(None).into_iter().flat_map(|inner| inner).collect();
-		assert_eq!(contents.len(), 2);
-		assert_eq!(&*contents[0].0, &*key1);
-		assert_eq!(&*contents[0].1, b"cat");
-		assert_eq!(&*contents[1].0, &*key2);
-		assert_eq!(&*contents[1].1, b"dog");
-
-		let mut batch = db.transaction();
-		batch.delete(None, &key1);
-		db.write(batch).unwrap();
-
-		assert!(db.get(None, &key1).unwrap().is_none());
-
-		let mut batch = db.transaction();
-		batch.put(None, &key1, b"cat");
-		db.write(batch).unwrap();
-
-		let mut transaction = db.transaction();
-		transaction.put(None, &key3, b"elephant");
-		transaction.delete(None, &key1);
-		db.write(transaction).unwrap();
-		assert!(db.get(None, &key1).unwrap().is_none());
-		assert_eq!(&*db.get(None, &key3).unwrap().unwrap(), b"elephant");
-
-		assert_eq!(&*db.get_by_prefix(None, &key3).unwrap(), b"elephant");
-		assert_eq!(&*db.get_by_prefix(None, &key2).unwrap(), b"dog");
-
-		let mut transaction = db.transaction();
-		transaction.put(None, &key1, b"horse");
-		transaction.delete(None, &key3);
-		db.write_buffered(transaction);
-		assert!(db.get(None, &key3).unwrap().is_none());
-		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
-
-		db.flush().unwrap();
-		assert!(db.get(None, &key3).unwrap().is_none());
-		assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"horse");
-	}
-
-	#[test]
-	fn kvdb() {
-		let path = RandomTempPath::create_dir();
-		let _ = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
-		test_db(&DatabaseConfig::default());
-	}
-
-	#[test]
-	#[cfg(target_os = "linux")]
-	fn df_to_rotational() {
-		use std::path::PathBuf;
-		// Example df output.
-		let example_df = vec![70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107, 115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101, 37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32, 32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57, 52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10];
-		let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
-		assert_eq!(rotational_from_df_output(example_df), expected_output);
-	}
-
-	#[test]
-	fn add_columns() {
-		let config = DatabaseConfig::default();
-		let config_5 = DatabaseConfig::with_columns(Some(5));
-
-		let path = RandomTempPath::create_dir();
-
-		// open empty, add 5.
-		{
-			let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
-			assert_eq!(db.num_columns(), 0);
-
-			for i in 0..5 {
-				db.add_column().unwrap();
-				assert_eq!(db.num_columns(), i + 1);
-			}
-		}
-
-		// reopen as 5.
-		{
-			let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
-			assert_eq!(db.num_columns(), 5);
-		}
-	}
-
-	#[test]
-	fn drop_columns() {
-		let config = DatabaseConfig::default();
-		let config_5 = DatabaseConfig::with_columns(Some(5));
-
-		let path = RandomTempPath::create_dir();
-
-		// open 5, remove all.
-		{
-			let db = Database::open(&config_5, path.as_path().to_str().unwrap()).unwrap();
-			assert_eq!(db.num_columns(), 5);
-
-			for i in (0..5).rev() {
-				db.drop_column().unwrap();
-				assert_eq!(db.num_columns(), i);
-			}
-		}
-
-		// reopen as 0.
-		{
-			let db = Database::open(&config, path.as_path().to_str().unwrap()).unwrap();
-			assert_eq!(db.num_columns(), 0);
-		}
-	}
-}
diff --git a/util/migration/Cargo.toml b/util/migration/Cargo.toml
index 927ea232d..d16264198 100644
--- a/util/migration/Cargo.toml
+++ b/util/migration/Cargo.toml
@@ -7,4 +7,5 @@ authors = ["Parity Technologies <admin@parity.io>"]
 log = "0.3"
 macros = { path = "../macros" }
 kvdb = { path = "../kvdb" }
+kvdb-rocksdb = { path = "../kvdb-rocksdb" }
 ethcore-devtools = { path = "../../devtools" }
diff --git a/util/migration/src/lib.rs b/util/migration/src/lib.rs
index e854c12f2..c29682124 100644
--- a/util/migration/src/lib.rs
+++ b/util/migration/src/lib.rs
@@ -25,14 +25,15 @@
 extern crate macros;
 extern crate ethcore_devtools as devtools;
 extern crate kvdb;
+extern crate kvdb_rocksdb;
 
 use std::collections::BTreeMap;
-use std::fs;
-use std::fmt;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::{fs, fmt};
 
-use kvdb::{CompactionProfile, Database, DatabaseConfig, DBTransaction};
+use kvdb::DBTransaction;
+use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
 
 /// Migration config.
 #[derive(Clone)]
diff --git a/util/migration/src/tests.rs b/util/migration/src/tests.rs
index 6445d58f7..1f712262f 100644
--- a/util/migration/src/tests.rs
+++ b/util/migration/src/tests.rs
@@ -22,7 +22,7 @@ use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::path::{Path, PathBuf};
 use {Batch, Config, Error, SimpleMigration, Migration, Manager, ChangeColumns};
-use kvdb::Database;
+use kvdb_rocksdb::Database;
 use devtools::RandomTempPath;
 
 fn db_path(path: &Path) -> PathBuf {
@@ -229,7 +229,7 @@ fn pre_columns() {
 
 #[test]
 fn change_columns() {
-	use kvdb::DatabaseConfig;
+	use kvdb_rocksdb::DatabaseConfig;
 
 	let mut manager = Manager::new(Config::default());
 	manager.add_migration(ChangeColumns {
diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs
index 52f80a39f..1ea8e8559 100644
--- a/util/src/journaldb/archivedb.rs
+++ b/util/src/journaldb/archivedb.rs
@@ -58,7 +58,7 @@ impl ArchiveDB {
 	/// Create a new instance with an anonymous temporary database.
 	#[cfg(test)]
 	fn new_temp() -> ArchiveDB {
-		let backing = Arc::new(::kvdb::in_memory(0));
+		let backing = Arc::new(::kvdb_memorydb::in_memory(0));
 		Self::new(backing, None)
 	}
 
@@ -211,7 +211,7 @@ mod tests {
 	use hashdb::{HashDB, DBValue};
 	use super::*;
 	use journaldb::traits::JournalDB;
-	use kvdb::Database;
+	use kvdb_rocksdb::Database;
 	use bigint::hash::H32;
 
 	#[test]
diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs
index a21a6eedb..025955839 100644
--- a/util/src/journaldb/earlymergedb.rs
+++ b/util/src/journaldb/earlymergedb.rs
@@ -143,7 +143,7 @@ impl EarlyMergeDB {
 	/// Create a new instance with an anonymous temporary database.
 	#[cfg(test)]
 	fn new_temp() -> EarlyMergeDB {
-		let backing = Arc::new(::kvdb::in_memory(0));
+		let backing = Arc::new(::kvdb_memorydb::in_memory(0));
 		Self::new(backing, None)
 	}
 
@@ -560,7 +560,7 @@ mod tests {
 	use super::*;
 	use super::super::traits::JournalDB;
 	use ethcore_logger::init_log;
-	use kvdb::{DatabaseConfig};
+	use kvdb_rocksdb::{DatabaseConfig};
 	use bigint::hash::H32;
 
 	#[test]
@@ -820,7 +820,7 @@ mod tests {
 
 	fn new_db(path: &Path) -> EarlyMergeDB {
 		let config = DatabaseConfig::with_columns(Some(1));
-		let backing = Arc::new(::kvdb::Database::open(&config, path.to_str().unwrap()).unwrap());
+		let backing = Arc::new(::kvdb_rocksdb::Database::open(&config, path.to_str().unwrap()).unwrap());
 		EarlyMergeDB::new(backing, Some(0))
 	}
 
diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs
index 687333d67..7ffe4cf70 100644
--- a/util/src/journaldb/overlayrecentdb.rs
+++ b/util/src/journaldb/overlayrecentdb.rs
@@ -120,7 +120,7 @@ impl OverlayRecentDB {
 	/// Create a new instance with an anonymous temporary database.
 	#[cfg(test)]
 	pub fn new_temp() -> OverlayRecentDB {
-		let backing = Arc::new(::kvdb::in_memory(0));
+		let backing = Arc::new(::kvdb_memorydb::in_memory(0));
 		Self::new(backing, None)
 	}
 
@@ -468,7 +468,7 @@ mod tests {
 	use hashdb::{HashDB, DBValue};
 	use ethcore_logger::init_log;
 	use journaldb::JournalDB;
-	use kvdb::Database;
+	use kvdb_rocksdb::Database;
 	use bigint::hash::H32;
 
 	fn new_db(path: &Path) -> OverlayRecentDB {
diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs
index eeca11085..bcb41723e 100644
--- a/util/src/journaldb/refcounteddb.rs
+++ b/util/src/journaldb/refcounteddb.rs
@@ -79,7 +79,7 @@ impl RefCountedDB {
 	/// Create a new instance with an anonymous temporary database.
 	#[cfg(test)]
 	fn new_temp() -> RefCountedDB {
-		let backing = Arc::new(::kvdb::in_memory(0));
+		let backing = Arc::new(::kvdb_memorydb::in_memory(0));
 		Self::new(backing, None)
 	}
 }
diff --git a/util/src/lib.rs b/util/src/lib.rs
index 863f811c4..aac5024e0 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -110,6 +110,11 @@ extern crate patricia_trie as trie;
 extern crate kvdb;
 extern crate util_error as error;
 
+#[cfg(test)]
+extern crate kvdb_memorydb;
+#[cfg(test)]
+extern crate kvdb_rocksdb;
+
 #[macro_use]
 extern crate log as rlog;
 
diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs
index b4c0beb25..d6f6b102e 100644
--- a/util/src/overlaydb.rs
+++ b/util/src/overlaydb.rs
@@ -50,7 +50,7 @@ impl OverlayDB {
 	/// Create a new instance of OverlayDB with an anonymous temporary database.
 	#[cfg(test)]
 	pub fn new_temp() -> OverlayDB {
-		let backing = Arc::new(::kvdb::in_memory(0));
+		let backing = Arc::new(::kvdb_memorydb::in_memory(0));
 		Self::new(backing, None)
 	}
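
Usage note: after this split, consumers depend on the small `kvdb` crate for the
`KeyValueDB` trait and `DBTransaction`, and pull in `kvdb-memorydb` or
`kvdb-rocksdb` only where a concrete backing store is constructed. The snippet
below is a minimal sketch, not part of the patch: it assumes only the APIs shown
in the hunks above (`kvdb_memorydb::in_memory`, `kvdb_rocksdb::{Database,
DatabaseConfig}`, `DBTransaction::new`), and the database path is a placeholder.

	extern crate kvdb;
	extern crate kvdb_memorydb;
	extern crate kvdb_rocksdb;

	use kvdb::{DBTransaction, KeyValueDB};
	use kvdb_rocksdb::{Database, DatabaseConfig};

	// Generic code is written against the `kvdb` trait and never names a backend.
	fn put_and_read(db: &KeyValueDB) -> Result<(), String> {
		let mut tr = DBTransaction::new();
		tr.put(None, b"key", b"value");
		db.write(tr)?;
		assert_eq!(&*db.get(None, b"key")?.unwrap(), b"value");
		Ok(())
	}

	fn main() {
		// In tests: the pure in-memory backend, as the journaldb tests above do.
		let mem = kvdb_memorydb::in_memory(0);
		put_and_read(&mem).unwrap();

		// In production: the RocksDB backend behind the same trait
		// ("/tmp/example-db" is an illustrative path).
		let db = Database::open(&DatabaseConfig::default(), "/tmp/example-db").unwrap();
		put_and_read(&db).unwrap();
	}

The payoff of the split is that RocksDB (and its native build) stays out of the
dependency graph of every crate that merely consumes the `KeyValueDB` abstraction.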