Merge pull request #6792 from paritytech/kvdb_error
consistent KeyValueDB errors
This commit is contained in:
commit
bb1be15dc4
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -837,7 +837,6 @@ dependencies = [
|
||||
"clippy 0.0.103 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"eth-secp256k1 0.5.6 (git+https://github.com/paritytech/rust-secp256k1)",
|
||||
"ethcore-bigint 0.1.3",
|
||||
"ethcore-bytes 0.1.0",
|
||||
@ -1628,6 +1627,7 @@ dependencies = [
|
||||
name = "migration"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ethcore-devtools 1.9.0",
|
||||
"kvdb 0.1.0",
|
||||
"kvdb-rocksdb 0.1.0",
|
||||
@ -2051,6 +2051,7 @@ dependencies = [
|
||||
"ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"isatty 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jsonrpc-core 8.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.8)",
|
||||
"kvdb 0.1.0",
|
||||
"kvdb-rocksdb 0.1.0",
|
||||
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"migration 0.1.0",
|
||||
@ -3459,6 +3460,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ethcore-bigint 0.1.3",
|
||||
"kvdb 0.1.0",
|
||||
"rlp 0.2.0",
|
||||
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
@ -63,6 +63,7 @@ path = { path = "util/path" }
|
||||
panic_hook = { path = "panic_hook" }
|
||||
hash = { path = "util/hash" }
|
||||
migration = { path = "util/migration" }
|
||||
kvdb = { path = "util/kvdb" }
|
||||
kvdb-rocksdb = { path = "util/kvdb-rocksdb" }
|
||||
|
||||
parity-dapps = { path = "dapps", optional = true }
|
||||
|
@ -45,7 +45,7 @@ use rlp::{Encodable, Decodable, DecoderError, RlpStream, Rlp, UntrustedRlp};
|
||||
use heapsize::HeapSizeOf;
|
||||
use bigint::prelude::U256;
|
||||
use bigint::hash::{H256, H256FastMap, H264};
|
||||
use kvdb::{DBTransaction, KeyValueDB};
|
||||
use kvdb::{self, DBTransaction, KeyValueDB};
|
||||
|
||||
use cache::Cache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
@ -198,7 +198,7 @@ impl HeaderChain {
|
||||
col: Option<u32>,
|
||||
spec: &Spec,
|
||||
cache: Arc<Mutex<Cache>>,
|
||||
) -> Result<Self, String> {
|
||||
) -> Result<Self, kvdb::Error> {
|
||||
let mut live_epoch_proofs = ::std::collections::HashMap::default();
|
||||
|
||||
let genesis = ::rlp::encode(&spec.genesis_header()).into_vec();
|
||||
@ -240,7 +240,7 @@ impl HeaderChain {
|
||||
let best_block = {
|
||||
let era = match candidates.get(&best_number) {
|
||||
Some(era) => era,
|
||||
None => return Err(format!("Database corrupt: highest block referenced but no data.")),
|
||||
None => return Err("Database corrupt: highest block referenced but no data.".into()),
|
||||
};
|
||||
|
||||
let best = &era.candidates[0];
|
||||
|
@ -36,7 +36,7 @@ use bigint::prelude::U256;
|
||||
use bigint::hash::H256;
|
||||
use futures::{IntoFuture, Future};
|
||||
|
||||
use kvdb::KeyValueDB;
|
||||
use kvdb::{self, KeyValueDB};
|
||||
use kvdb_rocksdb::CompactionProfile;
|
||||
|
||||
use self::fetch::ChainDataFetcher;
|
||||
@ -187,7 +187,7 @@ impl<T: ChainDataFetcher> Client<T> {
|
||||
fetcher: T,
|
||||
io_channel: IoChannel<ClientIoMessage>,
|
||||
cache: Arc<Mutex<Cache>>
|
||||
) -> Result<Self, String> {
|
||||
) -> Result<Self, kvdb::Error> {
|
||||
Ok(Client {
|
||||
queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, config.check_seal),
|
||||
engine: spec.engine.clone(),
|
||||
|
@ -25,6 +25,7 @@ use ethcore::db;
|
||||
use ethcore::service::ClientIoMessage;
|
||||
use ethcore::spec::Spec;
|
||||
use io::{IoContext, IoError, IoHandler, IoService};
|
||||
use kvdb;
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
|
||||
use cache::Cache;
|
||||
@ -36,7 +37,7 @@ use super::{ChainDataFetcher, Client, Config as ClientConfig};
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Database error.
|
||||
Database(String),
|
||||
Database(kvdb::Error),
|
||||
/// I/O service error.
|
||||
Io(IoError),
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ use bytes::Bytes;
|
||||
use util::{Address, journaldb, DBValue};
|
||||
use util_error::UtilError;
|
||||
use trie::{TrieSpec, TrieFactory, Trie};
|
||||
use kvdb::*;
|
||||
use kvdb::{KeyValueDB, DBTransaction};
|
||||
|
||||
// other
|
||||
use bigint::prelude::U256;
|
||||
|
@ -14,9 +14,9 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use util_error::UtilError;
|
||||
use std::fmt::{Display, Formatter, Error as FmtError};
|
||||
|
||||
use util_error::UtilError;
|
||||
use kvdb;
|
||||
use trie::TrieError;
|
||||
|
||||
/// Client configuration errors.
|
||||
@ -25,7 +25,7 @@ pub enum Error {
|
||||
/// TrieDB-related error.
|
||||
Trie(TrieError),
|
||||
/// Database error
|
||||
Database(String),
|
||||
Database(kvdb::Error),
|
||||
/// Util error
|
||||
Util(UtilError),
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ pub enum EvmTestError {
|
||||
/// Initialization error.
|
||||
ClientError(::error::Error),
|
||||
/// Low-level database error.
|
||||
Database(String),
|
||||
Database(kvdb::Error),
|
||||
/// Post-condition failure,
|
||||
PostCondition(String),
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ use bigint::hash::H256;
|
||||
use util::Address;
|
||||
use bytes::Bytes;
|
||||
use kvdb_rocksdb::Database;
|
||||
use migration::{Batch, Config, Error, Migration, SimpleMigration, Progress};
|
||||
use migration::{Batch, Config, Error, ErrorKind, Migration, SimpleMigration, Progress};
|
||||
use hash::keccak;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -109,7 +109,7 @@ impl OverlayRecentV7 {
|
||||
// walk all journal entries in the database backwards.
|
||||
// find migrations for any possible inserted keys.
|
||||
fn walk_journal(&mut self, source: Arc<Database>) -> Result<(), Error> {
|
||||
if let Some(val) = source.get(None, V7_LATEST_ERA_KEY).map_err(Error::Custom)? {
|
||||
if let Some(val) = source.get(None, V7_LATEST_ERA_KEY)? {
|
||||
let mut era = decode::<u64>(&val);
|
||||
loop {
|
||||
let mut index: usize = 0;
|
||||
@ -120,7 +120,7 @@ impl OverlayRecentV7 {
|
||||
r.out()
|
||||
};
|
||||
|
||||
if let Some(journal_raw) = source.get(None, &entry_key).map_err(Error::Custom)? {
|
||||
if let Some(journal_raw) = source.get(None, &entry_key)? {
|
||||
let rlp = Rlp::new(&journal_raw);
|
||||
|
||||
// migrate all inserted keys.
|
||||
@ -153,7 +153,7 @@ impl OverlayRecentV7 {
|
||||
// replace all possible inserted/deleted keys with their migrated counterparts
|
||||
// and commit the altered entries.
|
||||
fn migrate_journal(&self, source: Arc<Database>, mut batch: Batch, dest: &mut Database) -> Result<(), Error> {
|
||||
if let Some(val) = source.get(None, V7_LATEST_ERA_KEY).map_err(Error::Custom)? {
|
||||
if let Some(val) = source.get(None, V7_LATEST_ERA_KEY)? {
|
||||
batch.insert(V7_LATEST_ERA_KEY.into(), val.clone().into_vec(), dest)?;
|
||||
|
||||
let mut era = decode::<u64>(&val);
|
||||
@ -166,7 +166,7 @@ impl OverlayRecentV7 {
|
||||
r.out()
|
||||
};
|
||||
|
||||
if let Some(journal_raw) = source.get(None, &entry_key).map_err(Error::Custom)? {
|
||||
if let Some(journal_raw) = source.get(None, &entry_key)? {
|
||||
let rlp = Rlp::new(&journal_raw);
|
||||
let id: H256 = rlp.val_at(0);
|
||||
let mut inserted_keys: Vec<(H256, Bytes)> = Vec::new();
|
||||
@ -233,9 +233,9 @@ impl Migration for OverlayRecentV7 {
|
||||
let mut batch = Batch::new(config, col);
|
||||
|
||||
// check version metadata.
|
||||
match source.get(None, V7_VERSION_KEY).map_err(Error::Custom)? {
|
||||
match source.get(None, V7_VERSION_KEY)? {
|
||||
Some(ref version) if decode::<u32>(&*version) == DB_VERSION => {}
|
||||
_ => return Err(Error::MigrationImpossible), // missing or wrong version
|
||||
_ => return Err(ErrorKind::MigrationImpossible.into()), // missing or wrong version
|
||||
}
|
||||
|
||||
let mut count = 0;
|
||||
|
@ -22,11 +22,11 @@ use state_db::{ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET, StateDB};
|
||||
use trie::TrieDB;
|
||||
use views::HeaderView;
|
||||
use bloom_journal::Bloom;
|
||||
use migration::{Error, Migration, Progress, Batch, Config};
|
||||
use migration::{Error, Migration, Progress, Batch, Config, ErrorKind};
|
||||
use util::journaldb;
|
||||
use bigint::hash::H256;
|
||||
use trie::Trie;
|
||||
use kvdb::DBTransaction;
|
||||
use kvdb::{DBTransaction, ResultExt};
|
||||
use kvdb_rocksdb::Database;
|
||||
|
||||
/// Account bloom upgrade routine. If bloom already present, does nothing.
|
||||
@ -60,9 +60,9 @@ pub fn generate_bloom(source: Arc<Database>, dest: &mut Database) -> Result<(),
|
||||
source.clone(),
|
||||
journaldb::Algorithm::OverlayRecent,
|
||||
COL_STATE);
|
||||
let account_trie = TrieDB::new(state_db.as_hashdb(), &state_root).map_err(|e| Error::Custom(format!("Cannot open trie: {:?}", e)))?;
|
||||
for item in account_trie.iter().map_err(|_| Error::MigrationImpossible)? {
|
||||
let (ref account_key, _) = item.map_err(|_| Error::MigrationImpossible)?;
|
||||
let account_trie = TrieDB::new(state_db.as_hashdb(), &state_root).chain_err(|| "Cannot open trie")?;
|
||||
for item in account_trie.iter().map_err(|_| ErrorKind::MigrationImpossible)? {
|
||||
let (ref account_key, _) = item.map_err(|_| ErrorKind::MigrationImpossible)?;
|
||||
let account_key_hash = H256::from_slice(account_key);
|
||||
bloom.set(&*account_key_hash);
|
||||
}
|
||||
@ -73,7 +73,7 @@ pub fn generate_bloom(source: Arc<Database>, dest: &mut Database) -> Result<(),
|
||||
trace!(target: "migration", "Generated {} bloom updates", bloom_journal.entries.len());
|
||||
|
||||
let mut batch = DBTransaction::new();
|
||||
StateDB::commit_bloom(&mut batch, bloom_journal).map_err(|_| Error::Custom("Failed to commit bloom".to_owned()))?;
|
||||
StateDB::commit_bloom(&mut batch, bloom_journal).chain_err(|| "Failed to commit bloom")?;
|
||||
dest.write(batch)?;
|
||||
|
||||
trace!(target: "migration", "Finished bloom update");
|
||||
|
@ -56,7 +56,7 @@ const UPDATE_TIMEOUT_MS: u64 = 15 * 60 * 1000; // once every 15 minutes.
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Database errors: these manifest as `String`s.
|
||||
Database(String),
|
||||
Database(kvdb::Error),
|
||||
/// JSON errors.
|
||||
Json(::serde_json::Error),
|
||||
}
|
||||
|
@ -62,6 +62,7 @@ extern crate ethcore_bigint as bigint;
|
||||
extern crate ethcore_bytes as bytes;
|
||||
extern crate ethcore_network as network;
|
||||
extern crate migration as migr;
|
||||
extern crate kvdb;
|
||||
extern crate kvdb_rocksdb;
|
||||
extern crate ethkey;
|
||||
extern crate ethsync;
|
||||
|
@ -21,7 +21,8 @@ use std::path::{Path, PathBuf};
|
||||
use std::fmt::{Display, Formatter, Error as FmtError};
|
||||
use std::sync::Arc;
|
||||
use util::journaldb::Algorithm;
|
||||
use migr::{Manager as MigrationManager, Config as MigrationConfig, Error as MigrationError, Migration};
|
||||
use migr::{self, Manager as MigrationManager, Config as MigrationConfig, Migration};
|
||||
use kvdb;
|
||||
use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
|
||||
use ethcore::migrations;
|
||||
use ethcore::db;
|
||||
@ -52,7 +53,7 @@ pub enum Error {
|
||||
/// Migration unexpectadly failed.
|
||||
MigrationFailed,
|
||||
/// Internal migration error.
|
||||
Internal(MigrationError),
|
||||
Internal(migr::Error),
|
||||
/// Migration was completed succesfully,
|
||||
/// but there was a problem with io.
|
||||
Io(IoError),
|
||||
@ -80,11 +81,11 @@ impl From<IoError> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MigrationError> for Error {
|
||||
fn from(err: MigrationError) -> Self {
|
||||
match err {
|
||||
MigrationError::Io(e) => Error::Io(e),
|
||||
_ => Error::Internal(err),
|
||||
impl From<migr::Error> for Error {
|
||||
fn from(err: migr::Error) -> Self {
|
||||
match err.into() {
|
||||
migr::ErrorKind::Io(e) => Error::Io(e),
|
||||
err => Error::Internal(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -158,7 +159,7 @@ fn consolidate_database(
|
||||
column: Option<u32>,
|
||||
extract: Extract,
|
||||
compaction_profile: &CompactionProfile) -> Result<(), Error> {
|
||||
fn db_error(e: String) -> Error {
|
||||
fn db_error(e: kvdb::Error) -> Error {
|
||||
warn!("Cannot open Database for consolidation: {:?}", e);
|
||||
Error::MigrationFailed
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ impl PersistentKeyStorage {
|
||||
db_path.push("db");
|
||||
let db_path = db_path.to_str().ok_or(Error::Database("Invalid secretstore path".to_owned()))?;
|
||||
|
||||
let db = Database::open_default(&db_path).map_err(Error::Database)?;
|
||||
let db = Database::open_default(&db_path)?;
|
||||
let db = upgrade_db(db)?;
|
||||
|
||||
Ok(PersistentKeyStorage {
|
||||
@ -142,7 +142,7 @@ impl PersistentKeyStorage {
|
||||
}
|
||||
|
||||
fn upgrade_db(db: Database) -> Result<Database, Error> {
|
||||
let version = db.get(None, DB_META_KEY_VERSION).map_err(Error::Database)?;
|
||||
let version = db.get(None, DB_META_KEY_VERSION)?;
|
||||
let version = version.and_then(|v| v.get(0).cloned()).unwrap_or(0);
|
||||
match version {
|
||||
0 => {
|
||||
@ -164,7 +164,7 @@ fn upgrade_db(db: Database) -> Result<Database, Error> {
|
||||
let db_value = serde_json::to_vec(&v2_key).map_err(|e| Error::Database(e.to_string()))?;
|
||||
batch.put(None, &*db_key, &*db_value);
|
||||
}
|
||||
db.write(batch).map_err(Error::Database)?;
|
||||
db.write(batch)?;
|
||||
Ok(db)
|
||||
},
|
||||
1 => {
|
||||
@ -184,7 +184,7 @@ fn upgrade_db(db: Database) -> Result<Database, Error> {
|
||||
let db_value = serde_json::to_vec(&v2_key).map_err(|e| Error::Database(e.to_string()))?;
|
||||
batch.put(None, &*db_key, &*db_value);
|
||||
}
|
||||
db.write(batch).map_err(Error::Database)?;
|
||||
db.write(batch)?;
|
||||
Ok(db)
|
||||
}
|
||||
2 => Ok(db),
|
||||
@ -198,7 +198,7 @@ impl KeyStorage for PersistentKeyStorage {
|
||||
let key = serde_json::to_vec(&key).map_err(|e| Error::Database(e.to_string()))?;
|
||||
let mut batch = self.db.transaction();
|
||||
batch.put(None, &document, &key);
|
||||
self.db.write(batch).map_err(Error::Database)
|
||||
self.db.write(batch).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn update(&self, document: ServerKeyId, key: DocumentKeyShare) -> Result<(), Error> {
|
||||
@ -206,8 +206,7 @@ impl KeyStorage for PersistentKeyStorage {
|
||||
}
|
||||
|
||||
fn get(&self, document: &ServerKeyId) -> Result<DocumentKeyShare, Error> {
|
||||
self.db.get(None, document)
|
||||
.map_err(Error::Database)?
|
||||
self.db.get(None, document)?
|
||||
.ok_or(Error::DocumentNotFound)
|
||||
.map(|key| key.into_vec())
|
||||
.and_then(|key| serde_json::from_slice::<CurrentSerializableDocumentKeyShare>(&key).map_err(|e| Error::Database(e.to_string())))
|
||||
@ -217,7 +216,7 @@ impl KeyStorage for PersistentKeyStorage {
|
||||
fn remove(&self, document: &ServerKeyId) -> Result<(), Error> {
|
||||
let mut batch = self.db.transaction();
|
||||
batch.delete(None, &document);
|
||||
self.db.write(batch).map_err(Error::Database)
|
||||
self.db.write(batch).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn contains(&self, document: &ServerKeyId) -> bool {
|
||||
|
@ -18,10 +18,7 @@ use std::fmt;
|
||||
use std::collections::BTreeMap;
|
||||
use serde_json;
|
||||
|
||||
use ethkey;
|
||||
use bytes;
|
||||
use bigint;
|
||||
use key_server_cluster;
|
||||
use {ethkey, kvdb, bytes, bigint, key_server_cluster};
|
||||
|
||||
/// Node id.
|
||||
pub type NodeId = ethkey::Public;
|
||||
@ -134,6 +131,12 @@ impl From<ethkey::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<kvdb::Error> for Error {
|
||||
fn from(err: kvdb::Error) -> Self {
|
||||
Error::Database(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<key_server_cluster::Error> for Error {
|
||||
fn from(err: key_server_cluster::Error) -> Self {
|
||||
match err {
|
||||
|
@ -26,7 +26,6 @@ parking_lot = "0.4"
|
||||
tiny-keccak= "1.0"
|
||||
ethcore-logger = { path = "../logger" }
|
||||
triehash = { path = "triehash" }
|
||||
error-chain = "0.11.0-rc.2"
|
||||
hashdb = { path = "hashdb" }
|
||||
patricia_trie = { path = "patricia_trie" }
|
||||
ethcore-bytes = { path = "bytes" }
|
||||
|
@ -5,6 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
rlp = { path = "../rlp" }
|
||||
kvdb = { path = "../kvdb" }
|
||||
ethcore-bigint = { path = "../bigint" }
|
||||
error-chain = "0.11.0-rc.2"
|
||||
error-chain = "0.11.0"
|
||||
rustc-hex = "1.0"
|
||||
|
@ -25,6 +25,7 @@ extern crate error_chain;
|
||||
extern crate ethcore_bigint as bigint;
|
||||
extern crate rlp;
|
||||
extern crate rustc_hex;
|
||||
extern crate kvdb;
|
||||
|
||||
use std::fmt;
|
||||
use rustc_hex::FromHexError;
|
||||
@ -62,6 +63,10 @@ error_chain! {
|
||||
UtilError, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
links {
|
||||
Db(kvdb::Error, kvdb::ErrorKind);
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
Io(::std::io::Error);
|
||||
FromHex(FromHexError);
|
||||
|
@ -20,7 +20,7 @@ extern crate rlp;
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use parking_lot::RwLock;
|
||||
use kvdb::{DBValue, Error, DBTransaction, KeyValueDB, DBOp};
|
||||
use kvdb::{DBValue, DBTransaction, KeyValueDB, DBOp, Result};
|
||||
use rlp::{RlpType, UntrustedRlp, Compressible};
|
||||
|
||||
/// A key-value database fulfilling the `KeyValueDB` trait, living in memory.
|
||||
@ -46,10 +46,10 @@ pub fn create(num_cols: u32) -> InMemory {
|
||||
}
|
||||
|
||||
impl KeyValueDB for InMemory {
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
|
||||
let columns = self.columns.read();
|
||||
match columns.get(&col) {
|
||||
None => Err(format!("No such column family: {:?}", col)),
|
||||
None => Err(format!("No such column family: {:?}", col).into()),
|
||||
Some(map) => Ok(map.get(key).cloned()),
|
||||
}
|
||||
}
|
||||
@ -92,7 +92,10 @@ impl KeyValueDB for InMemory {
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<(), String> { Ok(()) }
|
||||
fn flush(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
|
||||
match self.columns.read().get(&col) {
|
||||
Some(map) => Box::new( // TODO: worth optimizing at all?
|
||||
@ -118,7 +121,7 @@ impl KeyValueDB for InMemory {
|
||||
}
|
||||
}
|
||||
|
||||
fn restore(&self, _new_db: &str) -> Result<(), Error> {
|
||||
fn restore(&self, _new_db: &str) -> Result<()> {
|
||||
Err("Attempted to restore in-memory database".into())
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ use rocksdb::{
|
||||
|
||||
use elastic_array::ElasticArray32;
|
||||
use rlp::{UntrustedRlp, RlpType, Compressible};
|
||||
use kvdb::{KeyValueDB, DBTransaction, DBValue, Error, DBOp};
|
||||
use kvdb::{KeyValueDB, DBTransaction, DBValue, DBOp, Result};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use regex::Regex;
|
||||
@ -257,12 +257,12 @@ pub struct Database {
|
||||
|
||||
impl Database {
|
||||
/// Open database with default settings.
|
||||
pub fn open_default(path: &str) -> Result<Database, String> {
|
||||
pub fn open_default(path: &str) -> Result<Database> {
|
||||
Database::open(&DatabaseConfig::default(), path)
|
||||
}
|
||||
|
||||
/// Open database file. Creates if it does not exist.
|
||||
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
|
||||
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database> {
|
||||
let mut opts = Options::new();
|
||||
if let Some(rate_limit) = config.compaction.write_rate_limit {
|
||||
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit))?;
|
||||
@ -312,7 +312,7 @@ impl Database {
|
||||
// retry and create CFs
|
||||
match DB::open_cf(&opts, path, &[], &[]) {
|
||||
Ok(mut db) => {
|
||||
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<Result<_, _>>()?;
|
||||
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?;
|
||||
Ok(db)
|
||||
},
|
||||
err @ Err(_) => err,
|
||||
@ -335,7 +335,7 @@ impl Database {
|
||||
false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
|
||||
}
|
||||
},
|
||||
Err(s) => { return Err(s); }
|
||||
Err(s) => { return Err(s.into()); }
|
||||
};
|
||||
let num_cols = cfs.len();
|
||||
Ok(Database {
|
||||
@ -383,7 +383,7 @@ impl Database {
|
||||
}
|
||||
|
||||
/// Commit buffered changes to database. Must be called under `flush_lock`
|
||||
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<(), String> {
|
||||
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> Result<()> {
|
||||
match *self.db.read() {
|
||||
Some(DBAndColumns { ref db, ref cfs }) => {
|
||||
let batch = WriteBatch::new();
|
||||
@ -425,18 +425,18 @@ impl Database {
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
None => Err("Database is closed".to_owned())
|
||||
None => Err("Database is closed".into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Commit buffered changes to database.
|
||||
pub fn flush(&self) -> Result<(), String> {
|
||||
pub fn flush(&self) -> Result<()> {
|
||||
let mut lock = self.flushing_lock.lock();
|
||||
// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
|
||||
// The value inside the lock is used to detect that.
|
||||
if *lock {
|
||||
// This can only happen if another flushing thread is terminated unexpectedly.
|
||||
return Err("Database write failure. Running low on memory perhaps?".to_owned());
|
||||
return Err("Database write failure. Running low on memory perhaps?".into());
|
||||
}
|
||||
*lock = true;
|
||||
let result = self.write_flushing_with_lock(&mut lock);
|
||||
@ -445,7 +445,7 @@ impl Database {
|
||||
}
|
||||
|
||||
/// Commit transaction to database.
|
||||
pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
|
||||
pub fn write(&self, tr: DBTransaction) -> Result<()> {
|
||||
match *self.db.read() {
|
||||
Some(DBAndColumns { ref db, ref cfs }) => {
|
||||
let batch = WriteBatch::new();
|
||||
@ -464,14 +464,14 @@ impl Database {
|
||||
},
|
||||
}
|
||||
}
|
||||
db.write_opt(batch, &self.write_opts)
|
||||
db.write_opt(batch, &self.write_opts).map_err(Into::into)
|
||||
},
|
||||
None => Err("Database is closed".to_owned())
|
||||
None => Err("Database is closed".into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Get value by key.
|
||||
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
|
||||
pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
|
||||
match *self.db.read() {
|
||||
Some(DBAndColumns { ref db, ref cfs }) => {
|
||||
let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
|
||||
@ -487,6 +487,7 @@ impl Database {
|
||||
col.map_or_else(
|
||||
|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
|
||||
|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
|
||||
.map_err(Into::into)
|
||||
},
|
||||
}
|
||||
},
|
||||
@ -552,7 +553,7 @@ impl Database {
|
||||
}
|
||||
|
||||
/// Restore the database from a copy at given path.
|
||||
pub fn restore(&self, new_db: &str) -> Result<(), Error> {
|
||||
pub fn restore(&self, new_db: &str) -> Result<()> {
|
||||
self.close();
|
||||
|
||||
let mut backup_db = PathBuf::from(&self.path);
|
||||
@ -601,7 +602,7 @@ impl Database {
|
||||
}
|
||||
|
||||
/// Drop a column family.
|
||||
pub fn drop_column(&self) -> Result<(), String> {
|
||||
pub fn drop_column(&self) -> Result<()> {
|
||||
match *self.db.write() {
|
||||
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
|
||||
if let Some(col) = cfs.pop() {
|
||||
@ -616,7 +617,7 @@ impl Database {
|
||||
}
|
||||
|
||||
/// Add a column family.
|
||||
pub fn add_column(&self) -> Result<(), String> {
|
||||
pub fn add_column(&self) -> Result<()> {
|
||||
match *self.db.write() {
|
||||
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
|
||||
let col = cfs.len() as u32;
|
||||
@ -632,7 +633,7 @@ impl Database {
|
||||
// duplicate declaration of methods here to avoid trait import in certain existing cases
|
||||
// at time of addition.
|
||||
impl KeyValueDB for Database {
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>> {
|
||||
Database::get(self, col, key)
|
||||
}
|
||||
|
||||
@ -644,11 +645,11 @@ impl KeyValueDB for Database {
|
||||
Database::write_buffered(self, transaction)
|
||||
}
|
||||
|
||||
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
|
||||
fn write(&self, transaction: DBTransaction) -> Result<()> {
|
||||
Database::write(self, transaction)
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<(), String> {
|
||||
fn flush(&self) -> Result<()> {
|
||||
Database::flush(self)
|
||||
}
|
||||
|
||||
@ -664,7 +665,7 @@ impl KeyValueDB for Database {
|
||||
Box::new(unboxed.into_iter().flat_map(|inner| inner))
|
||||
}
|
||||
|
||||
fn restore(&self, new_db: &str) -> Result<(), Error> {
|
||||
fn restore(&self, new_db: &str) -> Result<()> {
|
||||
Database::restore(self, new_db)
|
||||
}
|
||||
}
|
||||
|
@ -5,5 +5,5 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
elastic-array = "0.9"
|
||||
error-chain = "0.11.0-rc.2"
|
||||
error-chain = "0.11.0"
|
||||
ethcore-bytes = { path = "../bytes" }
|
||||
|
@ -33,7 +33,7 @@ pub type DBValue = ElasticArray128<u8>;
|
||||
|
||||
error_chain! {
|
||||
types {
|
||||
Error, ErrorKind, ResultExt;
|
||||
Error, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
@ -148,7 +148,7 @@ pub trait KeyValueDB: Sync + Send {
|
||||
fn transaction(&self) -> DBTransaction { DBTransaction::new() }
|
||||
|
||||
/// Get a value by key.
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String>;
|
||||
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>>;
|
||||
|
||||
/// Get a value by partial key. Only works for flushed data.
|
||||
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>>;
|
||||
@ -157,13 +157,13 @@ pub trait KeyValueDB: Sync + Send {
|
||||
fn write_buffered(&self, transaction: DBTransaction);
|
||||
|
||||
/// Write a transaction of changes to the backing store.
|
||||
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
|
||||
fn write(&self, transaction: DBTransaction) -> Result<()> {
|
||||
self.write_buffered(transaction);
|
||||
self.flush()
|
||||
}
|
||||
|
||||
/// Flush all buffered data.
|
||||
fn flush(&self) -> Result<(), String>;
|
||||
fn flush(&self) -> Result<()>;
|
||||
|
||||
/// Iterate over flushed data for a given column.
|
||||
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
|
||||
@ -173,5 +173,5 @@ pub trait KeyValueDB: Sync + Send {
|
||||
-> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
|
||||
|
||||
/// Attempt to replace this database with a new one located at the given path.
|
||||
fn restore(&self, new_db: &str) -> Result<(), Error>;
|
||||
fn restore(&self, new_db: &str) -> Result<()>;
|
||||
}
|
||||
|
@ -9,3 +9,4 @@ macros = { path = "../macros" }
|
||||
kvdb = { path = "../kvdb" }
|
||||
kvdb-rocksdb = { path = "../kvdb-rocksdb" }
|
||||
ethcore-devtools = { path = "../../devtools" }
|
||||
error-chain = "0.11.0"
|
||||
|
@ -22,6 +22,8 @@ mod tests;
|
||||
extern crate log;
|
||||
#[macro_use]
|
||||
extern crate macros;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
|
||||
extern crate ethcore_devtools as devtools;
|
||||
extern crate kvdb;
|
||||
@ -30,11 +32,37 @@ extern crate kvdb_rocksdb;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::{fs, fmt};
|
||||
use std::{fs, io};
|
||||
|
||||
use kvdb::DBTransaction;
|
||||
use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
|
||||
|
||||
error_chain! {
|
||||
types {
|
||||
Error, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
links {
|
||||
Db(kvdb::Error, kvdb::ErrorKind);
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
Io(io::Error);
|
||||
}
|
||||
|
||||
errors {
|
||||
CannotAddMigration {
|
||||
description("Cannot add migration"),
|
||||
display("Cannot add migration"),
|
||||
}
|
||||
|
||||
MigrationImpossible {
|
||||
description("Migration impossible"),
|
||||
display("Migration impossible"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Migration config.
|
||||
#[derive(Clone)]
|
||||
pub struct Config {
|
||||
@ -71,7 +99,7 @@ impl Batch {
|
||||
}
|
||||
|
||||
/// Insert a value into the batch, committing if necessary.
|
||||
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> Result<(), Error> {
|
||||
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> Result<()> {
|
||||
self.inner.insert(key, value);
|
||||
if self.inner.len() == self.batch_size {
|
||||
self.commit(dest)?;
|
||||
@ -80,7 +108,7 @@ impl Batch {
|
||||
}
|
||||
|
||||
/// Commit all the items in the batch to the given database.
|
||||
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
|
||||
pub fn commit(&mut self, dest: &mut Database) -> Result<()> {
|
||||
if self.inner.is_empty() { return Ok(()) }
|
||||
|
||||
let mut transaction = DBTransaction::new();
|
||||
@ -90,43 +118,7 @@ impl Batch {
|
||||
}
|
||||
|
||||
self.inner.clear();
|
||||
dest.write(transaction).map_err(Error::Custom)
|
||||
}
|
||||
}
|
||||
|
||||
/// Migration error.
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Error returned when it is impossible to add new migration rules.
|
||||
CannotAddMigration,
|
||||
/// Error returned when migration from specific version can not be performed.
|
||||
MigrationImpossible,
|
||||
/// Io Error.
|
||||
Io(::std::io::Error),
|
||||
/// Custom error.
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
match *self {
|
||||
Error::CannotAddMigration => write!(f, "Cannot add migration"),
|
||||
Error::MigrationImpossible => write!(f, "Migration impossible"),
|
||||
Error::Io(ref err) => write!(f, "{}", err),
|
||||
Error::Custom(ref err) => write!(f, "{}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<::std::io::Error> for Error {
|
||||
fn from(e: ::std::io::Error) -> Self {
|
||||
Error::Io(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(e: String) -> Self {
|
||||
Error::Custom(e)
|
||||
dest.write(transaction).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,7 +134,7 @@ pub trait Migration: 'static {
|
||||
/// Version of the database after the migration.
|
||||
fn version(&self) -> u32;
|
||||
/// Migrate a source to a destination.
|
||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<(), Error>;
|
||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<()>;
|
||||
}
|
||||
|
||||
/// A simple migration over key-value pairs.
|
||||
@ -163,7 +155,7 @@ impl<T: SimpleMigration> Migration for T {
|
||||
|
||||
fn alters_existing(&self) -> bool { true }
|
||||
|
||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
|
||||
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<()> {
|
||||
let mut batch = Batch::new(config, col);
|
||||
|
||||
let iter = match source.iter(col) {
|
||||
@ -196,7 +188,7 @@ impl Migration for ChangeColumns {
|
||||
fn columns(&self) -> Option<u32> { self.post_columns }
|
||||
fn version(&self) -> u32 { self.version }
|
||||
fn alters_existing(&self) -> bool { false }
|
||||
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> Result<(), Error> {
|
||||
fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: Option<u32>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -250,7 +242,7 @@ impl Manager {
|
||||
}
|
||||
|
||||
/// Adds new migration rules.
|
||||
pub fn add_migration<T>(&mut self, migration: T) -> Result<(), Error> where T: Migration {
|
||||
pub fn add_migration<T>(&mut self, migration: T) -> Result<()> where T: Migration {
|
||||
let is_new = match self.migrations.last() {
|
||||
Some(last) => migration.version() > last.version(),
|
||||
None => true,
|
||||
@ -258,17 +250,19 @@ impl Manager {
|
||||
|
||||
match is_new {
|
||||
true => Ok(self.migrations.push(Box::new(migration))),
|
||||
false => Err(Error::CannotAddMigration),
|
||||
false => Err(ErrorKind::CannotAddMigration.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Performs migration in order, starting with a source path, migrating between two temporary databases,
|
||||
/// and producing a path where the final migration lives.
|
||||
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf, Error> {
|
||||
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf> {
|
||||
let config = self.config.clone();
|
||||
let migrations = self.migrations_from(version);
|
||||
trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
|
||||
if migrations.is_empty() { return Err(Error::MigrationImpossible) };
|
||||
if migrations.is_empty() {
|
||||
return Err(ErrorKind::MigrationImpossible.into())
|
||||
};
|
||||
|
||||
let columns = migrations.get(0).and_then(|m| m.pre_columns());
|
||||
|
||||
@ -286,8 +280,8 @@ impl Manager {
|
||||
let mut temp_path = old_path.to_path_buf();
|
||||
|
||||
// start with the old db.
|
||||
let old_path_str = old_path.to_str().ok_or(Error::MigrationImpossible)?;
|
||||
let mut cur_db = Arc::new(Database::open(&db_config, old_path_str).map_err(Error::Custom)?);
|
||||
let old_path_str = old_path.to_str().ok_or(ErrorKind::MigrationImpossible)?;
|
||||
let mut cur_db = Arc::new(Database::open(&db_config, old_path_str)?);
|
||||
|
||||
for migration in migrations {
|
||||
trace!(target: "migration", "starting migration to version {}", migration.version());
|
||||
@ -300,8 +294,8 @@ impl Manager {
|
||||
temp_path = temp_idx.path(&db_root);
|
||||
|
||||
// open the target temporary database.
|
||||
let temp_path_str = temp_path.to_str().ok_or(Error::MigrationImpossible)?;
|
||||
let mut new_db = Database::open(&db_config, temp_path_str).map_err(Error::Custom)?;
|
||||
let temp_path_str = temp_path.to_str().ok_or(ErrorKind::MigrationImpossible)?;
|
||||
let mut new_db = Database::open(&db_config, temp_path_str)?;
|
||||
|
||||
match current_columns {
|
||||
// migrate only default column
|
||||
@ -324,11 +318,11 @@ impl Manager {
|
||||
// we can do this in-place.
|
||||
let goal_columns = migration.columns().unwrap_or(0);
|
||||
while cur_db.num_columns() < goal_columns {
|
||||
cur_db.add_column().map_err(Error::Custom)?;
|
||||
cur_db.add_column().map_err(kvdb::Error::from)?;
|
||||
}
|
||||
|
||||
while cur_db.num_columns() > goal_columns {
|
||||
cur_db.drop_column().map_err(Error::Custom)?;
|
||||
cur_db.drop_column().map_err(kvdb::Error::from)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ extern crate ethcore_logger;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use std::fmt;
|
||||
use std::{fmt, error};
|
||||
use bigint::hash::H256;
|
||||
use keccak::KECCAK_NULL_RLP;
|
||||
use hashdb::{HashDB, DBValue};
|
||||
@ -86,6 +86,15 @@ impl fmt::Display for TrieError {
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for TrieError {
|
||||
fn description(&self) -> &str {
|
||||
match *self {
|
||||
TrieError::InvalidStateRoot(_) => "Invalid state root",
|
||||
TrieError::IncompleteDatabase(_) => "Incomplete database",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trie result type. Boxed to avoid copying around extra space for `H256`s on successful queries.
|
||||
pub type Result<T> = ::std::result::Result<T, Box<TrieError>>;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user