simplify kvdb error types (#8924)

This commit is contained in:
Marek Kotewicz 2018-07-02 05:04:48 -04:00 committed by André Silva
parent 67721f3413
commit 5ef41ed53e
21 changed files with 125 additions and 189 deletions

3
Cargo.lock generated
View File

@ -1512,7 +1512,6 @@ name = "kvdb"
version = "0.1.0"
dependencies = [
"elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bytes 0.1.0",
]
@ -1694,7 +1693,6 @@ dependencies = [
name = "migration-rocksdb"
version = "0.1.0"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb 0.1.0",
"kvdb-rocksdb 0.1.0",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3617,7 +3615,6 @@ version = "0.1.0"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb 0.1.0",
"rlp 0.2.1",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

View File

@ -31,7 +31,7 @@ use std::sync::Arc;
use cht;
use ethcore::block_status::BlockStatus;
use ethcore::error::{Error, ErrorKind, BlockImportError, BlockImportErrorKind, BlockError};
use ethcore::error::{Error, BlockImportError, BlockImportErrorKind, BlockError};
use ethcore::encoded;
use ethcore::header::Header;
use ethcore::ids::BlockId;
@ -260,7 +260,7 @@ impl HeaderChain {
let best_block = {
let era = match candidates.get(&curr.best_num) {
Some(era) => era,
None => bail!(ErrorKind::Database("Database corrupt: highest block referenced but no data.".into())),
None => bail!("Database corrupt: highest block referenced but no data."),
};
let best = &era.candidates[0];
@ -583,7 +583,7 @@ impl HeaderChain {
} else {
let msg = format!("header of block #{} not found in DB ; database in an \
inconsistent state", h_num);
bail!(ErrorKind::Database(msg.into()));
bail!(msg);
};
let decoded = header.decode().expect("decoding db value failed");
@ -591,9 +591,8 @@ impl HeaderChain {
let entry: Entry = {
let bytes = self.db.get(self.col, era_key(h_num).as_bytes())?
.ok_or_else(|| {
let msg = format!("entry for era #{} not found in DB ; database \
in an inconsistent state", h_num);
ErrorKind::Database(msg.into())
format!("entry for era #{} not found in DB ; database \
in an inconsistent state", h_num)
})?;
::rlp::decode(&bytes).expect("decoding db value failed")
};
@ -601,9 +600,8 @@ impl HeaderChain {
let total_difficulty = entry.candidates.iter()
.find(|c| c.hash == decoded.hash())
.ok_or_else(|| {
let msg = "no candidate matching block found in DB ; database in an \
inconsistent state";
ErrorKind::Database(msg.into())
"no candidate matching block found in DB ; database in an \
inconsistent state"
})?
.total_difficulty;

View File

@ -17,8 +17,9 @@
//! Blockchain database.
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use std::mem;
use std::{mem, io};
use itertools::Itertools;
use blooms_db;
use heapsize::HeapSizeOf;
@ -47,8 +48,6 @@ use engines::epoch::{Transition as EpochTransition, PendingTransition as Pending
use rayon::prelude::*;
use ansi_term::Colour;
use kvdb::{DBTransaction, KeyValueDB};
use error::Error;
use std::path::Path;
/// Database backing `BlockChain`.
pub trait BlockChainDB: Send + Sync {
@ -66,7 +65,7 @@ pub trait BlockChainDB: Send + Sync {
/// predefined config.
pub trait BlockChainDBHandler: Send + Sync {
/// Open the predefined key-value database.
fn open(&self, path: &Path) -> Result<Arc<BlockChainDB>, Error>;
fn open(&self, path: &Path) -> io::Result<Arc<BlockChainDB>>;
}
/// Interface for querying blocks by hash and by number.

View File

@ -718,7 +718,7 @@ impl Client {
state_db = spec.ensure_db_good(state_db, &factories)?;
let mut batch = DBTransaction::new();
state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?;
db.key_value().write(batch).map_err(ClientError::Database)?;
db.key_value().write(batch)?;
}
let gb = spec.genesis_block();
@ -821,7 +821,7 @@ impl Client {
}
// ensure buffered changes are flushed.
client.db.read().key_value().flush().map_err(ClientError::Database)?;
client.db.read().key_value().flush()?;
Ok(client)
}

View File

@ -16,7 +16,6 @@
use std::fmt::{Display, Formatter, Error as FmtError};
use util_error::UtilError;
use kvdb;
use trie::TrieError;
/// Client configuration errors.
@ -24,8 +23,6 @@ use trie::TrieError;
pub enum Error {
/// TrieDB-related error.
Trie(TrieError),
/// Database error
Database(kvdb::Error),
/// Util error
Util(UtilError),
}
@ -53,7 +50,6 @@ impl Display for Error {
match *self {
Error::Trie(ref err) => write!(f, "{}", err),
Error::Util(ref err) => write!(f, "{}", err),
Error::Database(ref s) => write!(f, "Database error: {}", s),
}
}
}

View File

@ -35,8 +35,6 @@ pub enum EvmTestError {
Evm(vm::Error),
/// Initialization error.
ClientError(::error::Error),
/// Low-level database error.
Database(kvdb::Error),
/// Post-condition failure,
PostCondition(String),
}
@ -55,7 +53,6 @@ impl fmt::Display for EvmTestError {
Trie(ref err) => write!(fmt, "Trie: {}", err),
Evm(ref err) => write!(fmt, "EVM: {}", err),
ClientError(ref err) => write!(fmt, "{}", err),
Database(ref err) => write!(fmt, "DB: {}", err),
PostCondition(ref err) => write!(fmt, "{}", err),
}
}
@ -135,7 +132,7 @@ impl<'a> EvmTestClient<'a> {
{
let mut batch = kvdb::DBTransaction::new();
state_db.journal_under(&mut batch, 0, &genesis.hash())?;
db.write(batch).map_err(EvmTestError::Database)?;
db.write(batch)?;
}
state::State::from_existing(

View File

@ -18,7 +18,6 @@
use std::{fmt, error};
use std::time::SystemTime;
use kvdb;
use ethereum_types::{H256, U256, Address, Bloom};
use util_error::{self, UtilError};
use snappy::InvalidInput;
@ -237,11 +236,10 @@ error_chain! {
}
links {
Database(kvdb::Error, kvdb::ErrorKind) #[doc = "Database error."];
Util(UtilError, util_error::ErrorKind) #[doc = "Error concerning a utility"];
Import(ImportError, ImportErrorKind) #[doc = "Error concerning block import." ];
}
foreign_links {
Io(IoError) #[doc = "Io create error"];
StdIo(::std::io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."];
@ -271,14 +269,14 @@ error_chain! {
AccountProvider(err: AccountsError) {
description("Accounts Provider error")
display("Accounts Provider error {}", err)
}
}
#[doc = "PoW hash is invalid or out of date."]
PowHashInvalid {
description("PoW hash is invalid or out of date.")
display("PoW hash is invalid or out of date.")
}
#[doc = "The value of the nonce or mishash is invalid."]
PowInvalid {
description("The value of the nonce or mishash is invalid.")
@ -311,10 +309,10 @@ impl From<ClientError> for Error {
}
}
impl From<AccountsError> for Error {
fn from(err: AccountsError) -> Error {
impl From<AccountsError> for Error {
fn from(err: AccountsError) -> Error {
ErrorKind::AccountProvider(err).into()
}
}
}
impl From<::rlp::DecoderError> for Error {

View File

@ -17,7 +17,8 @@
//! Set of different helpers for client tests
use std::path::Path;
use std::fs;
use std::sync::Arc;
use std::{fs, io};
use account_provider::AccountProvider;
use ethereum_types::{H256, U256, Address};
use block::{OpenBlock, Drain};
@ -25,7 +26,6 @@ use blockchain::{BlockChain, BlockChainDB, BlockChainDBHandler, Config as BlockC
use bytes::Bytes;
use client::{Client, ClientConfig, ChainInfo, ImportBlock, ChainNotify, ChainMessageType, PrepareOpenBlock};
use ethkey::KeyPair;
use error::Error;
use evm::Factory as EvmFactory;
use factory::Factories;
use hash::keccak;
@ -37,7 +37,6 @@ use rlp::{self, RlpStream};
use spec::Spec;
use state_db::StateDB;
use state::*;
use std::sync::Arc;
use transaction::{Action, Transaction, SignedTransaction};
use views::BlockView;
use blooms_db;
@ -327,7 +326,7 @@ pub fn restoration_db_handler(config: kvdb_rocksdb::DatabaseConfig) -> Box<Block
}
impl BlockChainDBHandler for RestorationDBHandler {
fn open(&self, db_path: &Path) -> Result<Arc<BlockChainDB>, Error> {
fn open(&self, db_path: &Path) -> io::Result<Arc<BlockChainDB>> {
let key_value = Arc::new(kvdb_rocksdb::Database::open(&self.config, &db_path.to_string_lossy())?);
let blooms_path = db_path.join("blooms");
let trace_blooms_path = db_path.join("trace_blooms");

View File

@ -56,8 +56,8 @@ const UPDATE_TIMEOUT: Duration = Duration::from_secs(15 * 60); // once every 15
/// Errors which can occur while using the local data store.
#[derive(Debug)]
pub enum Error {
/// Database errors: these manifest as `String`s.
Database(kvdb::Error),
/// Io and database errors: these manifest as `String`s.
Io(::std::io::Error),
/// JSON errors.
Json(::serde_json::Error),
}
@ -65,7 +65,7 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Database(ref val) => write!(f, "{}", val),
Error::Io(ref val) => write!(f, "{}", val),
Error::Json(ref err) => write!(f, "{}", err),
}
}
@ -160,7 +160,7 @@ pub struct LocalDataStore<T: NodeInfo> {
impl<T: NodeInfo> LocalDataStore<T> {
/// Attempt to read pending transactions out of the local store.
pub fn pending_transactions(&self) -> Result<Vec<PendingTransaction>, Error> {
if let Some(val) = self.db.get(self.col, LOCAL_TRANSACTIONS_KEY).map_err(Error::Database)? {
if let Some(val) = self.db.get(self.col, LOCAL_TRANSACTIONS_KEY).map_err(Error::Io)? {
let local_txs: Vec<_> = ::serde_json::from_slice::<Vec<TransactionEntry>>(&val)
.map_err(Error::Json)?
.into_iter()
@ -200,7 +200,7 @@ impl<T: NodeInfo> LocalDataStore<T> {
let json_str = format!("{}", local_json);
batch.put_vec(self.col, LOCAL_TRANSACTIONS_KEY, json_str.into_bytes());
self.db.write(batch).map_err(Error::Database)
self.db.write(batch).map_err(Error::Io)
}
}

View File

@ -18,7 +18,7 @@ use std::fs;
use std::io::{Read, Write, Error as IoError, ErrorKind};
use std::path::{Path, PathBuf};
use std::fmt::{Display, Formatter, Error as FmtError};
use super::migration_rocksdb::{self, Manager as MigrationManager, Config as MigrationConfig, ChangeColumns};
use super::migration_rocksdb::{Manager as MigrationManager, Config as MigrationConfig, ChangeColumns};
use super::kvdb_rocksdb::{CompactionProfile, DatabaseConfig};
use ethcore::client::DatabaseCompactionProfile;
use ethcore::{self, db};
@ -62,8 +62,6 @@ pub enum Error {
FutureDBVersion,
/// Migration is not possible.
MigrationImpossible,
/// Internal migration error.
Internal(migration_rocksdb::Error),
/// Blooms-db migration error.
BloomsDB(ethcore::error::Error),
/// Migration was completed successfully,
@ -77,7 +75,6 @@ impl Display for Error {
Error::UnknownDatabaseVersion => "Current database version cannot be read".into(),
Error::FutureDBVersion => "Database was created with newer client version. Upgrade your client or delete DB and resync.".into(),
Error::MigrationImpossible => format!("Database migration to version {} is not possible.", CURRENT_VERSION),
Error::Internal(ref err) => format!("{}", err),
Error::BloomsDB(ref err) => format!("blooms-db migration error: {}", err),
Error::Io(ref err) => format!("Unexpected io error on DB migration: {}.", err),
};
@ -92,15 +89,6 @@ impl From<IoError> for Error {
}
}
impl From<migration_rocksdb::Error> for Error {
fn from(err: migration_rocksdb::Error) -> Self {
match err.into() {
migration_rocksdb::ErrorKind::Io(e) => Error::Io(e),
err => Error::Internal(err.into()),
}
}
}
/// Returns the version file path.
fn version_file_path(path: &Path) -> PathBuf {
let mut file_path = path.to_owned();

View File

@ -17,12 +17,11 @@
extern crate kvdb_rocksdb;
extern crate migration_rocksdb;
use std::fs;
use std::{io, fs};
use std::sync::Arc;
use std::path::Path;
use blooms_db;
use ethcore::{BlockChainDBHandler, BlockChainDB};
use ethcore::error::Error;
use ethcore::db::NUM_COLUMNS;
use ethcore::client::{ClientConfig, DatabaseCompactionProfile};
use kvdb::KeyValueDB;
@ -76,7 +75,7 @@ pub fn restoration_db_handler(client_path: &Path, client_config: &ClientConfig)
}
impl BlockChainDBHandler for RestorationDBHandler {
fn open(&self, db_path: &Path) -> Result<Arc<BlockChainDB>, Error> {
fn open(&self, db_path: &Path) -> io::Result<Arc<BlockChainDB>> {
open_database(&db_path.to_string_lossy(), &self.config)
}
}
@ -87,7 +86,7 @@ pub fn restoration_db_handler(client_path: &Path, client_config: &ClientConfig)
}
/// Open a new main DB.
pub fn open_db(client_path: &str, cache_config: &CacheConfig, compaction: &DatabaseCompactionProfile, wal: bool) -> Result<Arc<BlockChainDB>, Error> {
pub fn open_db(client_path: &str, cache_config: &CacheConfig, compaction: &DatabaseCompactionProfile, wal: bool) -> io::Result<Arc<BlockChainDB>> {
let path = Path::new(client_path);
let db_config = DatabaseConfig {
@ -100,7 +99,7 @@ pub fn open_db(client_path: &str, cache_config: &CacheConfig, compaction: &Datab
open_database(client_path, &db_config)
}
pub fn open_database(client_path: &str, config: &DatabaseConfig) -> Result<Arc<BlockChainDB>, Error> {
pub fn open_database(client_path: &str, config: &DatabaseConfig) -> io::Result<Arc<BlockChainDB>> {
let path = Path::new(client_path);
let blooms_path = path.join("blooms");

View File

@ -18,7 +18,7 @@ use std::fmt;
use std::net;
use std::io::Error as IoError;
use {ethkey, crypto, kvdb};
use {ethkey, crypto};
/// Secret store error.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@ -174,12 +174,6 @@ impl From<ethkey::crypto::Error> for Error {
}
}
impl From<kvdb::Error> for Error {
fn from(err: kvdb::Error) -> Self {
Error::Database(err.to_string())
}
}
impl From<crypto::Error> for Error {
fn from(err: crypto::Error) -> Self {
Error::EthKey(err.to_string())

View File

@ -5,7 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
rlp = { path = "../rlp" }
kvdb = { path = "../kvdb" }
ethereum-types = "0.3"
error-chain = { version = "0.12", default-features = false }
rustc-hex = "1.0"

View File

@ -25,7 +25,6 @@ extern crate error_chain;
extern crate ethereum_types;
extern crate rlp;
extern crate rustc_hex;
extern crate kvdb;
use std::fmt;
use rustc_hex::FromHexError;
@ -63,10 +62,6 @@ error_chain! {
UtilError, ErrorKind, ResultExt, Result;
}
links {
Db(kvdb::Error, kvdb::ErrorKind);
}
foreign_links {
Io(::std::io::Error);
FromHex(FromHexError);

View File

@ -18,8 +18,9 @@ extern crate parking_lot;
extern crate kvdb;
use std::collections::{BTreeMap, HashMap};
use std::io;
use parking_lot::RwLock;
use kvdb::{DBValue, DBTransaction, KeyValueDB, DBOp, Result};
use kvdb::{DBValue, DBTransaction, KeyValueDB, DBOp};
/// A key-value database fulfilling the `KeyValueDB` trait, living in memory.
/// This is generally intended for tests and is not particularly optimized.
@ -44,10 +45,10 @@ pub fn create(num_cols: u32) -> InMemory {
}
impl KeyValueDB for InMemory {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
fn get(&self, col: Option<u32>, key: &[u8]) -> io::Result<Option<DBValue>> {
let columns = self.columns.read();
match columns.get(&col) {
None => Err(format!("No such column family: {:?}", col).into()),
None => Err(io::Error::new(io::ErrorKind::Other, format!("No such column family: {:?}", col))),
Some(map) => Ok(map.get(key).cloned()),
}
}
@ -82,7 +83,7 @@ impl KeyValueDB for InMemory {
}
}
fn flush(&self) -> Result<()> {
fn flush(&self) -> io::Result<()> {
Ok(())
}
@ -111,7 +112,7 @@ impl KeyValueDB for InMemory {
}
}
fn restore(&self, _new_db: &str) -> Result<()> {
Err("Attempted to restore in-memory database".into())
fn restore(&self, _new_db: &str) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "Attempted to restore in-memory database"))
}
}

View File

@ -28,11 +28,10 @@ extern crate rocksdb;
extern crate ethereum_types;
extern crate kvdb;
use std::cmp;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::{cmp, fs, io, mem, result, error};
use std::path::Path;
use std::{fs, mem, result};
use parking_lot::{Mutex, MutexGuard, RwLock};
use rocksdb::{
@ -43,7 +42,7 @@ use interleaved_ordered::{interleave_ordered, InterleaveOrdered};
use elastic_array::ElasticArray32;
use fs_swap::{swap, swap_nonatomic};
use kvdb::{KeyValueDB, DBTransaction, DBValue, DBOp, Result};
use kvdb::{KeyValueDB, DBTransaction, DBValue, DBOp};
#[cfg(target_os = "linux")]
use regex::Regex;
@ -54,6 +53,10 @@ use std::fs::File;
#[cfg(target_os = "linux")]
use std::path::PathBuf;
/// Wrap any error convertible into a boxed `std::error::Error` in an
/// `io::Error` with kind `Other`, so rocksdb/string errors can flow through
/// `io::Result`-based APIs.
fn other_io_err<E>(e: E) -> io::Error where E: Into<Box<error::Error + Send + Sync>> {
io::Error::new(io::ErrorKind::Other, e)
}
const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128;
enum KeyState {
@ -221,22 +224,22 @@ struct DBAndColumns {
}
// get column family configuration from database config.
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> Result<Options> {
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result<Options> {
let mut opts = Options::new();
opts.set_parsed_options("level_compaction_dynamic_level_bytes=true")?;
opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;
opts.set_block_based_table_factory(block_opts);
opts.set_parsed_options(
&format!("block_based_table_factory={{{};{}}}",
"cache_index_and_filter_blocks=true",
"pin_l0_filter_and_index_blocks_in_cache=true"))?;
"pin_l0_filter_and_index_blocks_in_cache=true")).map_err(other_io_err)?;
opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
opts.set_target_file_size_base(config.compaction.initial_file_size);
opts.set_parsed_options("compression_per_level=")?;
opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?;
Ok(opts)
}
@ -259,7 +262,7 @@ pub struct Database {
}
#[inline]
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> result::Result<T, String> {
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> io::Result<T> {
if let Err(ref s) = res {
if s.starts_with("Corruption:") {
warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
@ -267,7 +270,7 @@ fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, Strin
}
}
res
res.map_err(other_io_err)
}
fn is_corrupted(s: &str) -> bool {
@ -278,22 +281,22 @@ impl Database {
const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
/// Open database with default settings.
pub fn open_default(path: &str) -> Result<Database> {
pub fn open_default(path: &str) -> io::Result<Database> {
Database::open(&DatabaseConfig::default(), path)
}
/// Open database file. Creates if it does not exist.
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database> {
pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
let mut opts = Options::new();
if let Some(rate_limit) = config.compaction.write_rate_limit {
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?;
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
opts.set_parsed_options("keep_log_file_num=1")?;
opts.set_parsed_options("bytes_per_sync=1048576")?;
opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?;
opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?;
opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2);
opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));
@ -310,7 +313,7 @@ impl Database {
let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
DB::repair(&opts, path)?;
DB::repair(&opts, path).map_err(other_io_err)?;
fs::remove_file(db_corrupted)?;
}
@ -344,7 +347,11 @@ impl Database {
// retry and create CFs
match DB::open_cf(&opts, path, &[], &[]) {
Ok(mut db) => {
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?;
cfs = cfnames.iter()
.enumerate()
.map(|(i, n)| db.create_cf(n, &cf_options[i]))
.collect::<::std::result::Result<_, _>>()
.map_err(other_io_err)?;
Ok(db)
},
err => err,
@ -359,19 +366,21 @@ impl Database {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path)?;
DB::repair(&opts, path).map_err(other_io_err)?;
match cfnames.is_empty() {
true => DB::open(&opts, path)?,
true => DB::open(&opts, path).map_err(other_io_err)?,
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options)?;
let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?;
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
db
},
}
},
Err(s) => { return Err(s.into()); }
Err(s) => {
return Err(other_io_err(s))
}
};
let num_cols = cfs.len();
Ok(Database {
@ -415,27 +424,27 @@ impl Database {
}
/// Commit buffered changes to database. Must be called under `flush_lock`
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<()> {
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> io::Result<()> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
let batch = WriteBatch::new();
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
for (ref key, ref state) in column.iter() {
match **state {
for (key, state) in column.iter() {
match *state {
KeyState::Delete => {
if c > 0 {
batch.delete_cf(cfs[c - 1], &key)?;
batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?;
} else {
batch.delete(&key)?;
batch.delete(key).map_err(other_io_err)?;
}
},
KeyState::Insert(ref value) => {
if c > 0 {
batch.put_cf(cfs[c - 1], &key, value)?;
batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?;
} else {
batch.put(&key, &value)?;
batch.put(key, value).map_err(other_io_err)?;
}
},
}
@ -453,18 +462,18 @@ impl Database {
}
Ok(())
},
None => Err("Database is closed".into())
None => Err(other_io_err("Database is closed"))
}
}
/// Commit buffered changes to database.
pub fn flush(&self) -> Result<()> {
pub fn flush(&self) -> io::Result<()> {
let mut lock = self.flushing_lock.lock();
// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
// The value inside the lock is used to detect that.
if *lock {
// This can only happen if another flushing thread is terminated unexpectedly.
return Err("Database write failure. Running low on memory perhaps?".into());
return Err(other_io_err("Database write failure. Running low on memory perhaps?"))
}
*lock = true;
let result = self.write_flushing_with_lock(&mut lock);
@ -473,7 +482,7 @@ impl Database {
}
/// Commit transaction to database.
pub fn write(&self, tr: DBTransaction) -> Result<()> {
pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
let batch = WriteBatch::new();
@ -483,25 +492,25 @@ impl Database {
self.overlay.write()[Self::to_overlay_column(op.col())].remove(op.key());
match op {
DBOp::Insert { col, key, value } => {
col.map_or_else(|| batch.put(&key, &value), |c| batch.put_cf(cfs[c as usize], &key, &value))?
},
DBOp::Delete { col, key } => {
col.map_or_else(|| batch.delete(&key), |c| batch.delete_cf(cfs[c as usize], &key))?
DBOp::Insert { col, key, value } => match col {
None => batch.put(&key, &value).map_err(other_io_err)?,
Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?,
},
DBOp::Delete { col, key } => match col {
None => batch.delete(&key).map_err(other_io_err)?,
Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?,
}
}
}
check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts)).map_err(Into::into)
check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))
},
None => Err("Database is closed".into())
None => Err(other_io_err("Database is closed")),
}
}
/// Get value by key.
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
pub fn get(&self, col: Option<u32>, key: &[u8]) -> io::Result<Option<DBValue>> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
@ -517,7 +526,7 @@ impl Database {
col.map_or_else(
|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
.map_err(Into::into)
.map_err(other_io_err)
},
}
},
@ -591,7 +600,7 @@ impl Database {
}
/// Restore the database from a copy at given path.
pub fn restore(&self, new_db: &str) -> Result<()> {
pub fn restore(&self, new_db: &str) -> io::Result<()> {
self.close();
// swap is guaranteed to be atomic
@ -632,13 +641,13 @@ impl Database {
}
/// Drop a column family.
pub fn drop_column(&self) -> Result<()> {
pub fn drop_column(&self) -> io::Result<()> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
if let Some(col) = cfs.pop() {
let name = format!("col{}", cfs.len());
drop(col);
db.drop_cf(&name)?;
db.drop_cf(&name).map_err(other_io_err)?;
}
Ok(())
},
@ -647,12 +656,12 @@ impl Database {
}
/// Add a column family.
pub fn add_column(&self) -> Result<()> {
pub fn add_column(&self) -> io::Result<()> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
let col = cfs.len() as u32;
let name = format!("col{}", col);
cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?)?);
cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?);
Ok(())
},
None => Ok(()),
@ -663,7 +672,7 @@ impl Database {
// duplicate declaration of methods here to avoid trait import in certain existing cases
// at time of addition.
impl KeyValueDB for Database {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
fn get(&self, col: Option<u32>, key: &[u8]) -> io::Result<Option<DBValue>> {
Database::get(self, col, key)
}
@ -675,11 +684,11 @@ impl KeyValueDB for Database {
Database::write_buffered(self, transaction)
}
fn write(&self, transaction: DBTransaction) -> Result<()> {
fn write(&self, transaction: DBTransaction) -> io::Result<()> {
Database::write(self, transaction)
}
fn flush(&self) -> Result<()> {
fn flush(&self) -> io::Result<()> {
Database::flush(self)
}
@ -695,7 +704,7 @@ impl KeyValueDB for Database {
Box::new(unboxed.into_iter().flat_map(|inner| inner))
}
fn restore(&self, new_db: &str) -> Result<()> {
fn restore(&self, new_db: &str) -> io::Result<()> {
Database::restore(self, new_db)
}
}

View File

@ -5,5 +5,4 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
elastic-array = "0.10"
error-chain = { version = "0.12", default-features = false }
ethcore-bytes = { path = "../bytes" }

View File

@ -16,8 +16,6 @@
//! Key-Value store abstraction with `RocksDB` backend.
#[macro_use]
extern crate error_chain;
extern crate elastic_array;
extern crate ethcore_bytes as bytes;
@ -33,16 +31,6 @@ pub const PREFIX_LEN: usize = 12;
/// Database value.
pub type DBValue = ElasticArray128<u8>;
error_chain! {
types {
Error, ErrorKind, ResultExt, Result;
}
foreign_links {
Io(io::Error);
}
}
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
#[derive(Default, Clone, PartialEq)]
pub struct DBTransaction {
@ -151,7 +139,7 @@ pub trait KeyValueDB: Sync + Send {
fn transaction(&self) -> DBTransaction { DBTransaction::new() }
/// Get a value by key.
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>>;
fn get(&self, col: Option<u32>, key: &[u8]) -> io::Result<Option<DBValue>>;
/// Get a value by partial key. Only works for flushed data.
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>>;
@ -160,13 +148,13 @@ pub trait KeyValueDB: Sync + Send {
fn write_buffered(&self, transaction: DBTransaction);
/// Write a transaction of changes to the backing store.
fn write(&self, transaction: DBTransaction) -> Result<()> {
fn write(&self, transaction: DBTransaction) -> io::Result<()> {
self.write_buffered(transaction);
self.flush()
}
/// Flush all buffered data.
fn flush(&self) -> Result<()>;
fn flush(&self) -> io::Result<()>;
/// Iterate over flushed data for a given column.
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
@ -176,12 +164,12 @@ pub trait KeyValueDB: Sync + Send {
-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
/// Attempt to replace this database with a new one located at the given path.
fn restore(&self, new_db: &str) -> Result<()>;
fn restore(&self, new_db: &str) -> io::Result<()>;
}
/// Generic key-value database handler. This trait contains one function `open`. When called, it opens database with a
/// predefined config.
pub trait KeyValueDBHandler: Send + Sync {
/// Open the predefined key-value database.
fn open(&self, path: &Path) -> Result<Arc<KeyValueDB>>;
fn open(&self, path: &Path) -> io::Result<Arc<KeyValueDB>>;
}

View File

@ -8,7 +8,6 @@ log = "0.3"
macros = { path = "../macros" }
kvdb = { path = "../kvdb" }
kvdb-rocksdb = { path = "../kvdb-rocksdb" }
error-chain = { version = "0.12", default-features = false }
[dev-dependencies]
tempdir = "0.3"

View File

@ -20,8 +20,6 @@
extern crate log;
#[macro_use]
extern crate macros;
#[macro_use]
extern crate error_chain;
extern crate kvdb;
extern crate kvdb_rocksdb;
@ -29,31 +27,13 @@ extern crate kvdb_rocksdb;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fs, io};
use std::{fs, io, error};
use kvdb::DBTransaction;
use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
error_chain! {
links {
Db(kvdb::Error, kvdb::ErrorKind);
}
foreign_links {
Io(io::Error);
}
errors {
CannotAddMigration {
description("Cannot add migration"),
display("Cannot add migration"),
}
MigrationImpossible {
description("Migration impossible"),
display("Migration impossible"),
}
}
/// Wrap any error convertible into a boxed `std::error::Error` in an
/// `io::Error` with kind `Other`; used to surface migration failures
/// through the `io::Result`-based migration API.
fn other_io_err<E>(e: E) -> io::Error where E: Into<Box<error::Error + Send + Sync>> {
io::Error::new(io::ErrorKind::Other, e)
}
/// Migration config.
@ -92,7 +72,7 @@ impl Batch {
}
/// Insert a value into the batch, committing if necessary.
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> Result<()> {
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> io::Result<()> {
self.inner.insert(key, value);
if self.inner.len() == self.batch_size {
self.commit(dest)?;
@ -101,7 +81,7 @@ impl Batch {
}
/// Commit all the items in the batch to the given database.
pub fn commit(&mut self, dest: &mut Database) -> Result<()> {
pub fn commit(&mut self, dest: &mut Database) -> io::Result<()> {
if self.inner.is_empty() { return Ok(()) }
let mut transaction = DBTransaction::new();
@ -111,7 +91,7 @@ impl Batch {
}
self.inner.clear();
dest.write(transaction).map_err(Into::into)
dest.write(transaction)
}
}
@ -127,7 +107,7 @@ pub trait Migration: 'static {
/// Version of the database after the migration.
fn version(&self) -> u32;
/// Migrate a source to a destination.
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<()>;
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> io::Result<()>;
}
/// A simple migration over key-value pairs of a single column.
@ -150,7 +130,7 @@ impl<T: SimpleMigration> Migration for T {
fn alters_existing(&self) -> bool { true }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<()> {
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> io::Result<()> {
let migration_needed = col == SimpleMigration::migrated_column_index(self);
let mut batch = Batch::new(config, col);
@ -188,7 +168,7 @@ impl Migration for ChangeColumns {
fn columns(&self) -> Option<u32> { self.post_columns }
fn version(&self) -> u32 { self.version }
fn alters_existing(&self) -> bool { false }
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> Result<()> {
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> io::Result<()> {
Ok(())
}
}
@ -242,7 +222,7 @@ impl Manager {
}
/// Adds new migration rules.
pub fn add_migration<T>(&mut self, migration: T) -> Result<()> where T: Migration {
pub fn add_migration<T>(&mut self, migration: T) -> io::Result<()> where T: Migration {
let is_new = match self.migrations.last() {
Some(last) => migration.version() > last.version(),
None => true,
@ -250,18 +230,18 @@ impl Manager {
match is_new {
true => Ok(self.migrations.push(Box::new(migration))),
false => Err(ErrorKind::CannotAddMigration.into()),
false => Err(other_io_err("Cannot add migration.")),
}
}
/// Performs migration in order, starting with a source path, migrating between two temporary databases,
/// and producing a path where the final migration lives.
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf> {
pub fn execute(&mut self, old_path: &Path, version: u32) -> io::Result<PathBuf> {
let config = self.config.clone();
let migrations = self.migrations_from(version);
trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
if migrations.is_empty() {
return Err(ErrorKind::MigrationImpossible.into())
return Err(other_io_err("Migration impossible"));
};
let columns = migrations.get(0).and_then(|m| m.pre_columns());
@ -280,7 +260,7 @@ impl Manager {
let mut temp_path = old_path.to_path_buf();
// start with the old db.
let old_path_str = old_path.to_str().ok_or(ErrorKind::MigrationImpossible)?;
let old_path_str = old_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
let mut cur_db = Arc::new(Database::open(&db_config, old_path_str)?);
for migration in migrations {
@ -294,7 +274,7 @@ impl Manager {
temp_path = temp_idx.path(&db_root);
// open the target temporary database.
let temp_path_str = temp_path.to_str().ok_or(ErrorKind::MigrationImpossible)?;
let temp_path_str = temp_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
let mut new_db = Database::open(&db_config, temp_path_str)?;
match current_columns {
@ -318,11 +298,11 @@ impl Manager {
// we can do this in-place.
let goal_columns = migration.columns().unwrap_or(0);
while cur_db.num_columns() < goal_columns {
cur_db.add_column().map_err(kvdb::Error::from)?;
cur_db.add_column().map_err(other_io_err)?;
}
while cur_db.num_columns() > goal_columns {
cur_db.drop_column().map_err(kvdb::Error::from)?;
cur_db.drop_column().map_err(other_io_err)?;
}
}
}

View File

@ -25,11 +25,12 @@ extern crate kvdb_rocksdb;
extern crate migration_rocksdb as migration;
use std::collections::BTreeMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempdir::TempDir;
use kvdb_rocksdb::Database;
use migration::{Batch, Config, Error, SimpleMigration, Migration, Manager, ChangeColumns};
use migration::{Batch, Config, SimpleMigration, Migration, Manager, ChangeColumns};
#[inline]
fn db_path(path: &Path) -> PathBuf {
@ -112,7 +113,7 @@ impl Migration for AddsColumn {
fn version(&self) -> u32 { 1 }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> io::Result<()> {
let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col).into_iter().flat_map(|inner| inner) {