Save pending local transactions in the database (#4566)

* Create new column family for local node info

* remove DBTransaction::new reliance on DB

* KeyValueDB trait

* InMemory KeyValueDB implementation

* journaldb generic over KVDB

* make most of `ethcore` generic over KVDB

* fix json tests compilation

* get all tests compiling

* implement local store (just for transactions)

* finish local store API, test

* put everything into place

* better test for skipping bad transactions

* fix warning

* update local store every 15 minutes

* remove superfluous `{}`s

Robert Habermeier, 2017-02-20 17:21:55 +01:00 (committed by Gav Wood)
parent 00351374e4, commit 62b340f2b9
34 changed files with 801 additions and 236 deletions
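The first four bullets carry the core refactor: a `KeyValueDB` trait abstracts over the RocksDB-backed `Database`, an in-memory implementation backs tests, and `DBTransaction::new` no longer needs a database handle. A minimal sketch of that shape — illustrative only, not the actual `util::kvdb` code:

use std::collections::HashMap;
use std::sync::RwLock;

type Col = Option<u32>;

/// One batched write: puts and deletes staged per column.
pub struct DBTransaction {
	ops: Vec<(Col, Vec<u8>, Option<Vec<u8>>)>, // a `None` value means delete
}

impl DBTransaction {
	/// After this commit, building a transaction needs no `&DB` argument.
	pub fn new() -> Self { DBTransaction { ops: Vec::new() } }

	pub fn put(&mut self, col: Col, key: &[u8], value: &[u8]) {
		self.ops.push((col, key.to_vec(), Some(value.to_vec())));
	}

	pub fn delete(&mut self, col: Col, key: &[u8]) {
		self.ops.push((col, key.to_vec(), None));
	}
}

/// The abstraction: callers depend on this trait, not on RocksDB.
pub trait KeyValueDB: Send + Sync {
	fn get(&self, col: Col, key: &[u8]) -> Option<Vec<u8>>;
	fn transaction(&self) -> DBTransaction { DBTransaction::new() }
	fn write(&self, batch: DBTransaction);
}

/// In-memory backend: lets tests drop `RandomTempPath` entirely.
pub struct InMemory {
	columns: RwLock<HashMap<Col, HashMap<Vec<u8>, Vec<u8>>>>,
}

pub fn in_memory(num_cols: u32) -> InMemory {
	let mut columns = HashMap::new();
	columns.insert(None, HashMap::new());
	for i in 0..num_cols {
		columns.insert(Some(i), HashMap::new());
	}
	InMemory { columns: RwLock::new(columns) }
}

impl KeyValueDB for InMemory {
	fn get(&self, col: Col, key: &[u8]) -> Option<Vec<u8>> {
		self.columns.read().unwrap().get(&col).and_then(|c| c.get(key).cloned())
	}

	fn write(&self, batch: DBTransaction) {
		let mut columns = self.columns.write().unwrap();
		for (col, key, op) in batch.ops {
			if let Some(column) = columns.get_mut(&col) {
				match op {
					Some(value) => { column.insert(key, value); }
					None => { column.remove(&key); }
				}
			}
		}
	}
}

The real trait returns `Result`s and `DBValue`s rather than plain `Option<Vec<u8>>`, but the dependency inversion is the same: `journaldb`, `BlockChain`, `TraceDB`, and `Client` in the diffs below all swap `Arc<Database>` for `Arc<KeyValueDB>`.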

Cargo.lock

@ -35,6 +35,7 @@ dependencies = [
"number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-hash-fetch 1.6.0",
"parity-ipfs-api 1.6.0",
"parity-local-store 0.1.0",
"parity-reactor 0.1.0",
"parity-rpc-client 1.4.0",
"parity-updater 1.6.0",
@ -1641,6 +1642,21 @@ dependencies = [
"rlp 0.1.0",
]
[[package]]
name = "parity-local-store"
version = "0.1.0"
dependencies = [
"ethcore 1.6.0",
"ethcore-io 1.6.0",
"ethcore-util 1.6.0",
"ethkey 0.2.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.1.0",
"serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parity-reactor"
version = "0.1.0"


@ -47,6 +47,7 @@ parity-hash-fetch = { path = "hash-fetch" }
parity-ipfs-api = { path = "ipfs" }
parity-updater = { path = "updater" }
parity-reactor = { path = "util/reactor" }
parity-local-store = { path = "local-store" }
ethcore-dapps = { path = "dapps", optional = true }
clippy = { version = "0.0.103", optional = true}
ethcore-secretstore = { path = "secret_store", optional = true }


@ -191,7 +191,7 @@ pub struct BlockChain {
blocks_blooms: RwLock<HashMap<LogGroupPosition, BloomGroup>>,
block_receipts: RwLock<HashMap<H256, BlockReceipts>>,
db: Arc<Database>,
db: Arc<KeyValueDB>,
cache_man: Mutex<CacheManager<CacheId>>,
@ -421,7 +421,7 @@ impl<'a> Iterator for AncestryIter<'a> {
impl BlockChain {
/// Create new instance of blockchain from given Genesis.
pub fn new(config: Config, genesis: &[u8], db: Arc<Database>) -> BlockChain {
pub fn new(config: Config, genesis: &[u8], db: Arc<KeyValueDB>) -> BlockChain {
// 400 is the average size of the key
let cache_man = CacheManager::new(config.pref_cache_size, config.max_cache_size, 400);
@ -467,7 +467,7 @@ impl BlockChain {
children: vec![]
};
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
batch.put(db::COL_HEADERS, &hash, block.header_rlp().as_raw());
batch.put(db::COL_BODIES, &hash, &Self::block_to_body(genesis));
@ -1314,7 +1314,7 @@ impl BlockChain {
}
#[cfg(test)]
pub fn db(&self) -> &Arc<Database> {
pub fn db(&self) -> &Arc<KeyValueDB> {
&self.db
}
}
@ -1324,13 +1324,12 @@ mod tests {
#![cfg_attr(feature="dev", allow(similar_names))]
use std::sync::Arc;
use rustc_serialize::hex::FromHex;
use util::{Database, DatabaseConfig};
use util::kvdb::KeyValueDB;
use util::hash::*;
use util::sha3::Hashable;
use receipt::Receipt;
use blockchain::{BlockProvider, BlockChain, Config, ImportRoute};
use tests::helpers::*;
use devtools::*;
use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer};
use blockchain::extras::TransactionAddress;
use views::BlockView;
@ -1339,11 +1338,11 @@ mod tests {
use ethkey::Secret;
use header::BlockNumber;
fn new_db(path: &str) -> Arc<Database> {
Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap())
fn new_db() -> Arc<KeyValueDB> {
Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)))
}
fn new_chain(genesis: &[u8], db: Arc<Database>) -> BlockChain {
fn new_chain(genesis: &[u8], db: Arc<KeyValueDB>) -> BlockChain {
BlockChain::new(Config::default(), genesis, db)
}
@ -1355,13 +1354,12 @@ mod tests {
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let first = canon_chain.generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_number(), 0);
// when
let mut batch =db.transaction();
let mut batch = db.transaction();
bc.insert_block(&mut batch, &first, vec![]);
assert_eq!(bc.best_block_number(), 0);
bc.commit();
@ -1381,8 +1379,7 @@ mod tests {
let genesis_hash = BlockView::new(&genesis).header_view().sha3();
let first_hash = BlockView::new(&first).header_view().sha3();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.genesis_hash(), genesis_hash.clone());
@ -1391,7 +1388,7 @@ mod tests {
assert_eq!(bc.block_hash(1), None);
assert_eq!(bc.block_details(&genesis_hash).unwrap().children, vec![]);
let mut batch =db.transaction();
let mut batch = db.transaction();
bc.insert_block(&mut batch, &first, vec![]);
db.write(batch).unwrap();
bc.commit();
@ -1412,8 +1409,7 @@ mod tests {
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let genesis_hash = BlockView::new(&genesis).header_view().sha3();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut block_hashes = vec![genesis_hash.clone()];
@ -1448,8 +1444,7 @@ mod tests {
let b5b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b5a = canon_chain.generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
@ -1514,8 +1509,7 @@ mod tests {
let t1_hash = t1.hash();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
@ -1602,8 +1596,7 @@ mod tests {
let t2_hash = t2.hash();
let t3_hash = t3.hash();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
@ -1664,8 +1657,7 @@ mod tests {
// b3a is a part of canon chain, whereas b3b is part of sidechain
let best_block_hash = b3a_hash.clone();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
@ -1778,10 +1770,9 @@ mod tests {
let first = canon_chain.generate(&mut finalizer).unwrap();
let genesis_hash = BlockView::new(&genesis).header_view().sha3();
let first_hash = BlockView::new(&first).header_view().sha3();
let db = new_db();
let temp = RandomTempPath::new();
{
let db = new_db(temp.as_str());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_hash(), genesis_hash);
let mut batch =db.transaction();
@ -1792,7 +1783,6 @@ mod tests {
}
{
let db = new_db(temp.as_str());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_hash(), first_hash);
@ -1846,8 +1836,7 @@ mod tests {
let b1 = "f904a8f901faa0ce1f26f798dd03c8782d63b3e42e79a64eaea5694ea686ac5d7ce3df5171d1aea01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0a65c2364cd0f1542d761823dc0109c6b072f14c20459598c5455c274601438f4a070616ebd7ad2ed6fb7860cf7e9df00163842351c38a87cac2c1cb193895035a2a05c5b4fc43c2d45787f54e1ae7d27afdb4ad16dfc567c5692070d5c4556e0b1d7b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000830200000183023ec683021536845685109780a029f07836e4e59229b3a065913afc27702642c683bba689910b2b2fd45db310d3888957e6d004a31802f902a7f85f800a8255f094aaaf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca0575da4e21b66fa764be5f74da9389e67693d066fb0d1312e19e17e501da00ecda06baf5a5327595f6619dfc2fcb3f2e6fb410b5810af3cb52d0e7508038e91a188f85f010a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba04fa966bf34b93abc1bcd665554b7f316b50f928477b50be0f3285ead29d18c5ba017bba0eeec1625ab433746955e125d46d80b7fdc97386c51266f842d8e02192ef85f020a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca004377418ae981cc32b1312b4a427a1d69a821b28db8584f5f2bd8c6d42458adaa053a1dba1af177fac92f3b6af0a9fa46a22adf56e686c93794b6a012bf254abf5f85f030a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca04fe13febd28a05f4fcb2f451d7ddc2dda56486d9f8c79a62b0ba4da775122615a0651b2382dd402df9ebc27f8cb4b2e0f3cea68dda2dca0ee9603608f0b6f51668f85f040a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba078e6a0ba086a08f8450e208a399bb2f2d2a0d984acd2517c7c7df66ccfab567da013254002cd45a97fac049ae00afbc43ed0d9961d0c56a3b2382c80ce41c198ddf85f050a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba0a7174d8f43ea71c8e3ca9477691add8d80ac8e0ed89d8d8b572041eef81f4a54a0534ea2e28ec4da3b5b944b18c51ec84a5cf35f5b3343c5fb86521fd2d388f506f85f060a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba034bd04065833536a10c77ee2a43a5371bc6d34837088b861dd9d4b7f44074b59a078807715786a13876d3455716a6b9cb2186b7a4887a5c31160fc877454958616c0".from_hex().unwrap();
let b1_hash: H256 = "f53f268d23a71e85c7d6d83a9504298712b84c1a2ba220441c86eeda0bf0b6e3".into();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
bc.insert_block(&mut batch, &b1, vec![]);
@ -1861,7 +1850,7 @@ mod tests {
}
}
fn insert_block(db: &Arc<Database>, bc: &BlockChain, bytes: &[u8], receipts: Vec<Receipt>) -> ImportRoute {
fn insert_block(db: &Arc<KeyValueDB>, bc: &BlockChain, bytes: &[u8], receipts: Vec<Receipt>) -> ImportRoute {
let mut batch = db.transaction();
let res = bc.insert_block(&mut batch, bytes, receipts);
db.write(batch).unwrap();
@ -1906,8 +1895,7 @@ mod tests {
let b1 = canon_chain.with_transaction(t1).with_transaction(t2).generate(&mut finalizer).unwrap();
let b2 = canon_chain.with_transaction(t3).generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
insert_block(&db, &bc, &b1, vec![Receipt {
state_root: Some(H256::default()),
@ -2015,8 +2003,7 @@ mod tests {
let b1a = canon_chain.with_bloom(bloom_ba.clone()).generate(&mut finalizer).unwrap();
let b2a = canon_chain.with_bloom(bloom_ba.clone()).generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let blocks_b1 = bc.blocks_with_bloom(&bloom_b1, 0, 5);
@ -2070,14 +2057,12 @@ mod tests {
let mut finalizer = BlockFinalizer::default();
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db();
{
let db = new_db(temp.as_str());
let bc = new_chain(&genesis, db.clone());
let uncle = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let mut batch =db.transaction();
let mut batch = db.transaction();
// create a longer fork
for _ in 0..5 {
let canon_block = canon_chain.generate(&mut finalizer).unwrap();
@ -2092,8 +2077,7 @@ mod tests {
}
// re-loading the blockchain should load the correct best block.
let db = new_db(temp.as_str());
let bc = new_chain(&genesis, db.clone());
let bc = new_chain(&genesis, db);
assert_eq!(bc.best_block_number(), 5);
}
@ -2108,8 +2092,7 @@ mod tests {
let first_hash = BlockView::new(&first).header_view().sha3();
let second_hash = BlockView::new(&second).header_view().sha3();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
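
Every test hunk in this file follows the same recipe: drop the `RandomTempPath`, build the chain over the in-memory store, and keep the batch/write/commit cycle unchanged. Condensed, using the names from the diff above:

let db = new_db();                            // in-memory KeyValueDB, no temp dir
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();             // the batch no longer borrows the DB
bc.insert_block(&mut batch, &first, vec![]);
db.write(batch).unwrap();                     // flush the batch to the store
bc.commit();                                  // apply the cached best-block update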


@ -17,7 +17,6 @@
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::path::{Path};
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant};
@ -135,7 +134,7 @@ pub struct Client {
engine: Arc<Engine>,
config: ClientConfig,
pruning: journaldb::Algorithm,
db: RwLock<Arc<Database>>,
db: RwLock<Arc<KeyValueDB>>,
state_db: Mutex<StateDB>,
block_queue: BlockQueue,
report: RwLock<ClientReport>,
@ -157,18 +156,16 @@ pub struct Client {
}
impl Client {
/// Create a new client with given spec and DB path and custom verifier.
/// Create a new client with given parameters.
/// The database is assumed to have been initialized with the correct columns.
pub fn new(
config: ClientConfig,
spec: &Spec,
path: &Path,
db: Arc<KeyValueDB>,
miner: Arc<Miner>,
message_channel: IoChannel<ClientIoMessage>,
db_config: &DatabaseConfig,
) -> Result<Arc<Client>, ClientError> {
let path = path.to_path_buf();
let db = Arc::new(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database)?);
let trie_spec = match config.fat_db {
true => TrieSpec::Fat,
false => TrieSpec::Secure,
@ -186,7 +183,7 @@ impl Client {
if state_db.journal_db().is_empty() {
// Sets the correct state root.
state_db = spec.ensure_db_good(state_db, &factories)?;
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?;
db.write(batch).map_err(ClientError::Database)?;
}
@ -530,7 +527,7 @@ impl Client {
// Commit results
let receipts = ::rlp::decode(&receipts_bytes);
let mut batch = DBTransaction::new(&self.db.read());
let mut batch = DBTransaction::new();
chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true);
// Final commit to the DB
self.db.read().write_buffered(batch);
@ -554,7 +551,7 @@ impl Client {
//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
let mut batch = DBTransaction::new(&self.db.read());
let mut batch = DBTransaction::new();
// CHECK! I *think* this is fine, even if the state_root is equal to another
// already-imported block of the same number.
// TODO: Prove it with a test.
@ -603,7 +600,7 @@ impl Client {
trace!(target: "client", "Pruning state for ancient era {}", era);
match chain.block_hash(era) {
Some(ancient_hash) => {
let mut batch = DBTransaction::new(&self.db.read());
let mut batch = DBTransaction::new();
state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
self.db.read().write_buffered(batch);
state_db.journal_db().flush();
@ -1691,7 +1688,7 @@ mod tests {
let go_thread = go.clone();
let another_client = client.reference().clone();
thread::spawn(move || {
let mut batch = DBTransaction::new(&*another_client.chain.read().db().clone());
let mut batch = DBTransaction::new();
another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new());
go_thread.store(true, Ordering::SeqCst);
});
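
`Client::new` now receives an already-open `Arc<KeyValueDB>` instead of a path plus `DatabaseConfig`, so opening (or faking) the database becomes the caller's job. A sketch of the new call shape, matching how the test hunks below use it:

let db = Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0)));
let client = Client::new(
	ClientConfig::default(),
	&spec,
	db,                                  // columns assumed already initialized
	Arc::new(Miner::with_spec(&spec)),
	IoChannel::disconnected(),
).unwrap();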


@ -19,7 +19,7 @@
use std::ops::Deref;
use std::hash::Hash;
use std::collections::HashMap;
use util::{DBTransaction, Database, RwLock};
use util::{DBTransaction, KeyValueDB, RwLock};
use rlp;
@ -34,10 +34,12 @@ pub const COL_BODIES: Option<u32> = Some(2);
pub const COL_EXTRA: Option<u32> = Some(3);
/// Column for Traces
pub const COL_TRACE: Option<u32> = Some(4);
/// Column for Traces
/// Column for the empty accounts bloom filter.
pub const COL_ACCOUNT_BLOOM: Option<u32> = Some(5);
/// Column for general information from the local node which can persist.
pub const COL_NODE_INFO: Option<u32> = Some(6);
/// Number of columns in DB
pub const NUM_COLUMNS: Option<u32> = Some(6);
pub const NUM_COLUMNS: Option<u32> = Some(7);
/// Modes for updating caches.
#[derive(Clone, Copy)]
@ -212,7 +214,7 @@ impl Writable for DBTransaction {
}
}
impl Readable for Database {
impl<KVDB: KeyValueDB + ?Sized> Readable for KVDB {
fn read<T, R>(&self, col: Option<u32>, key: &Key<T, Target = R>) -> Option<T> where T: rlp::Decodable, R: Deref<Target = [u8]> {
let result = self.get(col, &key.key());
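
A quick illustration of addressing the new column, with an assumed key and value and the raw `get`/`put` calls rather than the typed `Readable`/`Writable` helpers:

let db: Arc<KeyValueDB> = Arc::new(::util::kvdb::in_memory(NUM_COLUMNS.unwrap_or(0)));
let mut batch = DBTransaction::new();
batch.put(COL_NODE_INFO, b"example-key", b"example-value"); // hypothetical entry
db.write(batch).unwrap();
assert!(db.get(COL_NODE_INFO, b"example-key").unwrap().is_some());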


@ -649,8 +649,7 @@ mod tests {
use account_provider::AccountProvider;
use spec::Spec;
use engines::{Engine, EngineError, Seal};
use super::{Step, View, Height, message_info_rlp, message_full_rlp};
use super::message::VoteStep;
use super::*;
/// Accounts inserted with "0" and "1" are validators. First proposer is "0".
fn setup() -> (Spec, Arc<AccountProvider>) {


@ -19,7 +19,6 @@ use client::{BlockChainClient, Client, ClientConfig};
use block::Block;
use ethereum;
use tests::helpers::*;
use devtools::*;
use spec::Genesis;
use ethjson;
use miner::Miner;
@ -58,16 +57,14 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec<String> {
spec
};
let temp = RandomTempPath::new();
{
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let db = Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)));
let client = Client::new(
ClientConfig::default(),
&spec,
temp.as_path(),
db,
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config,
).unwrap();
for b in &blockchain.blocks_rlp() {
if Block::is_good(&b) {


@ -26,3 +26,6 @@ pub use self::v9::Extract;
mod v10;
pub use self::v10::ToV10;
mod v11;
pub use self::v11::ToV11;


@ -70,7 +70,7 @@ pub fn generate_bloom(source: Arc<Database>, dest: &mut Database) -> Result<(),
trace!(target: "migration", "Generated {} bloom updates", bloom_journal.entries.len());
let mut batch = DBTransaction::new(dest);
let mut batch = DBTransaction::new();
StateDB::commit_bloom(&mut batch, bloom_journal).map_err(|_| Error::Custom("Failed to commit bloom".to_owned()))?;
dest.write(batch)?;


@ -0,0 +1,46 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Adds a seventh column for node information.
use util::kvdb::Database;
use util::migration::{Batch, Config, Error, Migration, Progress};
use std::sync::Arc;
/// Copies over data for all existing columns.
#[derive(Default)]
pub struct ToV11(Progress);
impl Migration for ToV11 {
fn pre_columns(&self) -> Option<u32> { Some(6) }
fn columns(&self) -> Option<u32> { Some(7) }
fn version(&self) -> u32 { 11 }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
// just copy everything over.
let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col) {
self.0.tick();
batch.insert(key.to_vec(), value.to_vec(), dest)?
}
batch.commit(dest)
}
}
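
Stated as checks, the layout this migration encodes (illustrative only):

let m = ToV11::default();
assert_eq!(m.pre_columns(), Some(6)); // source database layout (v10)
assert_eq!(m.columns(), Some(7));     // destination gains COL_NODE_INFO
assert_eq!(m.version(), 11);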


@ -1109,7 +1109,7 @@ impl TransactionQueue {
r
}
/// Return all ready transactions.
/// Return all future transactions.
pub fn future_transactions(&self) -> Vec<PendingTransaction> {
self.future.by_priority
.iter()


@ -57,6 +57,7 @@ pub struct ClientService {
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
panic_handler: Arc<PanicHandler>,
database: Arc<Database>,
_stop_guard: ::devtools::StopGuard,
}
@ -88,8 +89,14 @@ impl ClientService {
db_config.compaction = config.db_compaction.compaction_profile(client_path);
db_config.wal = config.db_wal;
let db = Arc::new(Database::open(
&db_config,
&client_path.to_str().expect("DB path could not be converted to string.")
).map_err(::client::Error::Database)?);
let pruning = config.pruning;
let client = Client::new(config, &spec, client_path, miner, io_service.channel(), &db_config)?;
let client = Client::new(config, &spec, db.clone(), miner, io_service.channel())?;
let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),
@ -119,15 +126,11 @@ impl ClientService {
client: client,
snapshot: snapshot,
panic_handler: panic_handler,
database: db,
_stop_guard: stop_guard,
})
}
/// Add a node to network
pub fn add_node(&mut self, _enode: &str) {
unimplemented!();
}
/// Get general IO interface
pub fn register_io_handler(&self, handler: Arc<IoHandler<ClientIoMessage> + Send>) -> Result<(), IoError> {
self.io_service.register_handler(handler)
@ -152,6 +155,9 @@ impl ClientService {
pub fn add_notify(&self, notify: Arc<ChainNotify>) {
self.client.add_notify(notify);
}
/// Get a handle to the database.
pub fn db(&self) -> Arc<KeyValueDB> { self.database.clone() }
}
impl MayPanic for ClientService {


@ -27,7 +27,7 @@ use tests::helpers::generate_dummy_client_with_spec_and_data;
use devtools::RandomTempPath;
use io::IoChannel;
use util::kvdb::DatabaseConfig;
use util::kvdb::{Database, DatabaseConfig};
struct NoopDBRestore;
@ -54,15 +54,15 @@ fn restored_is_equivalent() {
path.push("snapshot");
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Database::open(&db_config, client_db.to_str().unwrap()).unwrap();
let spec = Spec::new_null();
let client2 = Client::new(
Default::default(),
&spec,
&client_db,
Arc::new(client_db),
Arc::new(::miner::Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config,
).unwrap();
let service_params = ServiceParams {
@ -140,4 +140,4 @@ fn guards_delete_folders() {
drop(service);
assert!(!path.exists());
}
}
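
Worth noting in this hunk: the concrete RocksDB `Database` still fits the new API, because `Arc<Database>` coerces to `Arc<KeyValueDB>` when `Database` implements the trait — which is why the test can open a real database and pass `Arc::new(client_db)` straight to `Client::new`. In miniature (`path` is an assumed stand-in):

let client_db = Database::open(&db_config, path).unwrap(); // concrete RocksDB store
let db: Arc<KeyValueDB> = Arc::new(client_db);             // unsized coercion to the trait object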


@ -18,11 +18,12 @@ use std::collections::{VecDeque, HashSet};
use lru_cache::LruCache;
use util::cache::MemoryLruCache;
use util::journaldb::JournalDB;
use util::kvdb::KeyValueDB;
use util::hash::{H256};
use util::hashdb::HashDB;
use state::Account;
use header::BlockNumber;
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
use util::{Arc, Address, DBTransaction, UtilError, Mutex, Hashable};
use bloom_journal::{Bloom, BloomJournal};
use db::COL_ACCOUNT_BLOOM;
use byteorder::{LittleEndian, ByteOrder};
@ -116,7 +117,7 @@ impl StateDB {
// TODO: make the cache size actually accurate by moving the account storage cache
// into the `AccountCache` structure as its own `LruCache<(Address, H256), H256>`.
pub fn new(db: Box<JournalDB>, cache_size: usize) -> StateDB {
let bloom = Self::load_bloom(db.backing());
let bloom = Self::load_bloom(&**db.backing());
let acc_cache_size = cache_size * ACCOUNT_CACHE_RATIO / 100;
let code_cache_size = cache_size - acc_cache_size;
let cache_items = acc_cache_size / ::std::mem::size_of::<Option<Account>>();
@ -139,7 +140,7 @@ impl StateDB {
/// Loads accounts bloom from the database
/// This bloom is used to handle requests for non-existent accounts quickly
pub fn load_bloom(db: &Database) -> Bloom {
pub fn load_bloom(db: &KeyValueDB) -> Bloom {
let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY)
.expect("Low-level database error");
@ -477,7 +478,7 @@ mod tests {
let h2b = H256::random();
let h3a = H256::random();
let h3b = H256::random();
let mut batch = DBTransaction::new(state_db.journal_db().backing());
let mut batch = DBTransaction::new();
// blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
// balance [ 5 5 4 3 2 2 ]
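
The `&**db.backing()` above is pure deref plumbing: `backing()` now returns `&Arc<KeyValueDB>` while `load_bloom` wants a bare `&KeyValueDB`, so one `*` strips the reference, the next strips the `Arc`, and `&` re-borrows the unsized trait object. Spelled out:

let backing: &Arc<KeyValueDB> = db.backing();
let bloom = StateDB::load_bloom(&**backing); // &Arc<KeyValueDB> -> Arc -> trait object -> &KeyValueDB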


@ -37,14 +37,14 @@ fn imports_from_empty() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
client.import_verified_blocks();
client.flush_queue();
@ -55,14 +55,14 @@ fn should_return_registrar() {
let dir = RandomTempPath::new();
let spec = ethereum::new_morden();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let params = client.additional_params();
let address = &params["registrar"];
@ -86,14 +86,14 @@ fn imports_good_block() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let good_block = get_good_dummy_block();
if client.import_block(good_block).is_err() {
@ -111,14 +111,14 @@ fn query_none_block() {
let dir = RandomTempPath::new();
let spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
let non_existant = client.block_header(BlockId::Number(188));
assert!(non_existant.is_none());
@ -277,10 +277,19 @@ fn change_history_size() {
let test_spec = Spec::new_null();
let mut config = ClientConfig::default();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
config.history = 2;
let address = Address::random();
{
let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap();
let client = Client::new(
ClientConfig::default(),
&test_spec,
client_db.clone(),
Arc::new(Miner::with_spec(&test_spec)),
IoChannel::disconnected()
).unwrap();
for _ in 0..20 {
let mut b = client.prepare_open_block(Address::default(), (3141562.into(), 31415620.into()), vec![]);
b.block_mut().fields_mut().state.add_balance(&address, &5.into(), CleanupMode::NoEmpty);
@ -291,7 +300,13 @@ fn change_history_size() {
}
let mut config = ClientConfig::default();
config.history = 10;
let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap();
let client = Client::new(
config,
&test_spec,
client_db,
Arc::new(Miner::with_spec(&test_spec)),
IoChannel::disconnected(),
).unwrap();
assert_eq!(client.state().balance(&address), 100.into());
}


@ -154,14 +154,14 @@ pub fn generate_dummy_client_with_spec_accounts_and_data<F>(get_test_spec: F, ac
let dir = RandomTempPath::new();
let test_spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&test_spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec_and_accounts(&test_spec, accounts)),
IoChannel::disconnected(),
&db_config
).unwrap();
let test_engine = &*test_spec.engine;
@ -260,14 +260,14 @@ pub fn get_test_client_with_blocks(blocks: Vec<Bytes>) -> GuardedTempResult<Arc<
let dir = RandomTempPath::new();
let test_spec = get_test_spec();
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap());
let client = Client::new(
ClientConfig::default(),
&test_spec,
dir.as_path(),
client_db,
Arc::new(Miner::with_spec(&test_spec)),
IoChannel::disconnected(),
&db_config
).unwrap();
for block in &blocks {


@ -20,7 +20,7 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use bloomchain::{Number, Config as BloomConfig};
use bloomchain::group::{BloomGroupDatabase, BloomGroupChain, GroupPosition, BloomGroup};
use util::{H256, H264, Database, DBTransaction, RwLock, HeapSizeOf};
use util::{H256, H264, KeyValueDB, DBTransaction, RwLock, HeapSizeOf};
use header::BlockNumber;
use trace::{LocalizedTrace, Config, Filter, Database as TraceDatabase, ImportRequest, DatabaseExtras};
use db::{self, Key, Writable, Readable, CacheUpdatePolicy};
@ -106,7 +106,7 @@ pub struct TraceDB<T> where T: DatabaseExtras {
blooms: RwLock<HashMap<TraceGroupPosition, blooms::BloomGroup>>,
cache_manager: RwLock<CacheManager<CacheId>>,
// db
tracesdb: Arc<Database>,
tracesdb: Arc<KeyValueDB>,
// config,
bloom_config: BloomConfig,
// tracing enabled
@ -126,8 +126,8 @@ impl<T> BloomGroupDatabase for TraceDB<T> where T: DatabaseExtras {
impl<T> TraceDB<T> where T: DatabaseExtras {
/// Creates new instance of `TraceDB`.
pub fn new(config: Config, tracesdb: Arc<Database>, extras: Arc<T>) -> Self {
let mut batch = DBTransaction::new(&tracesdb);
pub fn new(config: Config, tracesdb: Arc<KeyValueDB>, extras: Arc<T>) -> Self {
let mut batch = DBTransaction::new();
let genesis = extras.block_hash(0)
.expect("Genesis block is always inserted upon extras db creation qed");
batch.write(db::COL_TRACE, &genesis, &FlatBlockTraces::default());
@ -404,8 +404,7 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use util::{Address, U256, H256, Database, DatabaseConfig, DBTransaction};
use devtools::RandomTempPath;
use util::{Address, U256, H256, DBTransaction};
use header::BlockNumber;
use trace::{Config, TraceDB, Database as TraceDatabase, DatabaseExtras, ImportRequest};
use trace::{Filter, LocalizedTrace, AddressesFilter, TraceError};
@ -455,14 +454,13 @@ mod tests {
}
}
fn new_db(path: &str) -> Arc<Database> {
Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap())
fn new_db() -> Arc<::util::kvdb::KeyValueDB> {
Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0)))
}
#[test]
fn test_reopening_db_with_tracing_off() {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let mut config = Config::default();
// set autotracing
@ -476,8 +474,7 @@ mod tests {
#[test]
fn test_reopening_db_with_tracing_on() {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let mut config = Config::default();
// set tracing on
@ -555,8 +552,7 @@ mod tests {
#[test]
fn test_import_non_canon_traces() {
let temp = RandomTempPath::new();
let db = Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), temp.as_str()).unwrap());
let db = new_db();
let mut config = Config::default();
config.enabled = true;
let block_0 = H256::from(0xa1);
@ -574,7 +570,7 @@ mod tests {
// import block 0
let request = create_noncanon_import_request(0, block_0.clone());
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
tracedb.import(&mut batch, request);
db.write(batch).unwrap();
@ -584,8 +580,7 @@ mod tests {
#[test]
fn test_import() {
let temp = RandomTempPath::new();
let db = Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), temp.as_str()).unwrap());
let db = new_db();
let mut config = Config::default();
config.enabled = true;
let block_1 = H256::from(0xa1);
@ -605,7 +600,7 @@ mod tests {
// import block 1
let request = create_simple_import_request(1, block_1.clone());
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
tracedb.import(&mut batch, request);
db.write(batch).unwrap();
@ -621,7 +616,7 @@ mod tests {
// import block 2
let request = create_simple_import_request(2, block_2.clone());
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
tracedb.import(&mut batch, request);
db.write(batch).unwrap();
@ -664,8 +659,7 @@ mod tests {
#[test]
fn query_trace_after_reopen() {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let mut config = Config::default();
let mut extras = Extras::default();
let block_0 = H256::from(0xa1);
@ -684,7 +678,7 @@ mod tests {
// import block 1
let request = create_simple_import_request(1, block_0.clone());
let mut batch = DBTransaction::new(&db);
let mut batch = DBTransaction::new();
tracedb.import(&mut batch, request);
db.write(batch).unwrap();
}
@ -698,8 +692,7 @@ mod tests {
#[test]
fn query_genesis() {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let db = new_db();
let mut config = Config::default();
let mut extras = Extras::default();
let block_0 = H256::from(0xa1);

local-store/Cargo.toml (new file)

@ -0,0 +1,16 @@
[package]
name = "parity-local-store"
description = "Manages persistent local node data."
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
ethcore-util = { path = "../util" }
ethcore-io = { path = "../util/io" }
ethcore = { path = "../ethcore" }
rlp = {path = "../util/rlp" }
serde = "0.9"
serde_derive = "0.9"
serde_json = "0.9"
log = "0.3"
ethkey = { path = "../ethkey" }

local-store/src/lib.rs (new file)

@ -0,0 +1,315 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Manages local node data: pending local transactions, sync security level
use std::sync::Arc;
use std::fmt;
use ethcore::transaction::{
SignedTransaction, PendingTransaction, UnverifiedTransaction,
Condition as TransactionCondition
};
use ethcore::service::ClientIoMessage;
use io::IoHandler;
use rlp::{UntrustedRlp, View};
use util::kvdb::KeyValueDB;
extern crate ethcore;
extern crate ethcore_util as util;
extern crate ethcore_io as io;
extern crate rlp;
extern crate serde_json;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate log;
#[cfg(test)]
extern crate ethkey;
const LOCAL_TRANSACTIONS_KEY: &'static [u8] = &*b"LOCAL_TXS";
const UPDATE_TIMER: ::io::TimerToken = 0;
const UPDATE_TIMEOUT_MS: u64 = 15 * 60 * 1000; // once every 15 minutes.
/// Errors which can occur while using the local data store.
#[derive(Debug)]
pub enum Error {
/// Database errors: these manifest as `String`s.
Database(String),
/// JSON errors.
Json(::serde_json::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Database(ref val) => write!(f, "{}", val),
Error::Json(ref err) => write!(f, "{}", err),
}
}
}
#[derive(Serialize, Deserialize)]
enum Condition {
Number(::ethcore::header::BlockNumber),
Timestamp(u64),
}
impl From<TransactionCondition> for Condition {
fn from(cond: TransactionCondition) -> Self {
match cond {
TransactionCondition::Number(num) => Condition::Number(num),
TransactionCondition::Timestamp(tm) => Condition::Timestamp(tm),
}
}
}
impl Into<TransactionCondition> for Condition {
fn into(self) -> TransactionCondition {
match self {
Condition::Number(num) => TransactionCondition::Number(num),
Condition::Timestamp(tm) => TransactionCondition::Timestamp(tm),
}
}
}
#[derive(Serialize, Deserialize)]
struct TransactionEntry {
rlp_bytes: Vec<u8>,
condition: Option<Condition>,
}
impl TransactionEntry {
fn into_pending(self) -> Option<PendingTransaction> {
let tx: UnverifiedTransaction = match UntrustedRlp::new(&self.rlp_bytes).as_val() {
Err(e) => {
warn!(target: "local_store", "Invalid persistent transaction stored: {}", e);
return None
}
Ok(tx) => tx,
};
let hash = tx.hash();
match SignedTransaction::new(tx) {
Ok(tx) => Some(PendingTransaction::new(tx, self.condition.map(Into::into))),
Err(_) => {
warn!(target: "local_store", "Bad signature on persistent transaction: {}", hash);
return None
}
}
}
}
impl From<PendingTransaction> for TransactionEntry {
fn from(pending: PendingTransaction) -> Self {
TransactionEntry {
rlp_bytes: ::rlp::encode(&pending.transaction).to_vec(),
condition: pending.condition.map(Into::into),
}
}
}
/// Something which can provide information about the local node.
pub trait NodeInfo: Send + Sync {
/// Get all pending transactions of local origin.
fn pending_transactions(&self) -> Vec<PendingTransaction>;
}
/// Create a new local data store, given a database, a column to write to, and a node.
/// Attempts to read data out of the store, and move it into the node.
pub fn create<T: NodeInfo>(db: Arc<KeyValueDB>, col: Option<u32>, node: T) -> LocalDataStore<T> {
LocalDataStore {
db: db,
col: col,
node: node,
}
}
/// Manages local node data.
///
/// Specifically, this will be used to store things like unpropagated local transactions
/// and the node security level.
pub struct LocalDataStore<T: NodeInfo> {
db: Arc<KeyValueDB>,
col: Option<u32>,
node: T,
}
impl<T: NodeInfo> LocalDataStore<T> {
/// Attempt to read pending transactions out of the local store.
pub fn pending_transactions(&self) -> Result<Vec<PendingTransaction>, Error> {
if let Some(val) = self.db.get(self.col, LOCAL_TRANSACTIONS_KEY).map_err(Error::Database)? {
let local_txs: Vec<_> = ::serde_json::from_slice::<Vec<TransactionEntry>>(&val)
.map_err(Error::Json)?
.into_iter()
.filter_map(TransactionEntry::into_pending)
.collect();
Ok(local_txs)
} else {
Ok(Vec::new())
}
}
/// Update the entries in the database.
pub fn update(&self) -> Result<(), Error> {
trace!(target: "local_store", "Updating local store entries.");
let mut batch = self.db.transaction();
let local_entries: Vec<TransactionEntry> = self.node.pending_transactions()
.into_iter()
.map(Into::into)
.collect();
let local_json = ::serde_json::to_value(&local_entries).map_err(Error::Json)?;
let json_str = format!("{}", local_json);
batch.put_vec(self.col, LOCAL_TRANSACTIONS_KEY, json_str.into_bytes());
self.db.write(batch).map_err(Error::Database)
}
}
impl<T: NodeInfo> IoHandler<ClientIoMessage> for LocalDataStore<T> {
fn initialize(&self, io: &::io::IoContext<ClientIoMessage>) {
if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT_MS) {
warn!(target: "local_store", "Error registering local store update timer: {}", e);
}
}
fn timeout(&self, _io: &::io::IoContext<ClientIoMessage>, timer: ::io::TimerToken) {
if let UPDATE_TIMER = timer {
if let Err(e) = self.update() {
debug!(target: "local_store", "Error updating local store: {}", e);
}
}
}
}
impl<T: NodeInfo> Drop for LocalDataStore<T> {
fn drop(&mut self) {
debug!(target: "local_store", "Updating node data store on shutdown.");
let _ = self.update();
}
}
#[cfg(test)]
mod tests {
use super::NodeInfo;
use std::sync::Arc;
use ethcore::transaction::{Transaction, Condition, PendingTransaction};
use ethkey::{Brain, Generator};
// we want to test: round-trip of good transactions.
// failure to roundtrip bad transactions (but that it doesn't panic)
struct Dummy(Vec<PendingTransaction>);
impl NodeInfo for Dummy {
fn pending_transactions(&self) -> Vec<PendingTransaction> { self.0.clone() }
}
#[test]
fn twice_empty() {
let db = Arc::new(::util::kvdb::in_memory(0));
{
let store = super::create(db.clone(), None, Dummy(vec![]));
assert_eq!(store.pending_transactions().unwrap(), vec![])
}
{
let store = super::create(db.clone(), None, Dummy(vec![]));
assert_eq!(store.pending_transactions().unwrap(), vec![])
}
}
#[test]
fn with_condition() {
let keypair = Brain::new("abcd".into()).generate().unwrap();
let transactions: Vec<_> = (0..10u64).map(|nonce| {
let mut tx = Transaction::default();
tx.nonce = nonce.into();
let signed = tx.sign(keypair.secret(), None);
let condition = match nonce {
5 => Some(Condition::Number(100_000)),
_ => None,
};
PendingTransaction::new(signed, condition)
}).collect();
let db = Arc::new(::util::kvdb::in_memory(0));
{
// nothing written yet, will write pending.
let store = super::create(db.clone(), None, Dummy(transactions.clone()));
assert_eq!(store.pending_transactions().unwrap(), vec![])
}
{
// pending written, will write nothing.
let store = super::create(db.clone(), None, Dummy(vec![]));
assert_eq!(store.pending_transactions().unwrap(), transactions)
}
{
// pending removed, will write nothing.
let store = super::create(db.clone(), None, Dummy(vec![]));
assert_eq!(store.pending_transactions().unwrap(), vec![])
}
}
#[test]
fn skips_bad_transactions() {
let keypair = Brain::new("abcd".into()).generate().unwrap();
let mut transactions: Vec<_> = (0..10u64).map(|nonce| {
let mut tx = Transaction::default();
tx.nonce = nonce.into();
let signed = tx.sign(keypair.secret(), None);
PendingTransaction::new(signed, None)
}).collect();
transactions.push({
let mut tx = Transaction::default();
tx.nonce = 10.into();
let signed = tx.fake_sign(Default::default());
PendingTransaction::new(signed, None)
});
let db = Arc::new(::util::kvdb::in_memory(0));
{
// nothing written, will write bad.
let store = super::create(db.clone(), None, Dummy(transactions.clone()));
assert_eq!(store.pending_transactions().unwrap(), vec![])
}
{
// try to load transactions. The last transaction, which is invalid, will be skipped.
let store = super::create(db.clone(), None, Dummy(vec![]));
let loaded = store.pending_transactions().unwrap();
transactions.pop();
assert_eq!(loaded, transactions);
}
}
}
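
Wiring the store up looks roughly like the following, condensed from the `parity/run.rs` hunk further down; `db`, `node_info`, `client`, and `service` stand in for the real values:

let store = ::local_store::create(db.clone(), ::ethcore::db::COL_NODE_INFO, node_info);
for pending_tx in store.pending_transactions().unwrap_or_default() {
	// re-queue transactions saved before the last shutdown,
	// e.g. via miner.import_own_transaction(&*client, pending_tx)
	let _ = pending_tx; // placeholder for the real import
}
let store = Arc::new(store);
service.register_io_handler(store).expect("handler registered once");
// `update()` then runs every 15 minutes via the IO timer, and once more on Drop.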


@ -59,6 +59,7 @@ extern crate parity_hash_fetch as hash_fetch;
extern crate parity_ipfs_api;
extern crate parity_reactor;
extern crate parity_updater as updater;
extern crate parity_local_store as local_store;
extern crate rpc_cli;
#[macro_use]


@ -30,7 +30,7 @@ use ethcore::migrations::Extract;
/// Database is assumed to be at default version, when no version file is found.
const DEFAULT_VERSION: u32 = 5;
/// Current version of database models.
const CURRENT_VERSION: u32 = 10;
const CURRENT_VERSION: u32 = 11;
/// First version of the consolidated database.
const CONSOLIDATION_VERSION: u32 = 9;
/// Defines how many items are migrated to the new version of database at once.
@ -146,6 +146,7 @@ pub fn default_migration_settings(compaction_profile: &CompactionProfile) -> Mig
fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result<MigrationManager, Error> {
let mut manager = MigrationManager::new(default_migration_settings(compaction_profile));
manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(migrations::ToV11::default()).map_err(|_| Error::MigrationImpossible)?;
Ok(manager)
}


@ -136,6 +136,21 @@ pub fn open_dapp(dapps_conf: &dapps::Configuration, dapp: &str) -> Result<(), St
Ok(())
}
// node info fetcher for the local store.
struct FullNodeInfo {
miner: Arc<Miner>, // TODO: only TXQ needed, just use that after decoupling.
}
impl ::local_store::NodeInfo for FullNodeInfo {
fn pending_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> {
let local_txs = self.miner.local_transactions();
self.miner.pending_transactions()
.into_iter()
.chain(self.miner.future_transactions())
.filter(|tx| local_txs.contains_key(&tx.hash()))
.collect()
}
}
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<bool, String> {
if cmd.ui && cmd.dapps_conf.enabled {
@ -318,6 +333,33 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let client = service.client();
let snapshot_service = service.snapshot_service();
// initialize the local node information store.
let store = {
let db = service.db();
let node_info = FullNodeInfo {
miner: miner.clone(),
};
let store = ::local_store::create(db, ::ethcore::db::COL_NODE_INFO, node_info);
// re-queue pending transactions.
match store.pending_transactions() {
Ok(pending) => {
for pending_tx in pending {
if let Err(e) = miner.import_own_transaction(&*client, pending_tx) {
warn!("Error importing saved transaction: {}", e)
}
}
}
Err(e) => warn!("Error loading cached pending transactions from disk: {}", e),
}
Arc::new(store)
};
// register it as an IO service to update periodically.
service.register_io_handler(store).map_err(|_| "Unable to register local store handler".to_owned())?;
// create external miner
let external_miner = Arc::new(ExternalMiner::default());


@ -116,19 +116,16 @@ impl EthTester {
}
fn from_spec(spec: Spec) -> Self {
let dir = RandomTempPath::new();
let account_provider = account_provider();
let miner_service = miner_service(&spec, account_provider.clone());
let snapshot_service = snapshot_service();
let db_config = ::util::kvdb::DatabaseConfig::with_columns(::ethcore::db::NUM_COLUMNS);
let client = Client::new(
ClientConfig::default(),
&spec,
dir.as_path(),
Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))),
miner_service.clone(),
IoChannel::disconnected(),
&db_config
).unwrap();
let sync_provider = sync_provider();
let external_miner = Arc::new(ExternalMiner::default());


@ -62,7 +62,6 @@ fn authority_round() {
ap.insert_account(s1.secret().clone(), "").unwrap();
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_round, Some(ap));
let mut net = &mut *net;
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() });
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() });
// Push transaction to both clients. Only one of them gets lucky to produce a block.
@ -121,7 +120,6 @@ fn tendermint() {
ap.insert_account(s1.secret().clone(), "").unwrap();
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_tendermint, Some(ap));
let mut net = &mut *net;
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() });
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() });
// Push transaction to both clients. Only one of them issues a proposal.


@ -23,13 +23,11 @@ use ethcore::snapshot::SnapshotService;
use ethcore::spec::Spec;
use ethcore::account_provider::AccountProvider;
use ethcore::miner::Miner;
use ethcore::db::NUM_COLUMNS;
use sync_io::SyncIo;
use io::IoChannel;
use api::WARP_SYNC_PROTOCOL_ID;
use chain::ChainSync;
use ::SyncConfig;
use devtools::{self, GuardedTempResult};
pub trait FlushingBlockChainClient: BlockChainClient {
fn flush(&self) {}
@ -271,7 +269,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
}
impl TestNet<EthPeer<EthcoreClient>> {
pub fn with_spec_and_accounts<F>(n: usize, config: SyncConfig, spec_factory: F, accounts: Option<Arc<AccountProvider>>) -> GuardedTempResult<Self>
pub fn with_spec_and_accounts<F>(n: usize, config: SyncConfig, spec_factory: F, accounts: Option<Arc<AccountProvider>>) -> Self
where F: Fn() -> Spec
{
let mut net = TestNet {
@ -279,21 +277,15 @@ impl TestNet<EthPeer<EthcoreClient>> {
started: false,
disconnect_events: Vec::new(),
};
let dir = devtools::RandomTempPath::new();
for _ in 0..n {
let mut client_dir = dir.as_path().clone();
client_dir.push(devtools::random_filename());
let db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
let spec = spec_factory();
let client = EthcoreClient::new(
ClientConfig::default(),
&spec,
client_dir.as_path(),
Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))),
Arc::new(Miner::with_spec_and_accounts(&spec, accounts.clone())),
IoChannel::disconnected(),
&db_config
).unwrap();
let ss = Arc::new(TestSnapshotService::new());
@ -307,10 +299,8 @@ impl TestNet<EthPeer<EthcoreClient>> {
peer.chain.add_notify(peer.clone());
net.peers.push(peer);
}
GuardedTempResult {
_temp: dir,
result: Some(net)
}
net
}
}
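
With every peer backed by an in-memory database there is no temp directory to guard, so the constructor now returns `TestNet` directly; callers (the consensus tests above) drop the `let mut net = &mut *net;` re-borrow that `GuardedTempResult`'s `Deref` used to force. Roughly:

let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_round, Some(ap));
// previously: let mut net = &mut *net; // re-borrow through GuardedTempResult
let io_handler: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() });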


@ -22,9 +22,7 @@ use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
@ -35,14 +33,14 @@ use std::env;
/// that the states of any block the node has ever processed will be accessible.
pub struct ArchiveDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
column: Option<u32>,
}
impl ArchiveDB {
/// Create a new instance from file
pub fn new(backing: Arc<Database>, col: Option<u32>) -> ArchiveDB {
/// Create a new instance from a key-value db.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> ArchiveDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
ArchiveDB {
overlay: MemoryDB::new(),
@ -55,9 +53,7 @@ impl ArchiveDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> ArchiveDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
@ -186,7 +182,7 @@ impl JournalDB for ArchiveDB {
fn is_pruned(&self) -> bool { false }
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}


@ -22,9 +22,7 @@ use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
#[derive(Clone, PartialEq, Eq)]
struct RefInfo {
@ -112,7 +110,7 @@ enum RemoveFrom {
/// TODO: `store_reclaim_period`
pub struct EarlyMergeDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
refs: Option<Arc<RwLock<HashMap<H256, RefInfo>>>>,
latest_era: Option<u64>,
column: Option<u32>,
@ -122,8 +120,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl EarlyMergeDB {
/// Create a new instance from file
pub fn new(backing: Arc<Database>, col: Option<u32>) -> EarlyMergeDB {
let (latest_era, refs) = EarlyMergeDB::read_refs(&backing, col);
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> EarlyMergeDB {
let (latest_era, refs) = EarlyMergeDB::read_refs(&*backing, col);
let refs = Some(Arc::new(RwLock::new(refs)));
EarlyMergeDB {
overlay: MemoryDB::new(),
@ -137,9 +135,7 @@ impl EarlyMergeDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> EarlyMergeDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
@ -152,11 +148,11 @@ impl EarlyMergeDB {
// The next three are valid only as long as there is an insert operation of `key` in the journal.
fn set_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]); }
fn reset_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)); }
fn is_already_in(backing: &Database, col: Option<u32>, key: &H256) -> bool {
fn is_already_in(backing: &KeyValueDB, col: Option<u32>, key: &H256) -> bool {
backing.get(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
}
fn insert_keys(inserts: &[(H256, DBValue)], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
for &(ref h, ref d) in inserts {
if let Some(c) = refs.get_mut(h) {
// already counting. increment.
@ -189,7 +185,7 @@ impl EarlyMergeDB {
}
}
fn replay_keys(inserts: &[H256], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
for h in inserts {
if let Some(c) = refs.get_mut(h) {
@ -262,7 +258,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing, self.column);
let (latest_era, reconstructed) = Self::read_refs(&*self.backing, self.column);
let refs = self.refs.as_ref().unwrap().write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@ -278,7 +274,7 @@ impl EarlyMergeDB {
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
}
fn read_refs(db: &Database, col: Option<u32>) -> (Option<u64>, HashMap<H256, RefInfo>) {
fn read_refs(db: &KeyValueDB, col: Option<u32>) -> (Option<u64>, HashMap<H256, RefInfo>) {
let mut refs = HashMap::new();
let mut latest_era = None;
if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
@ -361,7 +357,7 @@ impl JournalDB for EarlyMergeDB {
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}
@ -432,7 +428,7 @@ impl JournalDB for EarlyMergeDB {
r.begin_list(inserts.len());
inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes);
Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace);
Self::insert_keys(&inserts, &*self.backing, self.column, &mut refs, batch, trace);
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
@ -558,7 +554,7 @@ mod tests {
use super::*;
use super::super::traits::JournalDB;
use log::init_log;
use kvdb::{Database, DatabaseConfig};
use kvdb::{DatabaseConfig};
#[test]
fn insert_same_in_fork() {
@ -817,7 +813,7 @@ mod tests {
fn new_db(path: &Path) -> EarlyMergeDB {
let config = DatabaseConfig::with_columns(Some(1));
let backing = Arc::new(Database::open(&config, path.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::Database::open(&config, path.to_str().unwrap()).unwrap());
EarlyMergeDB::new(backing, Some(0))
}


@ -17,7 +17,6 @@
//! `JournalDB` interface and implementation.
use common::*;
use kvdb::Database;
/// Export the journaldb module.
pub mod traits;
@ -115,8 +114,8 @@ impl fmt::Display for Algorithm {
}
}
/// Create a new `JournalDB` trait object.
pub fn new(backing: Arc<Database>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
/// Create a new `JournalDB` trait object over a generic key-value database.
pub fn new(backing: Arc<::kvdb::KeyValueDB>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
match algorithm {
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(backing, col)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(backing, col)),
@ -184,4 +183,4 @@ mod tests {
assert_eq!(overlayrecent, 1);
assert_eq!(refcounted, 1);
}
}
}
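
With the constructor generic over the backing store, any `JournalDB` flavor can sit on an in-memory database. A one-line sketch using the names above:

let backing = Arc::new(::kvdb::in_memory(1));
let jdb: Box<JournalDB> = ::journaldb::new(backing, ::journaldb::Algorithm::Archive, Some(0));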


@ -21,9 +21,7 @@ use rlp::*;
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
use super::JournalDB;
/// Implementation of the `JournalDB` trait for a disk-backed database with a memory overlay
@ -59,7 +57,7 @@ use super::JournalDB;
pub struct OverlayRecentDB {
transaction_overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
journal_overlay: Arc<RwLock<JournalOverlay>>,
column: Option<u32>,
}
@ -102,8 +100,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl OverlayRecentDB {
/// Create a new instance.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayRecentDB {
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&backing, col)));
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayRecentDB {
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&*backing, col)));
OverlayRecentDB {
transaction_overlay: MemoryDB::new(),
backing: backing,
@ -115,15 +113,13 @@ impl OverlayRecentDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayRecentDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing, self.column);
let reconstructed = Self::read_overlay(&*self.backing, self.column);
let journal_overlay = self.journal_overlay.read();
journal_overlay.backing_overlay == reconstructed.backing_overlay &&
journal_overlay.pending_overlay == reconstructed.pending_overlay &&
@ -136,7 +132,7 @@ impl OverlayRecentDB {
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
}
fn read_overlay(db: &Database, col: Option<u32>) -> JournalOverlay {
fn read_overlay(db: &KeyValueDB, col: Option<u32>) -> JournalOverlay {
let mut journal = HashMap::new();
let mut overlay = MemoryDB::new();
let mut count = 0;
@ -235,7 +231,7 @@ impl JournalDB for OverlayRecentDB {
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}

View File

@ -23,9 +23,7 @@ use overlaydb::OverlayDB;
use memorydb::MemoryDB;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
@ -49,7 +47,7 @@ use std::env;
// TODO: store last_era, reclaim_period.
pub struct RefCountedDB {
forward: OverlayDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
inserts: Vec<H256>,
removes: Vec<H256>,
@ -60,7 +58,7 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB {
/// Create a new instance given a `backing` database.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> RefCountedDB {
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> RefCountedDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
RefCountedDB {
@ -76,9 +74,7 @@ impl RefCountedDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> RefCountedDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
}
@ -112,7 +108,7 @@ impl JournalDB for RefCountedDB {
self.latest_era.is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}

View File

@ -18,7 +18,7 @@
use common::*;
use hashdb::*;
use kvdb::{Database, DBTransaction};
use kvdb::{self, DBTransaction};
/// A `HashDB` which can manage a short-term journal potentially containing many forks of mutually
/// exclusive actions.
@ -66,7 +66,7 @@ pub trait JournalDB: HashDB {
fn is_pruned(&self) -> bool { true }
/// Get backing database.
fn backing(&self) -> &Arc<Database>;
fn backing(&self) -> &Arc<kvdb::KeyValueDB>;
/// Clear internal structures. This should be called after changes have been written
/// to the backing storage.

View File

@ -17,10 +17,11 @@
//! Key-Value store abstraction with `RocksDB` backend.
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::PathBuf;
use common::*;
use elastic_array::*;
use std::default::Default;
use std::path::PathBuf;
use hashdb::DBValue;
use rlp::{UntrustedRlp, RlpType, View, Compressible};
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
@ -36,10 +37,12 @@ const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
#[derive(Default, Clone, PartialEq)]
pub struct DBTransaction {
ops: Vec<DBOp>,
}
#[derive(Clone, PartialEq)]
enum DBOp {
Insert {
col: Option<u32>,
@ -59,9 +62,14 @@ enum DBOp {
impl DBTransaction {
/// Create new transaction.
pub fn new(_db: &Database) -> DBTransaction {
pub fn new() -> DBTransaction {
DBTransaction::with_capacity(256)
}
/// Create new transaction with capacity.
pub fn with_capacity(cap: usize) -> DBTransaction {
DBTransaction {
ops: Vec::with_capacity(256),
ops: Vec::with_capacity(cap)
}
}
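
With the `&Database` argument gone, a transaction can be assembled before (or without) any open database; a minimal sketch of the new calling convention (key and value literals are illustrative only):

    let mut batch = DBTransaction::new(); // no database handle required any more
    batch.put(None, b"key", b"value");
    batch.delete(None, b"stale");
    db.write(batch).unwrap(); // or db.write_buffered(batch) to defer flushing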
@ -116,6 +124,138 @@ enum KeyState {
Delete,
}
/// Generic key-value database.
///
/// This makes a distinction between "buffered" and "flushed" values. Values which have been
/// written can always be read, but may be present in an in-memory buffer. Values which have
/// been flushed have been moved to backing storage, like a RocksDB instance. There are certain
/// operations which are only guaranteed to operate on flushed data and not buffered,
/// although implementations may differ in this regard.
///
/// The contents of an interior buffer may be explicitly flushed using the `flush` method.
///
/// The `KeyValueDB` also deals in "column families", which can be thought of as distinct
/// stores within a database. Keys written in one column family will not be accessible from
/// any other. The number of column families must be specified at initialization, with a
/// differing interface for each database. The `None` argument in place of a column index
/// is always supported.
///
/// The API laid out here, along with the `Sync` bound, implies interior synchronization
/// for implementations.
pub trait KeyValueDB: Sync + Send {
/// Helper to create a new transaction.
fn transaction(&self) -> DBTransaction { DBTransaction::new() }
/// Get a value by key.
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String>;
/// Get a value by partial key. Only works for flushed data.
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>>;
/// Write a transaction of changes to the buffer.
fn write_buffered(&self, transaction: DBTransaction);
/// Write a transaction of changes to the backing store.
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
self.write_buffered(transaction);
self.flush()
}
/// Flush all buffered data.
fn flush(&self) -> Result<(), String>;
/// Iterate over flushed data for a given column.
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
/// Attempt to replace this database with a new one located at the given path.
fn restore(&self, new_db: &str) -> Result<(), UtilError>;
}
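
Because the trait is object-safe and bounded by `Sync + Send`, callers can be written once against `&KeyValueDB` and run over any backend; a hypothetical helper under those assumptions:

    fn put_and_read(db: &KeyValueDB, key: &[u8], value: &[u8]) -> Option<DBValue> {
        let mut tx = db.transaction();
        tx.put(None, key, value);
        db.write(tx).expect("low-level database error");
        db.get(None, key).expect("low-level database error")
    }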
/// A key-value database fulfilling the `KeyValueDB` trait, living in memory.
/// This is generally intended for tests and is not particularly optimized.
pub struct InMemory {
columns: RwLock<HashMap<Option<u32>, BTreeMap<Vec<u8>, DBValue>>>,
}
/// Create an in-memory database with the given number of columns.
/// Columns will be indexable by 0..`num_cols`
pub fn in_memory(num_cols: u32) -> InMemory {
let mut cols = HashMap::new();
cols.insert(None, BTreeMap::new());
for idx in 0..num_cols {
cols.insert(Some(idx), BTreeMap::new());
}
InMemory {
columns: RwLock::new(cols)
}
}
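
A short sketch of the column-family semantics using the in-memory implementation (test-style code; the assertions assume the documented behaviour above):

    let db = in_memory(2); // provides columns None, Some(0) and Some(1)
    let mut tx = db.transaction();
    tx.put(Some(0), b"k", b"v");
    db.write(tx).unwrap();
    assert!(db.get(Some(0), b"k").unwrap().is_some());
    assert!(db.get(Some(1), b"k").unwrap().is_none()); // columns are disjoint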
impl KeyValueDB for InMemory {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
let columns = self.columns.read();
match columns.get(&col) {
None => Err(format!("No such column family: {:?}", col)),
Some(map) => Ok(map.get(key).cloned()),
}
}
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
let columns = self.columns.read();
match columns.get(&col) {
None => None,
Some(map) =>
map.iter()
.find(|&(ref k, _)| k.starts_with(prefix))
.map(|(_, v)| (&**v).to_vec().into_boxed_slice())
}
}
fn write_buffered(&self, transaction: DBTransaction) {
let mut columns = self.columns.write();
let ops = transaction.ops;
for op in ops {
match op {
DBOp::Insert { col, key, value } => {
if let Some(mut col) = columns.get_mut(&col) {
col.insert(key.to_vec(), value);
}
},
DBOp::InsertCompressed { col, key, value } => {
if let Some(mut col) = columns.get_mut(&col) {
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
let mut value = DBValue::new();
value.append_slice(&compressed);
col.insert(key.to_vec(), value);
}
},
DBOp::Delete { col, key } => {
if let Some(mut col) = columns.get_mut(&col) {
col.remove(&*key);
}
},
}
}
}
fn flush(&self) -> Result<(), String> { Ok(()) }
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
match self.columns.read().get(&col) {
Some(map) => Box::new( // TODO: worth optimizing at all?
map.clone()
.into_iter()
.map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice()))
),
None => Box::new(None.into_iter())
}
}
fn restore(&self, _new_db: &str) -> Result<(), UtilError> {
Err(UtilError::SimpleString("Attempted to restore in-memory database".into()))
}
}
/// Compaction profile for the database settings
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
@ -248,12 +388,16 @@ impl Default for DatabaseConfig {
}
}
/// Database iterator for flushed data only
pub struct DatabaseIterator {
/// Database iterator (for flushed data only)
// The compromise of holding only a virtual borrow vs. holding a lock on the
// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
//
pub struct DatabaseIterator<'a> {
iter: DBIterator,
_marker: PhantomData<&'a Database>,
}
impl<'a> Iterator for DatabaseIterator {
impl<'a> Iterator for DatabaseIterator<'a> {
type Item = (Box<[u8]>, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> {
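
The `PhantomData<&'a Database>` marker is what ties the iterator to the database it reads from: it stores no pointer, but makes the borrow checker treat `DatabaseIterator` as borrowing the `Database` for `'a`. A stand-alone sketch of the pattern, with illustrative types only:

    use std::marker::PhantomData;

    struct BorrowingIter<'a> {
        pos: usize,
        _marker: PhantomData<&'a [u8]>, // logical borrow; no actual reference held
    }

    // Any BorrowingIter outliving the data it was created from is now a compile error.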
@ -393,9 +537,9 @@ impl Database {
})
}
/// Creates new transaction for this database.
/// Helper to create new transaction for this database.
pub fn transaction(&self) -> DBTransaction {
DBTransaction::new(self)
DBTransaction::new()
}
@ -562,9 +706,16 @@ impl Database {
//TODO: iterate over overlay
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
col.map_or_else(|| DatabaseIterator { iter: db.iterator_opt(IteratorMode::Start, &self.read_opts) },
|c| DatabaseIterator { iter: db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
.expect("iterator params are valid; qed") })
let iter = col.map_or_else(
|| db.iterator_opt(IteratorMode::Start, &self.read_opts),
|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
.expect("iterator params are valid; qed")
);
DatabaseIterator {
iter: iter,
_marker: PhantomData,
}
},
None => panic!("Not supported yet") //TODO: return an empty iterator or change return type
}
@ -619,6 +770,39 @@ impl Database {
}
}
// Methods are declared twice (inherent and via the trait) so that call sites existing
// at the time of addition keep compiling without importing the `KeyValueDB` trait.
impl KeyValueDB for Database {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
Database::get(self, col, key)
}
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
Database::get_by_prefix(self, col, prefix)
}
fn write_buffered(&self, transaction: DBTransaction) {
Database::write_buffered(self, transaction)
}
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
Database::write(self, transaction)
}
fn flush(&self) -> Result<(), String> {
Database::flush(self)
}
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
let unboxed = Database::iter(self, col);
Box::new(unboxed)
}
fn restore(&self, new_db: &str) -> Result<(), UtilError> {
Database::restore(self, new_db)
}
}
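
The effect of the duplicated inherent methods is that pre-existing callers keep compiling without a trait import, while trait-object users reach the identical code; a hypothetical illustration:

    // Resolves to the inherent Database::get, so no `use kvdb::KeyValueDB;` is needed:
    let v = db.get(None, b"key").unwrap();

    // Generic and boxed callers go through the trait to the same implementation:
    let kv: &KeyValueDB = &db;
    let same = kv.get(None, b"key").unwrap();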
impl Drop for Database {
fn drop(&mut self) {
// write all buffered changes if we can.

View File

@ -74,7 +74,7 @@ impl Batch {
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
if self.inner.is_empty() { return Ok(()) }
let mut transaction = DBTransaction::new(dest);
let mut transaction = DBTransaction::new();
for keypair in &self.inner {
transaction.put(self.column, &keypair.0, &keypair.1);

View File

@ -23,7 +23,7 @@ use hashdb::*;
use memorydb::*;
use std::sync::*;
use std::collections::HashMap;
use kvdb::{Database, DBTransaction};
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay.
///
@ -36,22 +36,21 @@ use kvdb::{Database, DBTransaction};
#[derive(Clone)]
pub struct OverlayDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
column: Option<u32>,
}
impl OverlayDB {
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayDB {
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayDB {
OverlayDB{ overlay: MemoryDB::new(), backing: backing, column: col }
}
/// Create a new instance of OverlayDB with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayDB {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
Self::new(Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()), None)
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
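
Since `new_temp` no longer touches the filesystem, an `OverlayDB` test can run entirely in memory; a minimal sketch (the `HashDB` calls assume that trait's usual signatures):

    let backing = Arc::new(::kvdb::in_memory(0)); // default (None) column only
    let mut db = OverlayDB::new(backing, None);
    let key = db.insert(b"dog"); // HashDB::insert, keyed by the value's hash
    assert_eq!(db.get(&key).unwrap(), DBValue::from_slice(b"dog"));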
/// Commit all operations in a single batch.
@ -295,23 +294,3 @@ fn overlaydb_complex() {
trie.commit().unwrap(); //
assert_eq!(trie.get(&hfoo), None);
}
#[test]
fn playpen() {
use std::fs;
{
let db = Database::open_default("/tmp/test").unwrap();
let mut batch = db.transaction();
batch.put(None, b"test", b"test2");
db.write(batch).unwrap();
match db.get(None, b"test") {
Ok(Some(value)) => println!("Got value {:?}", &*value),
Ok(None) => println!("No value for that key"),
Err(..) => println!("Gah"),
}
let mut batch = db.transaction();
batch.delete(None, b"test");
db.write(batch).unwrap();
}
fs::remove_dir_all("/tmp/test").unwrap();
}