From 62b340f2b96279f7b6808620d4c15fd29c6b7624 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 20 Feb 2017 17:21:55 +0100 Subject: [PATCH] Save pending local transactions in the database (#4566) * Create new column family for local node info * remove DBTransaction::new reliance on DB * KeyValueDB trait * InMemory KeyValueDB implementation * journaldb generic over KVDB * make most of `ethcore` generic over KVDB * fix json tests compilation * get all tests compiling * implement local store (just for transactions) * finish local store API, test * put everything into place * better test for skipping bad transactions * fix warning * update local store every 15 minutes * remove superfluous `{}`s --- Cargo.lock | 16 ++ Cargo.toml | 1 + ethcore/src/blockchain/blockchain.rs | 69 ++---- ethcore/src/client/client.rs | 21 +- ethcore/src/db.rs | 10 +- ethcore/src/engines/tendermint/mod.rs | 3 +- ethcore/src/json_tests/chain.rs | 7 +- ethcore/src/migrations/mod.rs | 3 + ethcore/src/migrations/v10.rs | 2 +- ethcore/src/migrations/v11.rs | 46 ++++ ethcore/src/miner/transaction_queue.rs | 2 +- ethcore/src/service.rs | 18 +- ethcore/src/snapshot/tests/service.rs | 8 +- ethcore/src/state_db.rs | 9 +- ethcore/src/tests/client.rs | 35 ++- ethcore/src/tests/helpers.rs | 8 +- ethcore/src/trace/db.rs | 41 ++-- local-store/Cargo.toml | 16 ++ local-store/src/lib.rs | 315 +++++++++++++++++++++++++ parity/main.rs | 1 + parity/migration.rs | 3 +- parity/run.rs | 42 ++++ rpc/src/v1/tests/eth.rs | 5 +- sync/src/tests/consensus.rs | 2 - sync/src/tests/helpers.rs | 20 +- util/src/journaldb/archivedb.rs | 16 +- util/src/journaldb/earlymergedb.rs | 32 ++- util/src/journaldb/mod.rs | 7 +- util/src/journaldb/overlayrecentdb.rs | 20 +- util/src/journaldb/refcounteddb.rs | 14 +- util/src/journaldb/traits.rs | 4 +- util/src/kvdb.rs | 208 +++++++++++++++- util/src/migration/mod.rs | 2 +- util/src/overlaydb.rs | 31 +-- 34 files changed, 801 insertions(+), 236 deletions(-) create mode 100644 ethcore/src/migrations/v11.rs create mode 100644 local-store/Cargo.toml create mode 100644 local-store/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 36af708ea..349284e6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,7 @@ dependencies = [ "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-hash-fetch 1.6.0", "parity-ipfs-api 1.6.0", + "parity-local-store 0.1.0", "parity-reactor 0.1.0", "parity-rpc-client 1.4.0", "parity-updater 1.6.0", @@ -1641,6 +1642,21 @@ dependencies = [ "rlp 0.1.0", ] +[[package]] +name = "parity-local-store" +version = "0.1.0" +dependencies = [ + "ethcore 1.6.0", + "ethcore-io 1.6.0", + "ethcore-util 1.6.0", + "ethkey 0.2.0", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rlp 0.1.0", + "serde 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "parity-reactor" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 3b6bd7e97..28af3c1f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ parity-hash-fetch = { path = "hash-fetch" } parity-ipfs-api = { path = "ipfs" } parity-updater = { path = "updater" } parity-reactor = { path = "util/reactor" } +parity-local-store = { path = "local-store" } ethcore-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.103", optional = true} ethcore-secretstore = { path = "secret_store", optional = true } diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index c4031f954..8986dc6b8 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -191,7 +191,7 @@ pub struct BlockChain { blocks_blooms: RwLock>, block_receipts: RwLock>, - db: Arc, + db: Arc, cache_man: Mutex>, @@ -421,7 +421,7 @@ impl<'a> Iterator for AncestryIter<'a> { impl BlockChain { /// Create new instance of blockchain from given Genesis. - pub fn new(config: Config, genesis: &[u8], db: Arc) -> BlockChain { + pub fn new(config: Config, genesis: &[u8], db: Arc) -> BlockChain { // 400 is the avarage size of the key let cache_man = CacheManager::new(config.pref_cache_size, config.max_cache_size, 400); @@ -467,7 +467,7 @@ impl BlockChain { children: vec![] }; - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); batch.put(db::COL_HEADERS, &hash, block.header_rlp().as_raw()); batch.put(db::COL_BODIES, &hash, &Self::block_to_body(genesis)); @@ -1314,7 +1314,7 @@ impl BlockChain { } #[cfg(test)] - pub fn db(&self) -> &Arc { + pub fn db(&self) -> &Arc { &self.db } } @@ -1324,13 +1324,12 @@ mod tests { #![cfg_attr(feature="dev", allow(similar_names))] use std::sync::Arc; use rustc_serialize::hex::FromHex; - use util::{Database, DatabaseConfig}; + use util::kvdb::KeyValueDB; use util::hash::*; use util::sha3::Hashable; use receipt::Receipt; use blockchain::{BlockProvider, BlockChain, Config, ImportRoute}; use tests::helpers::*; - use devtools::*; use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer}; use blockchain::extras::TransactionAddress; use views::BlockView; @@ -1339,11 +1338,11 @@ mod tests { use ethkey::Secret; use header::BlockNumber; - fn new_db(path: &str) -> Arc { - Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap()) + fn new_db() -> Arc { + Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))) } - fn new_chain(genesis: &[u8], db: Arc) -> BlockChain { + fn new_chain(genesis: &[u8], db: Arc) -> BlockChain { BlockChain::new(Config::default(), genesis, db) } @@ -1355,13 +1354,12 @@ mod tests { let genesis = canon_chain.generate(&mut finalizer).unwrap(); let first = canon_chain.generate(&mut finalizer).unwrap(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); assert_eq!(bc.best_block_number(), 0); // when - let mut batch =db.transaction(); + let mut batch = db.transaction(); bc.insert_block(&mut batch, &first, vec![]); assert_eq!(bc.best_block_number(), 0); bc.commit(); @@ -1381,8 +1379,7 @@ mod tests { let genesis_hash = BlockView::new(&genesis).header_view().sha3(); let first_hash = BlockView::new(&first).header_view().sha3(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); assert_eq!(bc.genesis_hash(), genesis_hash.clone()); @@ -1391,7 +1388,7 @@ mod tests { assert_eq!(bc.block_hash(1), None); assert_eq!(bc.block_details(&genesis_hash).unwrap().children, vec![]); - let mut batch =db.transaction(); + let mut batch = db.transaction(); bc.insert_block(&mut batch, &first, vec![]); db.write(batch).unwrap(); bc.commit(); @@ -1412,8 +1409,7 @@ mod tests { let genesis = canon_chain.generate(&mut finalizer).unwrap(); let genesis_hash = BlockView::new(&genesis).header_view().sha3(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut block_hashes = vec![genesis_hash.clone()]; @@ -1448,8 +1444,7 @@ mod tests { let b5b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); let b5a = canon_chain.generate(&mut finalizer).unwrap(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch =db.transaction(); @@ -1514,8 +1509,7 @@ mod tests { let t1_hash = t1.hash(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch = db.transaction(); @@ -1602,8 +1596,7 @@ mod tests { let t2_hash = t2.hash(); let t3_hash = t3.hash(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch = db.transaction(); @@ -1664,8 +1657,7 @@ mod tests { // b3a is a part of canon chain, whereas b3b is part of sidechain let best_block_hash = b3a_hash.clone(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch = db.transaction(); @@ -1778,10 +1770,9 @@ mod tests { let first = canon_chain.generate(&mut finalizer).unwrap(); let genesis_hash = BlockView::new(&genesis).header_view().sha3(); let first_hash = BlockView::new(&first).header_view().sha3(); + let db = new_db(); - let temp = RandomTempPath::new(); { - let db = new_db(temp.as_str()); let bc = new_chain(&genesis, db.clone()); assert_eq!(bc.best_block_hash(), genesis_hash); let mut batch =db.transaction(); @@ -1792,7 +1783,6 @@ mod tests { } { - let db = new_db(temp.as_str()); let bc = new_chain(&genesis, db.clone()); assert_eq!(bc.best_block_hash(), first_hash); @@ -1846,8 +1836,7 @@ mod tests { let b1 = "f904a8f901faa0ce1f26f798dd03c8782d63b3e42e79a64eaea5694ea686ac5d7ce3df5171d1aea01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0a65c2364cd0f1542d761823dc0109c6b072f14c20459598c5455c274601438f4a070616ebd7ad2ed6fb7860cf7e9df00163842351c38a87cac2c1cb193895035a2a05c5b4fc43c2d45787f54e1ae7d27afdb4ad16dfc567c5692070d5c4556e0b1d7b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000830200000183023ec683021536845685109780a029f07836e4e59229b3a065913afc27702642c683bba689910b2b2fd45db310d3888957e6d004a31802f902a7f85f800a8255f094aaaf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca0575da4e21b66fa764be5f74da9389e67693d066fb0d1312e19e17e501da00ecda06baf5a5327595f6619dfc2fcb3f2e6fb410b5810af3cb52d0e7508038e91a188f85f010a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba04fa966bf34b93abc1bcd665554b7f316b50f928477b50be0f3285ead29d18c5ba017bba0eeec1625ab433746955e125d46d80b7fdc97386c51266f842d8e02192ef85f020a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca004377418ae981cc32b1312b4a427a1d69a821b28db8584f5f2bd8c6d42458adaa053a1dba1af177fac92f3b6af0a9fa46a22adf56e686c93794b6a012bf254abf5f85f030a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ca04fe13febd28a05f4fcb2f451d7ddc2dda56486d9f8c79a62b0ba4da775122615a0651b2382dd402df9ebc27f8cb4b2e0f3cea68dda2dca0ee9603608f0b6f51668f85f040a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba078e6a0ba086a08f8450e208a399bb2f2d2a0d984acd2517c7c7df66ccfab567da013254002cd45a97fac049ae00afbc43ed0d9961d0c56a3b2382c80ce41c198ddf85f050a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba0a7174d8f43ea71c8e3ca9477691add8d80ac8e0ed89d8d8b572041eef81f4a54a0534ea2e28ec4da3b5b944b18c51ec84a5cf35f5b3343c5fb86521fd2d388f506f85f060a82520894bbbf5374fce5edbc8e2a8697c15331677e6ebf0b0a801ba034bd04065833536a10c77ee2a43a5371bc6d34837088b861dd9d4b7f44074b59a078807715786a13876d3455716a6b9cb2186b7a4887a5c31160fc877454958616c0".from_hex().unwrap(); let b1_hash: H256 = "f53f268d23a71e85c7d6d83a9504298712b84c1a2ba220441c86eeda0bf0b6e3".into(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch =db.transaction(); bc.insert_block(&mut batch, &b1, vec![]); @@ -1861,7 +1850,7 @@ mod tests { } } - fn insert_block(db: &Arc, bc: &BlockChain, bytes: &[u8], receipts: Vec) -> ImportRoute { + fn insert_block(db: &Arc, bc: &BlockChain, bytes: &[u8], receipts: Vec) -> ImportRoute { let mut batch = db.transaction(); let res = bc.insert_block(&mut batch, bytes, receipts); db.write(batch).unwrap(); @@ -1906,8 +1895,7 @@ mod tests { let b1 = canon_chain.with_transaction(t1).with_transaction(t2).generate(&mut finalizer).unwrap(); let b2 = canon_chain.with_transaction(t3).generate(&mut finalizer).unwrap(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); insert_block(&db, &bc, &b1, vec![Receipt { state_root: Some(H256::default()), @@ -2015,8 +2003,7 @@ mod tests { let b1a = canon_chain.with_bloom(bloom_ba.clone()).generate(&mut finalizer).unwrap(); let b2a = canon_chain.with_bloom(bloom_ba.clone()).generate(&mut finalizer).unwrap(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let blocks_b1 = bc.blocks_with_bloom(&bloom_b1, 0, 5); @@ -2070,14 +2057,12 @@ mod tests { let mut finalizer = BlockFinalizer::default(); let genesis = canon_chain.generate(&mut finalizer).unwrap(); - let temp = RandomTempPath::new(); - + let db = new_db(); { - let db = new_db(temp.as_str()); let bc = new_chain(&genesis, db.clone()); let uncle = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); - let mut batch =db.transaction(); + let mut batch = db.transaction(); // create a longer fork for _ in 0..5 { let canon_block = canon_chain.generate(&mut finalizer).unwrap(); @@ -2092,8 +2077,7 @@ mod tests { } // re-loading the blockchain should load the correct best block. - let db = new_db(temp.as_str()); - let bc = new_chain(&genesis, db.clone()); + let bc = new_chain(&genesis, db); assert_eq!(bc.best_block_number(), 5); } @@ -2108,8 +2092,7 @@ mod tests { let first_hash = BlockView::new(&first).header_view().sha3(); let second_hash = BlockView::new(&second).header_view().sha3(); - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let bc = new_chain(&genesis, db.clone()); let mut batch =db.transaction(); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index d85685f3f..8545db76b 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,6 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; use std::str::FromStr; use std::sync::{Arc, Weak}; -use std::path::{Path}; use std::fmt; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::time::{Instant}; @@ -135,7 +134,7 @@ pub struct Client { engine: Arc, config: ClientConfig, pruning: journaldb::Algorithm, - db: RwLock>, + db: RwLock>, state_db: Mutex, block_queue: BlockQueue, report: RwLock, @@ -157,18 +156,16 @@ pub struct Client { } impl Client { - /// Create a new client with given spec and DB path and custom verifier. + /// Create a new client with given parameters. + /// The database is assumed to have been initialized with the correct columns. pub fn new( config: ClientConfig, spec: &Spec, - path: &Path, + db: Arc, miner: Arc, message_channel: IoChannel, - db_config: &DatabaseConfig, ) -> Result, ClientError> { - let path = path.to_path_buf(); - let db = Arc::new(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database)?); let trie_spec = match config.fat_db { true => TrieSpec::Fat, false => TrieSpec::Secure, @@ -186,7 +183,7 @@ impl Client { if state_db.journal_db().is_empty() { // Sets the correct state root. state_db = spec.ensure_db_good(state_db, &factories)?; - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?; db.write(batch).map_err(ClientError::Database)?; } @@ -530,7 +527,7 @@ impl Client { // Commit results let receipts = ::rlp::decode(&receipts_bytes); - let mut batch = DBTransaction::new(&self.db.read()); + let mut batch = DBTransaction::new(); chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true); // Final commit to the DB self.db.read().write_buffered(batch); @@ -554,7 +551,7 @@ impl Client { //let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new)); - let mut batch = DBTransaction::new(&self.db.read()); + let mut batch = DBTransaction::new(); // CHECK! I *think* this is fine, even if the state_root is equal to another // already-imported block of the same number. // TODO: Prove it with a test. @@ -603,7 +600,7 @@ impl Client { trace!(target: "client", "Pruning state for ancient era {}", era); match chain.block_hash(era) { Some(ancient_hash) => { - let mut batch = DBTransaction::new(&self.db.read()); + let mut batch = DBTransaction::new(); state_db.mark_canonical(&mut batch, era, &ancient_hash)?; self.db.read().write_buffered(batch); state_db.journal_db().flush(); @@ -1691,7 +1688,7 @@ mod tests { let go_thread = go.clone(); let another_client = client.reference().clone(); thread::spawn(move || { - let mut batch = DBTransaction::new(&*another_client.chain.read().db().clone()); + let mut batch = DBTransaction::new(); another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new()); go_thread.store(true, Ordering::SeqCst); }); diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index 8f55f8025..4e8da714d 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -19,7 +19,7 @@ use std::ops::Deref; use std::hash::Hash; use std::collections::HashMap; -use util::{DBTransaction, Database, RwLock}; +use util::{DBTransaction, KeyValueDB, RwLock}; use rlp; @@ -34,10 +34,12 @@ pub const COL_BODIES: Option = Some(2); pub const COL_EXTRA: Option = Some(3); /// Column for Traces pub const COL_TRACE: Option = Some(4); -/// Column for Traces +/// Column for the empty accounts bloom filter. pub const COL_ACCOUNT_BLOOM: Option = Some(5); +/// Column for general information from the local node which can persist. +pub const COL_NODE_INFO: Option = Some(6); /// Number of columns in DB -pub const NUM_COLUMNS: Option = Some(6); +pub const NUM_COLUMNS: Option = Some(7); /// Modes for updating caches. #[derive(Clone, Copy)] @@ -212,7 +214,7 @@ impl Writable for DBTransaction { } } -impl Readable for Database { +impl Readable for KVDB { fn read(&self, col: Option, key: &Key) -> Option where T: rlp::Decodable, R: Deref { let result = self.get(col, &key.key()); diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index bc4786526..47117f83a 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -649,8 +649,7 @@ mod tests { use account_provider::AccountProvider; use spec::Spec; use engines::{Engine, EngineError, Seal}; - use super::{Step, View, Height, message_info_rlp, message_full_rlp}; - use super::message::VoteStep; + use super::*; /// Accounts inserted with "0" and "1" are validators. First proposer is "0". fn setup() -> (Spec, Arc) { diff --git a/ethcore/src/json_tests/chain.rs b/ethcore/src/json_tests/chain.rs index f4b888060..221fa1aba 100644 --- a/ethcore/src/json_tests/chain.rs +++ b/ethcore/src/json_tests/chain.rs @@ -19,7 +19,6 @@ use client::{BlockChainClient, Client, ClientConfig}; use block::Block; use ethereum; use tests::helpers::*; -use devtools::*; use spec::Genesis; use ethjson; use miner::Miner; @@ -58,16 +57,14 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec { spec }; - let temp = RandomTempPath::new(); { - let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let db = Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))); let client = Client::new( ClientConfig::default(), &spec, - temp.as_path(), + db, Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config, ).unwrap(); for b in &blockchain.blocks_rlp() { if Block::is_good(&b) { diff --git a/ethcore/src/migrations/mod.rs b/ethcore/src/migrations/mod.rs index 7b71cc769..b9a00a15e 100644 --- a/ethcore/src/migrations/mod.rs +++ b/ethcore/src/migrations/mod.rs @@ -26,3 +26,6 @@ pub use self::v9::Extract; mod v10; pub use self::v10::ToV10; + +mod v11; +pub use self::v11::ToV11; diff --git a/ethcore/src/migrations/v10.rs b/ethcore/src/migrations/v10.rs index 1da6673bb..a15265778 100644 --- a/ethcore/src/migrations/v10.rs +++ b/ethcore/src/migrations/v10.rs @@ -70,7 +70,7 @@ pub fn generate_bloom(source: Arc, dest: &mut Database) -> Result<(), trace!(target: "migration", "Generated {} bloom updates", bloom_journal.entries.len()); - let mut batch = DBTransaction::new(dest); + let mut batch = DBTransaction::new(); StateDB::commit_bloom(&mut batch, bloom_journal).map_err(|_| Error::Custom("Failed to commit bloom".to_owned()))?; dest.write(batch)?; diff --git a/ethcore/src/migrations/v11.rs b/ethcore/src/migrations/v11.rs new file mode 100644 index 000000000..8795cf364 --- /dev/null +++ b/ethcore/src/migrations/v11.rs @@ -0,0 +1,46 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + + +//! Adds a seventh column for node information. + +use util::kvdb::Database; +use util::migration::{Batch, Config, Error, Migration, Progress}; +use std::sync::Arc; + +/// Copies over data for all existing columns. +#[derive(Default)] +pub struct ToV11(Progress); + + +impl Migration for ToV11 { + fn pre_columns(&self) -> Option { Some(6) } + fn columns(&self) -> Option { Some(7) } + + fn version(&self) -> u32 { 11 } + + fn migrate(&mut self, source: Arc, config: &Config, dest: &mut Database, col: Option) -> Result<(), Error> { + // just copy everything over. + let mut batch = Batch::new(config, col); + + for (key, value) in source.iter(col) { + self.0.tick(); + batch.insert(key.to_vec(), value.to_vec(), dest)? + } + + batch.commit(dest) + } +} diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index d937c15fc..d686a3ff5 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -1109,7 +1109,7 @@ impl TransactionQueue { r } - /// Return all ready transactions. + /// Return all future transactions. pub fn future_transactions(&self) -> Vec { self.future.by_priority .iter() diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index c45eae411..b48fb9445 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -57,6 +57,7 @@ pub struct ClientService { client: Arc, snapshot: Arc, panic_handler: Arc, + database: Arc, _stop_guard: ::devtools::StopGuard, } @@ -88,8 +89,14 @@ impl ClientService { db_config.compaction = config.db_compaction.compaction_profile(client_path); db_config.wal = config.db_wal; + let db = Arc::new(Database::open( + &db_config, + &client_path.to_str().expect("DB path could not be converted to string.") + ).map_err(::client::Error::Database)?); + + let pruning = config.pruning; - let client = Client::new(config, &spec, client_path, miner, io_service.channel(), &db_config)?; + let client = Client::new(config, &spec, db.clone(), miner, io_service.channel())?; let snapshot_params = SnapServiceParams { engine: spec.engine.clone(), @@ -119,15 +126,11 @@ impl ClientService { client: client, snapshot: snapshot, panic_handler: panic_handler, + database: db, _stop_guard: stop_guard, }) } - /// Add a node to network - pub fn add_node(&mut self, _enode: &str) { - unimplemented!(); - } - /// Get general IO interface pub fn register_io_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { self.io_service.register_handler(handler) @@ -152,6 +155,9 @@ impl ClientService { pub fn add_notify(&self, notify: Arc) { self.client.add_notify(notify); } + + /// Get a handle to the database. + pub fn db(&self) -> Arc { self.database.clone() } } impl MayPanic for ClientService { diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index 1c46985af..555ee665b 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -27,7 +27,7 @@ use tests::helpers::generate_dummy_client_with_spec_and_data; use devtools::RandomTempPath; use io::IoChannel; -use util::kvdb::DatabaseConfig; +use util::kvdb::{Database, DatabaseConfig}; struct NoopDBRestore; @@ -54,15 +54,15 @@ fn restored_is_equivalent() { path.push("snapshot"); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Database::open(&db_config, client_db.to_str().unwrap()).unwrap(); let spec = Spec::new_null(); let client2 = Client::new( Default::default(), &spec, - &client_db, + Arc::new(client_db), Arc::new(::miner::Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config, ).unwrap(); let service_params = ServiceParams { @@ -140,4 +140,4 @@ fn guards_delete_folders() { drop(service); assert!(!path.exists()); -} \ No newline at end of file +} diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs index dbf9e2f55..38fe26005 100644 --- a/ethcore/src/state_db.rs +++ b/ethcore/src/state_db.rs @@ -18,11 +18,12 @@ use std::collections::{VecDeque, HashSet}; use lru_cache::LruCache; use util::cache::MemoryLruCache; use util::journaldb::JournalDB; +use util::kvdb::KeyValueDB; use util::hash::{H256}; use util::hashdb::HashDB; use state::Account; use header::BlockNumber; -use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable}; +use util::{Arc, Address, DBTransaction, UtilError, Mutex, Hashable}; use bloom_journal::{Bloom, BloomJournal}; use db::COL_ACCOUNT_BLOOM; use byteorder::{LittleEndian, ByteOrder}; @@ -116,7 +117,7 @@ impl StateDB { // TODO: make the cache size actually accurate by moving the account storage cache // into the `AccountCache` structure as its own `LruCache<(Address, H256), H256>`. pub fn new(db: Box, cache_size: usize) -> StateDB { - let bloom = Self::load_bloom(db.backing()); + let bloom = Self::load_bloom(&**db.backing()); let acc_cache_size = cache_size * ACCOUNT_CACHE_RATIO / 100; let code_cache_size = cache_size - acc_cache_size; let cache_items = acc_cache_size / ::std::mem::size_of::>(); @@ -139,7 +140,7 @@ impl StateDB { /// Loads accounts bloom from the database /// This bloom is used to handle request for the non-existant account fast - pub fn load_bloom(db: &Database) -> Bloom { + pub fn load_bloom(db: &KeyValueDB) -> Bloom { let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY) .expect("Low-level database error"); @@ -477,7 +478,7 @@ mod tests { let h2b = H256::random(); let h3a = H256::random(); let h3b = H256::random(); - let mut batch = DBTransaction::new(state_db.journal_db().backing()); + let mut batch = DBTransaction::new(); // blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ] // balance [ 5 5 4 3 2 2 ] diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 6c2c02c2d..d37551231 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -37,14 +37,14 @@ fn imports_from_empty() { let dir = RandomTempPath::new(); let spec = get_test_spec(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); let client = Client::new( ClientConfig::default(), &spec, - dir.as_path(), + client_db, Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config ).unwrap(); client.import_verified_blocks(); client.flush_queue(); @@ -55,14 +55,14 @@ fn should_return_registrar() { let dir = RandomTempPath::new(); let spec = ethereum::new_morden(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); let client = Client::new( ClientConfig::default(), &spec, - dir.as_path(), + client_db, Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config ).unwrap(); let params = client.additional_params(); let address = ¶ms["registrar"]; @@ -86,14 +86,14 @@ fn imports_good_block() { let dir = RandomTempPath::new(); let spec = get_test_spec(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); let client = Client::new( ClientConfig::default(), &spec, - dir.as_path(), + client_db, Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config ).unwrap(); let good_block = get_good_dummy_block(); if client.import_block(good_block).is_err() { @@ -111,14 +111,14 @@ fn query_none_block() { let dir = RandomTempPath::new(); let spec = get_test_spec(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); let client = Client::new( ClientConfig::default(), &spec, - dir.as_path(), + client_db, Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected(), - &db_config ).unwrap(); let non_existant = client.block_header(BlockId::Number(188)); assert!(non_existant.is_none()); @@ -277,10 +277,19 @@ fn change_history_size() { let test_spec = Spec::new_null(); let mut config = ClientConfig::default(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); + config.history = 2; let address = Address::random(); { - let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap(); + let client = Client::new( + ClientConfig::default(), + &test_spec, + client_db.clone(), + Arc::new(Miner::with_spec(&test_spec)), + IoChannel::disconnected() + ).unwrap(); + for _ in 0..20 { let mut b = client.prepare_open_block(Address::default(), (3141562.into(), 31415620.into()), vec![]); b.block_mut().fields_mut().state.add_balance(&address, &5.into(), CleanupMode::NoEmpty); @@ -291,7 +300,13 @@ fn change_history_size() { } let mut config = ClientConfig::default(); config.history = 10; - let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap(); + let client = Client::new( + config, + &test_spec, + client_db, + Arc::new(Miner::with_spec(&test_spec)), + IoChannel::disconnected(), + ).unwrap(); assert_eq!(client.state().balance(&address), 100.into()); } diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 8f008bc39..a937f4cf9 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -154,14 +154,14 @@ pub fn generate_dummy_client_with_spec_accounts_and_data(get_test_spec: F, ac let dir = RandomTempPath::new(); let test_spec = get_test_spec(); let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client_db = Arc::new(Database::open(&db_config, dir.as_path().to_str().unwrap()).unwrap()); let client = Client::new( ClientConfig::default(), &test_spec, - dir.as_path(), + client_db, Arc::new(Miner::with_spec_and_accounts(&test_spec, accounts)), IoChannel::disconnected(), - &db_config ).unwrap(); let test_engine = &*test_spec.engine; @@ -260,14 +260,14 @@ pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult where T: DatabaseExtras { blooms: RwLock>, cache_manager: RwLock>, // db - tracesdb: Arc, + tracesdb: Arc, // config, bloom_config: BloomConfig, // tracing enabled @@ -126,8 +126,8 @@ impl BloomGroupDatabase for TraceDB where T: DatabaseExtras { impl TraceDB where T: DatabaseExtras { /// Creates new instance of `TraceDB`. - pub fn new(config: Config, tracesdb: Arc, extras: Arc) -> Self { - let mut batch = DBTransaction::new(&tracesdb); + pub fn new(config: Config, tracesdb: Arc, extras: Arc) -> Self { + let mut batch = DBTransaction::new(); let genesis = extras.block_hash(0) .expect("Genesis block is always inserted upon extras db creation qed"); batch.write(db::COL_TRACE, &genesis, &FlatBlockTraces::default()); @@ -404,8 +404,7 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { mod tests { use std::collections::HashMap; use std::sync::Arc; - use util::{Address, U256, H256, Database, DatabaseConfig, DBTransaction}; - use devtools::RandomTempPath; + use util::{Address, U256, H256, DBTransaction}; use header::BlockNumber; use trace::{Config, TraceDB, Database as TraceDatabase, DatabaseExtras, ImportRequest}; use trace::{Filter, LocalizedTrace, AddressesFilter, TraceError}; @@ -455,14 +454,13 @@ mod tests { } } - fn new_db(path: &str) -> Arc { - Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap()) + fn new_db() -> Arc<::util::kvdb::KeyValueDB> { + Arc::new(::util::kvdb::in_memory(::db::NUM_COLUMNS.unwrap_or(0))) } #[test] fn test_reopening_db_with_tracing_off() { - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let mut config = Config::default(); // set autotracing @@ -476,8 +474,7 @@ mod tests { #[test] fn test_reopening_db_with_tracing_on() { - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let mut config = Config::default(); // set tracing on @@ -555,8 +552,7 @@ mod tests { #[test] fn test_import_non_canon_traces() { - let temp = RandomTempPath::new(); - let db = Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), temp.as_str()).unwrap()); + let db = new_db(); let mut config = Config::default(); config.enabled = true; let block_0 = H256::from(0xa1); @@ -574,7 +570,7 @@ mod tests { // import block 0 let request = create_noncanon_import_request(0, block_0.clone()); - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); tracedb.import(&mut batch, request); db.write(batch).unwrap(); @@ -584,8 +580,7 @@ mod tests { #[test] fn test_import() { - let temp = RandomTempPath::new(); - let db = Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), temp.as_str()).unwrap()); + let db = new_db(); let mut config = Config::default(); config.enabled = true; let block_1 = H256::from(0xa1); @@ -605,7 +600,7 @@ mod tests { // import block 1 let request = create_simple_import_request(1, block_1.clone()); - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); tracedb.import(&mut batch, request); db.write(batch).unwrap(); @@ -621,7 +616,7 @@ mod tests { // import block 2 let request = create_simple_import_request(2, block_2.clone()); - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); tracedb.import(&mut batch, request); db.write(batch).unwrap(); @@ -664,8 +659,7 @@ mod tests { #[test] fn query_trace_after_reopen() { - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let mut config = Config::default(); let mut extras = Extras::default(); let block_0 = H256::from(0xa1); @@ -684,7 +678,7 @@ mod tests { // import block 1 let request = create_simple_import_request(1, block_0.clone()); - let mut batch = DBTransaction::new(&db); + let mut batch = DBTransaction::new(); tracedb.import(&mut batch, request); db.write(batch).unwrap(); } @@ -698,8 +692,7 @@ mod tests { #[test] fn query_genesis() { - let temp = RandomTempPath::new(); - let db = new_db(temp.as_str()); + let db = new_db(); let mut config = Config::default(); let mut extras = Extras::default(); let block_0 = H256::from(0xa1); diff --git a/local-store/Cargo.toml b/local-store/Cargo.toml new file mode 100644 index 000000000..aa71ab9a2 --- /dev/null +++ b/local-store/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "parity-local-store" +description = "Manages persistent local node data." +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +ethcore-util = { path = "../util" } +ethcore-io = { path = "../util/io" } +ethcore = { path = "../ethcore" } +rlp = {path = "../util/rlp" } +serde = "0.9" +serde_derive = "0.9" +serde_json = "0.9" +log = "0.3" +ethkey = { path = "../ethkey" } diff --git a/local-store/src/lib.rs b/local-store/src/lib.rs new file mode 100644 index 000000000..f9e5fe385 --- /dev/null +++ b/local-store/src/lib.rs @@ -0,0 +1,315 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Manages local node data: pending local transactions, sync security level + +use std::sync::Arc; +use std::fmt; + +use ethcore::transaction::{ + SignedTransaction, PendingTransaction, UnverifiedTransaction, + Condition as TransactionCondition +}; +use ethcore::service::ClientIoMessage; +use io::IoHandler; +use rlp::{UntrustedRlp, View}; +use util::kvdb::KeyValueDB; + +extern crate ethcore; +extern crate ethcore_util as util; +extern crate ethcore_io as io; +extern crate rlp; +extern crate serde_json; +extern crate serde; + +#[macro_use] +extern crate serde_derive; + +#[macro_use] +extern crate log; + +#[cfg(test)] +extern crate ethkey; + +const LOCAL_TRANSACTIONS_KEY: &'static [u8] = &*b"LOCAL_TXS"; + +const UPDATE_TIMER: ::io::TimerToken = 0; +const UPDATE_TIMEOUT_MS: u64 = 15 * 60 * 1000; // once every 15 minutes. + +/// Errors which can occur while using the local data store. +#[derive(Debug)] +pub enum Error { + /// Database errors: these manifest as `String`s. + Database(String), + /// JSON errors. + Json(::serde_json::Error), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Database(ref val) => write!(f, "{}", val), + Error::Json(ref err) => write!(f, "{}", err), + } + } +} + +#[derive(Serialize, Deserialize)] +enum Condition { + Number(::ethcore::header::BlockNumber), + Timestamp(u64), +} + +impl From for Condition { + fn from(cond: TransactionCondition) -> Self { + match cond { + TransactionCondition::Number(num) => Condition::Number(num), + TransactionCondition::Timestamp(tm) => Condition::Timestamp(tm), + } + } +} + +impl Into for Condition { + fn into(self) -> TransactionCondition { + match self { + Condition::Number(num) => TransactionCondition::Number(num), + Condition::Timestamp(tm) => TransactionCondition::Timestamp(tm), + } + } +} + +#[derive(Serialize, Deserialize)] +struct TransactionEntry { + rlp_bytes: Vec, + condition: Option, +} + +impl TransactionEntry { + fn into_pending(self) -> Option { + let tx: UnverifiedTransaction = match UntrustedRlp::new(&self.rlp_bytes).as_val() { + Err(e) => { + warn!(target: "local_store", "Invalid persistent transaction stored: {}", e); + return None + } + Ok(tx) => tx, + }; + + let hash = tx.hash(); + match SignedTransaction::new(tx) { + Ok(tx) => Some(PendingTransaction::new(tx, self.condition.map(Into::into))), + Err(_) => { + warn!(target: "local_store", "Bad signature on persistent transaction: {}", hash); + return None + } + } + } +} + +impl From for TransactionEntry { + fn from(pending: PendingTransaction) -> Self { + TransactionEntry { + rlp_bytes: ::rlp::encode(&pending.transaction).to_vec(), + condition: pending.condition.map(Into::into), + } + } +} + +/// Something which can provide information about the local node. +pub trait NodeInfo: Send + Sync { + /// Get all pending transactions of local origin. + fn pending_transactions(&self) -> Vec; +} + +/// Create a new local data store, given a database, a column to write to, and a node. +/// Attempts to read data out of the store, and move it into the node. +pub fn create(db: Arc, col: Option, node: T) -> LocalDataStore { + LocalDataStore { + db: db, + col: col, + node: node, + } +} + +/// Manages local node data. +/// +/// In specific, this will be used to store things like unpropagated local transactions +/// and the node security level. +pub struct LocalDataStore { + db: Arc, + col: Option, + node: T, +} + +impl LocalDataStore { + /// Attempt to read pending transactions out of the local store. + pub fn pending_transactions(&self) -> Result, Error> { + if let Some(val) = self.db.get(self.col, LOCAL_TRANSACTIONS_KEY).map_err(Error::Database)? { + let local_txs: Vec<_> = ::serde_json::from_slice::>(&val) + .map_err(Error::Json)? + .into_iter() + .filter_map(TransactionEntry::into_pending) + .collect(); + + Ok(local_txs) + } else { + Ok(Vec::new()) + } + } + + /// Update the entries in the database. + pub fn update(&self) -> Result<(), Error> { + trace!(target: "local_store", "Updating local store entries."); + + let mut batch = self.db.transaction(); + + let local_entries: Vec = self.node.pending_transactions() + .into_iter() + .map(Into::into) + .collect(); + + let local_json = ::serde_json::to_value(&local_entries).map_err(Error::Json)?; + let json_str = format!("{}", local_json); + + batch.put_vec(self.col, LOCAL_TRANSACTIONS_KEY, json_str.into_bytes()); + self.db.write(batch).map_err(Error::Database) + } +} + +impl IoHandler for LocalDataStore { + fn initialize(&self, io: &::io::IoContext) { + if let Err(e) = io.register_timer(UPDATE_TIMER, UPDATE_TIMEOUT_MS) { + warn!(target: "local_store", "Error registering local store update timer: {}", e); + } + } + + fn timeout(&self, _io: &::io::IoContext, timer: ::io::TimerToken) { + if let UPDATE_TIMER = timer { + if let Err(e) = self.update() { + debug!(target: "local_store", "Error updating local store: {}", e); + } + } + } +} + +impl Drop for LocalDataStore { + fn drop(&mut self) { + debug!(target: "local_store", "Updating node data store on shutdown."); + + let _ = self.update(); + } +} + +#[cfg(test)] +mod tests { + use super::NodeInfo; + + use std::sync::Arc; + use ethcore::transaction::{Transaction, Condition, PendingTransaction}; + use ethkey::{Brain, Generator}; + + // we want to test: round-trip of good transactions. + // failure to roundtrip bad transactions (but that it doesn't panic) + + struct Dummy(Vec); + impl NodeInfo for Dummy { + fn pending_transactions(&self) -> Vec { self.0.clone() } + } + + #[test] + fn twice_empty() { + let db = Arc::new(::util::kvdb::in_memory(0)); + + { + let store = super::create(db.clone(), None, Dummy(vec![])); + assert_eq!(store.pending_transactions().unwrap(), vec![]) + } + + { + let store = super::create(db.clone(), None, Dummy(vec![])); + assert_eq!(store.pending_transactions().unwrap(), vec![]) + } + } + + #[test] + fn with_condition() { + let keypair = Brain::new("abcd".into()).generate().unwrap(); + let transactions: Vec<_> = (0..10u64).map(|nonce| { + let mut tx = Transaction::default(); + tx.nonce = nonce.into(); + + let signed = tx.sign(keypair.secret(), None); + let condition = match nonce { + 5 => Some(Condition::Number(100_000)), + _ => None, + }; + + PendingTransaction::new(signed, condition) + }).collect(); + + let db = Arc::new(::util::kvdb::in_memory(0)); + + { + // nothing written yet, will write pending. + let store = super::create(db.clone(), None, Dummy(transactions.clone())); + assert_eq!(store.pending_transactions().unwrap(), vec![]) + } + { + // pending written, will write nothing. + let store = super::create(db.clone(), None, Dummy(vec![])); + assert_eq!(store.pending_transactions().unwrap(), transactions) + } + { + // pending removed, will write nothing. + let store = super::create(db.clone(), None, Dummy(vec![])); + assert_eq!(store.pending_transactions().unwrap(), vec![]) + } + } + + #[test] + fn skips_bad_transactions() { + let keypair = Brain::new("abcd".into()).generate().unwrap(); + let mut transactions: Vec<_> = (0..10u64).map(|nonce| { + let mut tx = Transaction::default(); + tx.nonce = nonce.into(); + + let signed = tx.sign(keypair.secret(), None); + + PendingTransaction::new(signed, None) + }).collect(); + + transactions.push({ + let mut tx = Transaction::default(); + tx.nonce = 10.into(); + + let signed = tx.fake_sign(Default::default()); + PendingTransaction::new(signed, None) + }); + + let db = Arc::new(::util::kvdb::in_memory(0)); + { + // nothing written, will write bad. + let store = super::create(db.clone(), None, Dummy(transactions.clone())); + assert_eq!(store.pending_transactions().unwrap(), vec![]) + } + { + // try to load transactions. The last transaction, which is invalid, will be skipped. + let store = super::create(db.clone(), None, Dummy(vec![])); + let loaded = store.pending_transactions().unwrap(); + transactions.pop(); + assert_eq!(loaded, transactions); + } + } +} diff --git a/parity/main.rs b/parity/main.rs index eefd42a94..8e0bae0aa 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -59,6 +59,7 @@ extern crate parity_hash_fetch as hash_fetch; extern crate parity_ipfs_api; extern crate parity_reactor; extern crate parity_updater as updater; +extern crate parity_local_store as local_store; extern crate rpc_cli; #[macro_use] diff --git a/parity/migration.rs b/parity/migration.rs index 531abd0d8..c2d5c0797 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -30,7 +30,7 @@ use ethcore::migrations::Extract; /// Database is assumed to be at default version, when no version file is found. const DEFAULT_VERSION: u32 = 5; /// Current version of database models. -const CURRENT_VERSION: u32 = 10; +const CURRENT_VERSION: u32 = 11; /// First version of the consolidated database. const CONSOLIDATION_VERSION: u32 = 9; /// Defines how many items are migrated to the new version of database at once. @@ -146,6 +146,7 @@ pub fn default_migration_settings(compaction_profile: &CompactionProfile) -> Mig fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result { let mut manager = MigrationManager::new(default_migration_settings(compaction_profile)); manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)?; + manager.add_migration(migrations::ToV11::default()).map_err(|_| Error::MigrationImpossible)?; Ok(manager) } diff --git a/parity/run.rs b/parity/run.rs index e91a8716b..d10aa10e1 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -136,6 +136,21 @@ pub fn open_dapp(dapps_conf: &dapps::Configuration, dapp: &str) -> Result<(), St Ok(()) } +// node info fetcher for the local store. +struct FullNodeInfo { + miner: Arc, // TODO: only TXQ needed, just use that after decoupling. +} + +impl ::local_store::NodeInfo for FullNodeInfo { + fn pending_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> { + let local_txs = self.miner.local_transactions(); + self.miner.pending_transactions() + .into_iter() + .chain(self.miner.future_transactions()) + .filter(|tx| local_txs.contains_key(&tx.hash())) + .collect() + } +} pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result { if cmd.ui && cmd.dapps_conf.enabled { @@ -318,6 +333,33 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R let client = service.client(); let snapshot_service = service.snapshot_service(); + // initialize the local node information store. + let store = { + let db = service.db(); + let node_info = FullNodeInfo { + miner: miner.clone(), + }; + + let store = ::local_store::create(db, ::ethcore::db::COL_NODE_INFO, node_info); + + // re-queue pending transactions. + match store.pending_transactions() { + Ok(pending) => { + for pending_tx in pending { + if let Err(e) = miner.import_own_transaction(&*client, pending_tx) { + warn!("Error importing saved transaction: {}", e) + } + } + } + Err(e) => warn!("Error loading cached pending transactions from disk: {}", e), + } + + Arc::new(store) + }; + + // register it as an IO service to update periodically. + service.register_io_handler(store).map_err(|_| "Unable to register local store handler".to_owned())?; + // create external miner let external_miner = Arc::new(ExternalMiner::default()); diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 3f66427e7..6b937d733 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -116,19 +116,16 @@ impl EthTester { } fn from_spec(spec: Spec) -> Self { - let dir = RandomTempPath::new(); let account_provider = account_provider(); let miner_service = miner_service(&spec, account_provider.clone()); let snapshot_service = snapshot_service(); - let db_config = ::util::kvdb::DatabaseConfig::with_columns(::ethcore::db::NUM_COLUMNS); let client = Client::new( ClientConfig::default(), &spec, - dir.as_path(), + Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), miner_service.clone(), IoChannel::disconnected(), - &db_config ).unwrap(); let sync_provider = sync_provider(); let external_miner = Arc::new(ExternalMiner::default()); diff --git a/sync/src/tests/consensus.rs b/sync/src/tests/consensus.rs index 15b4d57a4..228096f28 100644 --- a/sync/src/tests/consensus.rs +++ b/sync/src/tests/consensus.rs @@ -62,7 +62,6 @@ fn authority_round() { ap.insert_account(s1.secret().clone(), "").unwrap(); let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_round, Some(ap)); - let mut net = &mut *net; let io_handler0: Arc> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() }); let io_handler1: Arc> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() }); // Push transaction to both clients. Only one of them gets lucky to produce a block. @@ -121,7 +120,6 @@ fn tendermint() { ap.insert_account(s1.secret().clone(), "").unwrap(); let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_tendermint, Some(ap)); - let mut net = &mut *net; let io_handler0: Arc> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() }); let io_handler1: Arc> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() }); // Push transaction to both clients. Only one of them issues a proposal. diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 46679b9eb..328c0a24f 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -23,13 +23,11 @@ use ethcore::snapshot::SnapshotService; use ethcore::spec::Spec; use ethcore::account_provider::AccountProvider; use ethcore::miner::Miner; -use ethcore::db::NUM_COLUMNS; use sync_io::SyncIo; use io::IoChannel; use api::WARP_SYNC_PROTOCOL_ID; use chain::ChainSync; use ::SyncConfig; -use devtools::{self, GuardedTempResult}; pub trait FlushingBlockChainClient: BlockChainClient { fn flush(&self) {} @@ -271,7 +269,7 @@ impl TestNet> { } impl TestNet> { - pub fn with_spec_and_accounts(n: usize, config: SyncConfig, spec_factory: F, accounts: Option>) -> GuardedTempResult + pub fn with_spec_and_accounts(n: usize, config: SyncConfig, spec_factory: F, accounts: Option>) -> Self where F: Fn() -> Spec { let mut net = TestNet { @@ -279,21 +277,15 @@ impl TestNet> { started: false, disconnect_events: Vec::new(), }; - let dir = devtools::RandomTempPath::new(); + for _ in 0..n { - let mut client_dir = dir.as_path().clone(); - client_dir.push(devtools::random_filename()); - - let db_config = DatabaseConfig::with_columns(NUM_COLUMNS); - let spec = spec_factory(); let client = EthcoreClient::new( ClientConfig::default(), &spec, - client_dir.as_path(), + Arc::new(::util::kvdb::in_memory(::ethcore::db::NUM_COLUMNS.unwrap_or(0))), Arc::new(Miner::with_spec_and_accounts(&spec, accounts.clone())), IoChannel::disconnected(), - &db_config ).unwrap(); let ss = Arc::new(TestSnapshotService::new()); @@ -307,10 +299,8 @@ impl TestNet> { peer.chain.add_notify(peer.clone()); net.peers.push(peer); } - GuardedTempResult { - _temp: dir, - result: Some(net) - } + + net } } diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs index 62c1a924b..1f9381c99 100644 --- a/util/src/journaldb/archivedb.rs +++ b/util/src/journaldb/archivedb.rs @@ -22,9 +22,7 @@ use hashdb::*; use memorydb::*; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; use super::traits::JournalDB; -use kvdb::{Database, DBTransaction}; -#[cfg(test)] -use std::env; +use kvdb::{KeyValueDB, DBTransaction}; /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay /// and latent-removal semantics. @@ -35,14 +33,14 @@ use std::env; /// that the states of any block the node has ever processed will be accessible. pub struct ArchiveDB { overlay: MemoryDB, - backing: Arc, + backing: Arc, latest_era: Option, column: Option, } impl ArchiveDB { - /// Create a new instance from file - pub fn new(backing: Arc, col: Option) -> ArchiveDB { + /// Create a new instance from a key-value db. + pub fn new(backing: Arc, col: Option) -> ArchiveDB { let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::(&val)); ArchiveDB { overlay: MemoryDB::new(), @@ -55,9 +53,7 @@ impl ArchiveDB { /// Create a new instance with an anonymous temporary database. #[cfg(test)] fn new_temp() -> ArchiveDB { - let mut dir = env::temp_dir(); - dir.push(H32::random().hex()); - let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()); + let backing = Arc::new(::kvdb::in_memory(0)); Self::new(backing, None) } @@ -186,7 +182,7 @@ impl JournalDB for ArchiveDB { fn is_pruned(&self) -> bool { false } - fn backing(&self) -> &Arc { + fn backing(&self) -> &Arc { &self.backing } diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs index aa0931903..9644a60ac 100644 --- a/util/src/journaldb/earlymergedb.rs +++ b/util/src/journaldb/earlymergedb.rs @@ -22,9 +22,7 @@ use hashdb::*; use memorydb::*; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; use super::traits::JournalDB; -use kvdb::{Database, DBTransaction}; -#[cfg(test)] -use std::env; +use kvdb::{KeyValueDB, DBTransaction}; #[derive(Clone, PartialEq, Eq)] struct RefInfo { @@ -112,7 +110,7 @@ enum RemoveFrom { /// TODO: `store_reclaim_period` pub struct EarlyMergeDB { overlay: MemoryDB, - backing: Arc, + backing: Arc, refs: Option>>>, latest_era: Option, column: Option, @@ -122,8 +120,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ]; impl EarlyMergeDB { /// Create a new instance from file - pub fn new(backing: Arc, col: Option) -> EarlyMergeDB { - let (latest_era, refs) = EarlyMergeDB::read_refs(&backing, col); + pub fn new(backing: Arc, col: Option) -> EarlyMergeDB { + let (latest_era, refs) = EarlyMergeDB::read_refs(&*backing, col); let refs = Some(Arc::new(RwLock::new(refs))); EarlyMergeDB { overlay: MemoryDB::new(), @@ -137,9 +135,7 @@ impl EarlyMergeDB { /// Create a new instance with an anonymous temporary database. #[cfg(test)] fn new_temp() -> EarlyMergeDB { - let mut dir = env::temp_dir(); - dir.push(H32::random().hex()); - let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()); + let backing = Arc::new(::kvdb::in_memory(0)); Self::new(backing, None) } @@ -152,11 +148,11 @@ impl EarlyMergeDB { // The next three are valid only as long as there is an insert operation of `key` in the journal. fn set_already_in(batch: &mut DBTransaction, col: Option, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]); } fn reset_already_in(batch: &mut DBTransaction, col: Option, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)); } - fn is_already_in(backing: &Database, col: Option, key: &H256) -> bool { + fn is_already_in(backing: &KeyValueDB, col: Option, key: &H256) -> bool { backing.get(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } - fn insert_keys(inserts: &[(H256, DBValue)], backing: &Database, col: Option, refs: &mut HashMap, batch: &mut DBTransaction, trace: bool) { + fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option, refs: &mut HashMap, batch: &mut DBTransaction, trace: bool) { for &(ref h, ref d) in inserts { if let Some(c) = refs.get_mut(h) { // already counting. increment. @@ -189,7 +185,7 @@ impl EarlyMergeDB { } } - fn replay_keys(inserts: &[H256], backing: &Database, col: Option, refs: &mut HashMap) { + fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option, refs: &mut HashMap) { trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs); for h in inserts { if let Some(c) = refs.get_mut(h) { @@ -262,7 +258,7 @@ impl EarlyMergeDB { #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { - let (latest_era, reconstructed) = Self::read_refs(&self.backing, self.column); + let (latest_era, reconstructed) = Self::read_refs(&*self.backing, self.column); let refs = self.refs.as_ref().unwrap().write(); if *refs != reconstructed || latest_era != self.latest_era { let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::>(); @@ -278,7 +274,7 @@ impl EarlyMergeDB { self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?") } - fn read_refs(db: &Database, col: Option) -> (Option, HashMap) { + fn read_refs(db: &KeyValueDB, col: Option) -> (Option, HashMap) { let mut refs = HashMap::new(); let mut latest_era = None; if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") { @@ -361,7 +357,7 @@ impl JournalDB for EarlyMergeDB { self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none() } - fn backing(&self) -> &Arc { + fn backing(&self) -> &Arc { &self.backing } @@ -432,7 +428,7 @@ impl JournalDB for EarlyMergeDB { r.begin_list(inserts.len()); inserts.iter().foreach(|&(k, _)| {r.append(&k);}); r.append(&removes); - Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace); + Self::insert_keys(&inserts, &*self.backing, self.column, &mut refs, batch, trace); let ins = inserts.iter().map(|&(k, _)| k).collect::>(); @@ -558,7 +554,7 @@ mod tests { use super::*; use super::super::traits::JournalDB; use log::init_log; - use kvdb::{Database, DatabaseConfig}; + use kvdb::{DatabaseConfig}; #[test] fn insert_same_in_fork() { @@ -817,7 +813,7 @@ mod tests { fn new_db(path: &Path) -> EarlyMergeDB { let config = DatabaseConfig::with_columns(Some(1)); - let backing = Arc::new(Database::open(&config, path.to_str().unwrap()).unwrap()); + let backing = Arc::new(::kvdb::Database::open(&config, path.to_str().unwrap()).unwrap()); EarlyMergeDB::new(backing, Some(0)) } diff --git a/util/src/journaldb/mod.rs b/util/src/journaldb/mod.rs index 00c35a34f..e949b269f 100644 --- a/util/src/journaldb/mod.rs +++ b/util/src/journaldb/mod.rs @@ -17,7 +17,6 @@ //! `JournalDB` interface and implementation. use common::*; -use kvdb::Database; /// Export the journaldb module. pub mod traits; @@ -115,8 +114,8 @@ impl fmt::Display for Algorithm { } } -/// Create a new `JournalDB` trait object. -pub fn new(backing: Arc, algorithm: Algorithm, col: Option) -> Box { +/// Create a new `JournalDB` trait object over a generic key-value database. +pub fn new(backing: Arc<::kvdb::KeyValueDB>, algorithm: Algorithm, col: Option) -> Box { match algorithm { Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(backing, col)), Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(backing, col)), @@ -184,4 +183,4 @@ mod tests { assert_eq!(overlayrecent, 1); assert_eq!(refcounted, 1); } -} \ No newline at end of file +} diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 5be06d714..ed39905a2 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -21,9 +21,7 @@ use rlp::*; use hashdb::*; use memorydb::*; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; -use kvdb::{Database, DBTransaction}; -#[cfg(test)] -use std::env; +use kvdb::{KeyValueDB, DBTransaction}; use super::JournalDB; /// Implementation of the `JournalDB` trait for a disk-backed database with a memory overlay @@ -59,7 +57,7 @@ use super::JournalDB; pub struct OverlayRecentDB { transaction_overlay: MemoryDB, - backing: Arc, + backing: Arc, journal_overlay: Arc>, column: Option, } @@ -102,8 +100,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ]; impl OverlayRecentDB { /// Create a new instance. - pub fn new(backing: Arc, col: Option) -> OverlayRecentDB { - let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&backing, col))); + pub fn new(backing: Arc, col: Option) -> OverlayRecentDB { + let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&*backing, col))); OverlayRecentDB { transaction_overlay: MemoryDB::new(), backing: backing, @@ -115,15 +113,13 @@ impl OverlayRecentDB { /// Create a new instance with an anonymous temporary database. #[cfg(test)] pub fn new_temp() -> OverlayRecentDB { - let mut dir = env::temp_dir(); - dir.push(H32::random().hex()); - let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()); + let backing = Arc::new(::kvdb::in_memory(0)); Self::new(backing, None) } #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { - let reconstructed = Self::read_overlay(&self.backing, self.column); + let reconstructed = Self::read_overlay(&*self.backing, self.column); let journal_overlay = self.journal_overlay.read(); journal_overlay.backing_overlay == reconstructed.backing_overlay && journal_overlay.pending_overlay == reconstructed.pending_overlay && @@ -136,7 +132,7 @@ impl OverlayRecentDB { self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?") } - fn read_overlay(db: &Database, col: Option) -> JournalOverlay { + fn read_overlay(db: &KeyValueDB, col: Option) -> JournalOverlay { let mut journal = HashMap::new(); let mut overlay = MemoryDB::new(); let mut count = 0; @@ -235,7 +231,7 @@ impl JournalDB for OverlayRecentDB { self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none() } - fn backing(&self) -> &Arc { + fn backing(&self) -> &Arc { &self.backing } diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs index 8b9092cd7..57d7aa7c0 100644 --- a/util/src/journaldb/refcounteddb.rs +++ b/util/src/journaldb/refcounteddb.rs @@ -23,9 +23,7 @@ use overlaydb::OverlayDB; use memorydb::MemoryDB; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY}; use super::traits::JournalDB; -use kvdb::{Database, DBTransaction}; -#[cfg(test)] -use std::env; +use kvdb::{KeyValueDB, DBTransaction}; /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay /// and latent-removal semantics. @@ -49,7 +47,7 @@ use std::env; // TODO: store last_era, reclaim_period. pub struct RefCountedDB { forward: OverlayDB, - backing: Arc, + backing: Arc, latest_era: Option, inserts: Vec, removes: Vec, @@ -60,7 +58,7 @@ const PADDING : [u8; 10] = [ 0u8; 10 ]; impl RefCountedDB { /// Create a new instance given a `backing` database. - pub fn new(backing: Arc, col: Option) -> RefCountedDB { + pub fn new(backing: Arc, col: Option) -> RefCountedDB { let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::(&val)); RefCountedDB { @@ -76,9 +74,7 @@ impl RefCountedDB { /// Create a new instance with an anonymous temporary database. #[cfg(test)] fn new_temp() -> RefCountedDB { - let mut dir = env::temp_dir(); - dir.push(H32::random().hex()); - let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()); + let backing = Arc::new(::kvdb::in_memory(0)); Self::new(backing, None) } } @@ -112,7 +108,7 @@ impl JournalDB for RefCountedDB { self.latest_era.is_none() } - fn backing(&self) -> &Arc { + fn backing(&self) -> &Arc { &self.backing } diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs index 3b81c6544..8a89f1368 100644 --- a/util/src/journaldb/traits.rs +++ b/util/src/journaldb/traits.rs @@ -18,7 +18,7 @@ use common::*; use hashdb::*; -use kvdb::{Database, DBTransaction}; +use kvdb::{self, DBTransaction}; /// A `HashDB` which can manage a short-term journal potentially containing many forks of mutually /// exclusive actions. @@ -66,7 +66,7 @@ pub trait JournalDB: HashDB { fn is_pruned(&self) -> bool { true } /// Get backing database. - fn backing(&self) -> &Arc; + fn backing(&self) -> &Arc; /// Clear internal strucutres. This should called after changes have been written /// to the backing strage diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index b18fbb7b5..1714ce22f 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -17,10 +17,11 @@ //! Key-Value store abstraction with `RocksDB` backend. use std::io::ErrorKind; +use std::marker::PhantomData; +use std::path::PathBuf; + use common::*; use elastic_array::*; -use std::default::Default; -use std::path::PathBuf; use hashdb::DBValue; use rlp::{UntrustedRlp, RlpType, View, Compressible}; use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator, @@ -36,10 +37,12 @@ const DB_BACKGROUND_FLUSHES: i32 = 2; const DB_BACKGROUND_COMPACTIONS: i32 = 2; /// Write transaction. Batches a sequence of put/delete operations for efficiency. +#[derive(Default, Clone, PartialEq)] pub struct DBTransaction { ops: Vec, } +#[derive(Clone, PartialEq)] enum DBOp { Insert { col: Option, @@ -59,9 +62,14 @@ enum DBOp { impl DBTransaction { /// Create new transaction. - pub fn new(_db: &Database) -> DBTransaction { + pub fn new() -> DBTransaction { + DBTransaction::with_capacity(256) + } + + /// Create new transaction with capacity. + pub fn with_capacity(cap: usize) -> DBTransaction { DBTransaction { - ops: Vec::with_capacity(256), + ops: Vec::with_capacity(cap) } } @@ -116,6 +124,138 @@ enum KeyState { Delete, } +/// Generic key-value database. +/// +/// This makes a distinction between "buffered" and "flushed" values. Values which have been +/// written can always be read, but may be present in an in-memory buffer. Values which have +/// been flushed have been moved to backing storage, like a RocksDB instance. There are certain +/// operations which are only guaranteed to operate on flushed data and not buffered, +/// although implementations may differ in this regard. +/// +/// The contents of an interior buffer may be explicitly flushed using the `flush` method. +/// +/// The `KeyValueDB` also deals in "column families", which can be thought of as distinct +/// stores within a database. Keys written in one column family will not be accessible from +/// any other. The number of column families must be specified at initialization, with a +/// differing interface for each database. The `None` argument in place of a column index +/// is always supported. +/// +/// The API laid out here, along with the `Sync` bound implies interior synchronization for +/// implementation. +pub trait KeyValueDB: Sync + Send { + /// Helper to create a new transaction. + fn transaction(&self) -> DBTransaction { DBTransaction::new() } + + /// Get a value by key. + fn get(&self, col: Option, key: &[u8]) -> Result, String>; + + /// Get a value by partial key. Only works for flushed data. + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option>; + + /// Write a transaction of changes to the buffer. + fn write_buffered(&self, transaction: DBTransaction); + + /// Write a transaction of changes to the backing store. + fn write(&self, transaction: DBTransaction) -> Result<(), String> { + self.write_buffered(transaction); + self.flush() + } + + /// Flush all buffered data. + fn flush(&self) -> Result<(), String>; + + /// Iterate over flushed data for a given column. + fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a>; + + /// Attempt to replace this database with a new one located at the given path. + fn restore(&self, new_db: &str) -> Result<(), UtilError>; +} + +/// A key-value database fulfilling the `KeyValueDB` trait, living in memory. +/// This is generally intended for tests and is not particularly optimized. +pub struct InMemory { + columns: RwLock, BTreeMap, DBValue>>>, +} + +/// Create an in-memory database with the given number of columns. +/// Columns will be indexable by 0..`num_cols` +pub fn in_memory(num_cols: u32) -> InMemory { + let mut cols = HashMap::new(); + cols.insert(None, BTreeMap::new()); + + for idx in 0..num_cols { + cols.insert(Some(idx), BTreeMap::new()); + } + + InMemory { + columns: RwLock::new(cols) + } +} + +impl KeyValueDB for InMemory { + fn get(&self, col: Option, key: &[u8]) -> Result, String> { + let columns = self.columns.read(); + match columns.get(&col) { + None => Err(format!("No such column family: {:?}", col)), + Some(map) => Ok(map.get(key).cloned()), + } + } + + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + let columns = self.columns.read(); + match columns.get(&col) { + None => None, + Some(map) => + map.iter() + .find(|&(ref k ,_)| k.starts_with(prefix)) + .map(|(_, v)| (&**v).to_vec().into_boxed_slice()) + } + } + + fn write_buffered(&self, transaction: DBTransaction) { + let mut columns = self.columns.write(); + let ops = transaction.ops; + for op in ops { + match op { + DBOp::Insert { col, key, value } => { + if let Some(mut col) = columns.get_mut(&col) { + col.insert(key.to_vec(), value); + } + }, + DBOp::InsertCompressed { col, key, value } => { + if let Some(mut col) = columns.get_mut(&col) { + let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); + let mut value = DBValue::new(); + value.append_slice(&compressed); + col.insert(key.to_vec(), value); + } + }, + DBOp::Delete { col, key } => { + if let Some(mut col) = columns.get_mut(&col) { + col.remove(&*key); + } + }, + } + } + } + + fn flush(&self) -> Result<(), String> { Ok(()) } + fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a> { + match self.columns.read().get(&col) { + Some(map) => Box::new( // TODO: worth optimizing at all? + map.clone() + .into_iter() + .map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice())) + ), + None => Box::new(None.into_iter()) + } + } + + fn restore(&self, _new_db: &str) -> Result<(), UtilError> { + Err(UtilError::SimpleString("Attempted to restore in-memory database".into())) + } +} + /// Compaction profile for the database settings #[derive(Clone, Copy, PartialEq, Debug)] pub struct CompactionProfile { @@ -248,12 +388,16 @@ impl Default for DatabaseConfig { } } -/// Database iterator for flushed data only -pub struct DatabaseIterator { +/// Database iterator (for flushed data only) +// The compromise of holding only a virtual borrow vs. holding a lock on the +// inner DB (to prevent closing via restoration) may be re-evaluated in the future. +// +pub struct DatabaseIterator<'a> { iter: DBIterator, + _marker: PhantomData<&'a Database>, } -impl<'a> Iterator for DatabaseIterator { +impl<'a> Iterator for DatabaseIterator<'a> { type Item = (Box<[u8]>, Box<[u8]>); fn next(&mut self) -> Option { @@ -393,9 +537,9 @@ impl Database { }) } - /// Creates new transaction for this database. + /// Helper to create new transaction for this database. pub fn transaction(&self) -> DBTransaction { - DBTransaction::new(self) + DBTransaction::new() } @@ -562,9 +706,16 @@ impl Database { //TODO: iterate over overlay match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - col.map_or_else(|| DatabaseIterator { iter: db.iterator_opt(IteratorMode::Start, &self.read_opts) }, - |c| DatabaseIterator { iter: db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) - .expect("iterator params are valid; qed") }) + let iter = col.map_or_else( + || db.iterator_opt(IteratorMode::Start, &self.read_opts), + |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) + .expect("iterator params are valid; qed") + ); + + DatabaseIterator { + iter: iter, + _marker: PhantomData, + } }, None => panic!("Not supported yet") //TODO: return an empty iterator or change return type } @@ -619,6 +770,39 @@ impl Database { } } +// duplicate declaration of methods here to avoid trait import in certain existing cases +// at time of addition. +impl KeyValueDB for Database { + fn get(&self, col: Option, key: &[u8]) -> Result, String> { + Database::get(self, col, key) + } + + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + Database::get_by_prefix(self, col, prefix) + } + + fn write_buffered(&self, transaction: DBTransaction) { + Database::write_buffered(self, transaction) + } + + fn write(&self, transaction: DBTransaction) -> Result<(), String> { + Database::write(self, transaction) + } + + fn flush(&self) -> Result<(), String> { + Database::flush(self) + } + + fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a> { + let unboxed = Database::iter(self, col); + Box::new(unboxed) + } + + fn restore(&self, new_db: &str) -> Result<(), UtilError> { + Database::restore(self, new_db) + } +} + impl Drop for Database { fn drop(&mut self) { // write all buffered changes if we can. diff --git a/util/src/migration/mod.rs b/util/src/migration/mod.rs index e7a9ef00e..50464444f 100644 --- a/util/src/migration/mod.rs +++ b/util/src/migration/mod.rs @@ -74,7 +74,7 @@ impl Batch { pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> { if self.inner.is_empty() { return Ok(()) } - let mut transaction = DBTransaction::new(dest); + let mut transaction = DBTransaction::new(); for keypair in &self.inner { transaction.put(self.column, &keypair.0, &keypair.1); diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index fa4891c66..6b6f501a7 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -23,7 +23,7 @@ use hashdb::*; use memorydb::*; use std::sync::*; use std::collections::HashMap; -use kvdb::{Database, DBTransaction}; +use kvdb::{KeyValueDB, DBTransaction}; /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay. /// @@ -36,22 +36,21 @@ use kvdb::{Database, DBTransaction}; #[derive(Clone)] pub struct OverlayDB { overlay: MemoryDB, - backing: Arc, + backing: Arc, column: Option, } impl OverlayDB { /// Create a new instance of OverlayDB given a `backing` database. - pub fn new(backing: Arc, col: Option) -> OverlayDB { + pub fn new(backing: Arc, col: Option) -> OverlayDB { OverlayDB{ overlay: MemoryDB::new(), backing: backing, column: col } } /// Create a new instance of OverlayDB with an anonymous temporary database. #[cfg(test)] pub fn new_temp() -> OverlayDB { - let mut dir = ::std::env::temp_dir(); - dir.push(H32::random().hex()); - Self::new(Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()), None) + let backing = Arc::new(::kvdb::in_memory(0)); + Self::new(backing, None) } /// Commit all operations in a single batch. @@ -295,23 +294,3 @@ fn overlaydb_complex() { trie.commit().unwrap(); // assert_eq!(trie.get(&hfoo), None); } - -#[test] -fn playpen() { - use std::fs; - { - let db = Database::open_default("/tmp/test").unwrap(); - let mut batch = db.transaction(); - batch.put(None, b"test", b"test2"); - db.write(batch).unwrap(); - match db.get(None, b"test") { - Ok(Some(value)) => println!("Got value {:?}", &*value), - Ok(None) => println!("No value for that key"), - Err(..) => println!("Gah"), - } - let mut batch = db.transaction(); - batch.delete(None, b"test"); - db.write(batch).unwrap(); - } - fs::remove_dir_all("/tmp/test").unwrap(); -}