diff --git a/Cargo.lock b/Cargo.lock index 15cb17c88..bb91080c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,11 @@ name = "bloomchain" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bytes" version = "0.3.0" @@ -270,10 +275,12 @@ version = "1.4.0" dependencies = [ "bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.90 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethash 1.4.0", + "ethcore-bloom-journal 0.1.0", "ethcore-devtools 1.4.0", "ethcore-io 1.4.0", "ethcore-ipc 1.4.0", @@ -1901,6 +1908,7 @@ dependencies = [ "checksum bitflags 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4f67931368edf3a9a51d29886d245f1c3db2f1ef0dcc9e35ff70341b78c10d23" "checksum blastfig 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09640e0509d97d5cdff03a9f5daf087a8e04c735c3b113a75139634a19cfc7b2" "checksum bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f421095d2a76fc24cd3fb3f912b90df06be7689912b1bdb423caefae59c258d" +"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" "checksum bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27" "checksum bytes 0.4.0-dev (git+https://github.com/carllerche/bytes)" = "" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = 
"de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 2f1291d56..96dcba3f8 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -38,6 +38,8 @@ ethcore-ipc-nano = { path = "../ipc/nano" } rlp = { path = "../util/rlp" } rand = "0.3" lru-cache = "0.0.7" +ethcore-bloom-journal = { path = "../util/bloom" } +byteorder = "0.5" [dependencies.hyper] git = "https://github.com/ethcore/hyper" diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index bd74eb958..11929294f 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -25,8 +25,9 @@ use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action}; use blockchain::TreeRoute; use client::{ BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, - TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError + TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError, }; +use db::{NUM_COLUMNS, COL_STATE}; use header::{Header as BlockHeader, BlockNumber}; use filter::Filter; use log_entry::LocalizedLogEntry; @@ -286,8 +287,8 @@ impl TestBlockChainClient { pub fn get_temp_state_db() -> GuardedTempResult { let temp = RandomTempPath::new(); - let db = Database::open_default(temp.as_str()).unwrap(); - let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, None); + let db = Database::open(&DatabaseConfig::with_columns(NUM_COLUMNS), temp.as_str()).unwrap(); + let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, COL_STATE); let state_db = StateDB::new(journal_db); GuardedTempResult { _temp: temp, diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index c8c24cc5f..10672d730 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -34,8 +34,10 @@ pub const COL_BODIES: Option = Some(2); pub const COL_EXTRA: Option = Some(3); /// Column for 
Traces pub const COL_TRACE: Option<u32> = Some(4); +/// Column for the account bloom +pub const COL_ACCOUNT_BLOOM: Option<u32> = Some(5); /// Number of columns in DB -pub const NUM_COLUMNS: Option<u32> = Some(5); +pub const NUM_COLUMNS: Option<u32> = Some(6); /// Modes for updating caches. #[derive(Clone, Copy)] diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index cc398a2f5..c72a977cf 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -99,6 +99,8 @@ extern crate ethcore_devtools as devtools; extern crate rand; extern crate bit_set; extern crate rlp; +extern crate ethcore_bloom_journal as bloom_journal; +extern crate byteorder; #[macro_use] extern crate log; diff --git a/ethcore/src/migrations/mod.rs b/ethcore/src/migrations/mod.rs index 5c0c6f420..7ccafac74 100644 --- a/ethcore/src/migrations/mod.rs +++ b/ethcore/src/migrations/mod.rs @@ -23,3 +23,6 @@ pub mod extras; mod v9; pub use self::v9::ToV9; pub use self::v9::Extract; + +mod v10; +pub use self::v10::ToV10; diff --git a/ethcore/src/migrations/v10.rs b/ethcore/src/migrations/v10.rs new file mode 100644 index 000000000..88884fb26 --- /dev/null +++ b/ethcore/src/migrations/v10.rs @@ -0,0 +1,117 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see <http://www.gnu.org/licenses/>. + +//!
Bloom upgrade + +use std::sync::Arc; +use db::{COL_EXTRA, COL_HEADERS, COL_STATE}; +use state_db::{ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET, StateDB}; +use util::trie::TrieDB; +use views::HeaderView; +use bloom_journal::Bloom; +use util::migration::{Error, Migration, Progress, Batch, Config}; +use util::journaldb; +use util::{H256, FixedHash, Trie}; +use util::{Database, DBTransaction}; + +/// Account bloom upgrade routine. If bloom already present, does nothing. +/// If database empty (no best block), does nothing. +/// Can be called on upgraded database with no issues (will do nothing). +pub fn generate_bloom(source: Arc, dest: &mut Database) -> Result<(), Error> { + trace!(target: "migration", "Account bloom upgrade started"); + let best_block_hash = match try!(source.get(COL_EXTRA, b"best")) { + // no migration needed + None => { + trace!(target: "migration", "No best block hash, skipping"); + return Ok(()); + }, + Some(hash) => hash, + }; + let best_block_header = match try!(source.get(COL_HEADERS, &best_block_hash)) { + // no best block, nothing to do + None => { + trace!(target: "migration", "No best block header, skipping"); + return Ok(()) + }, + Some(x) => x, + }; + let state_root = HeaderView::new(&best_block_header).state_root(); + + trace!("Adding accounts bloom (one-time upgrade)"); + let bloom_journal = { + let mut bloom = Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET); + // no difference what algorithm is passed, since there will be no writes + let state_db = journaldb::new( + source.clone(), + journaldb::Algorithm::OverlayRecent, + COL_STATE); + let account_trie = try!(TrieDB::new(state_db.as_hashdb(), &state_root).map_err(|e| Error::Custom(format!("Cannot open trie: {:?}", e)))); + for item in try!(account_trie.iter().map_err(|_| Error::MigrationImpossible)) { + let (ref account_key, _) = try!(item.map_err(|_| Error::MigrationImpossible)); + let account_key_hash = H256::from_slice(&account_key); + bloom.set(&*account_key_hash); + } + + 
bloom.drain_journal() + }; + + trace!(target: "migration", "Generated {} bloom updates", bloom_journal.entries.len()); + + let mut batch = DBTransaction::new(dest); + try!(StateDB::commit_bloom(&mut batch, bloom_journal).map_err(|_| Error::Custom("Failed to commit bloom".to_owned()))); + try!(dest.write(batch)); + + trace!(target: "migration", "Finished bloom update"); + + + Ok(()) +} + +/// Account bloom migration. +#[derive(Default)] +pub struct ToV10 { + progress: Progress, +} + +impl ToV10 { + /// New v10 migration + pub fn new() -> ToV10 { ToV10 { progress: Progress::default() } } +} + +impl Migration for ToV10 { + fn version(&self) -> u32 { + 10 + } + + fn pre_columns(&self) -> Option { Some(5) } + + fn columns(&self) -> Option { Some(6) } + + fn migrate(&mut self, source: Arc, config: &Config, dest: &mut Database, col: Option) -> Result<(), Error> { + let mut batch = Batch::new(config, col); + for (key, value) in source.iter(col) { + self.progress.tick(); + try!(batch.insert(key.to_vec(), value.to_vec(), dest)); + } + try!(batch.commit(dest)); + + if col == COL_STATE { + try!(generate_bloom(source, dest)); + } + + Ok(()) + } +} diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 2074f8174..2150ee226 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -43,6 +43,8 @@ use self::account::Account; use self::block::AbridgedBlock; use self::io::SnapshotWriter; +use super::state_db::StateDB; + use crossbeam::{scope, ScopedJoinHandle}; use rand::{Rng, OsRng}; @@ -454,6 +456,10 @@ impl StateRebuilder { self.code_map.insert(code_hash, code); } + let backing = self.db.backing().clone(); + + // bloom has to be updated + let mut bloom = StateDB::load_bloom(&backing); // batch trie writes { @@ -464,12 +470,14 @@ impl StateRebuilder { }; for (hash, thin_rlp) in pairs { + bloom.set(&*hash); try!(account_trie.insert(&hash, &thin_rlp)); } } - let backing = self.db.backing().clone(); + let bloom_journal = bloom.drain_journal(); 
let mut batch = backing.transaction(); + try!(StateDB::commit_bloom(&mut batch, bloom_journal)); try!(self.db.inject(&mut batch)); try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); trace!(target: "snapshot", "current state root: {:?}", self.state_root); diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index b133c7181..34f7afff4 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -247,6 +247,7 @@ impl Spec { } trace!(target: "spec", "ensure_db_good: Populated sec trie; root is {}", root); for (address, account) in self.genesis_state.get().iter() { + db.note_account_bloom(address); account.insert_additional(&mut AccountDBMut::new(db.as_hashdb_mut(), address)); } assert!(db.as_hashdb().contains(&self.state_root())); diff --git a/ethcore/src/state/mod.rs b/ethcore/src/state/mod.rs index 0661420ed..a2fe25b91 100644 --- a/ethcore/src/state/mod.rs +++ b/ethcore/src/state/mod.rs @@ -300,7 +300,10 @@ impl State { } } } - + + // check bloom before any requests to trie + if !self.db.check_account_bloom(address) { return H256::zero() } + // account is not found in the global cache, get from the DB and insert into local let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR); let maybe_acc = match db.get(address) { @@ -405,6 +408,7 @@ impl State { for (address, ref mut a) in accounts.iter_mut() { match a { &mut&mut AccountEntry::Cached(ref mut account) if account.is_dirty() => { + db.note_account_bloom(&address); let addr_hash = account.address_hash(address); let mut account_db = factories.accountdb.create(db.as_hashdb_mut(), addr_hash); account.commit_storage(&factories.trie, account_db.as_hashdb_mut()); @@ -468,6 +472,7 @@ impl State { pub fn populate_from(&mut self, accounts: PodState) { assert!(self.snapshots.borrow().is_empty()); for (add, acc) in accounts.drain().into_iter() { + self.db.note_account_bloom(&add); self.cache.borrow_mut().insert(add, 
AccountEntry::Cached(Account::from_pod(acc))); } } @@ -543,6 +548,9 @@ impl State { match result { Some(r) => r, None => { + // first check bloom if it is not in database for sure + if !self.db.check_account_bloom(a) { return f(None); } + // not found in the global cache, get from the DB and insert into local let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR); let mut maybe_acc = match db.get(a) { @@ -579,11 +587,17 @@ impl State { Some(Some(acc)) => self.insert_cache(a, AccountEntry::Cached(acc)), Some(None) => self.insert_cache(a, AccountEntry::Missing), None => { - let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR); - let maybe_acc = match db.get(a) { - Ok(Some(acc)) => AccountEntry::Cached(Account::from_rlp(acc)), - Ok(None) => AccountEntry::Missing, - Err(e) => panic!("Potential DB corruption encountered: {}", e), + let maybe_acc = if self.db.check_account_bloom(a) { + let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR); + let maybe_acc = match db.get(a) { + Ok(Some(acc)) => AccountEntry::Cached(Account::from_rlp(acc)), + Ok(None) => AccountEntry::Missing, + Err(e) => panic!("Potential DB corruption encountered: {}", e), + }; + maybe_acc + } + else { + AccountEntry::Missing }; self.insert_cache(a, maybe_acc); } diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs index af6780cdc..7a1206801 100644 --- a/ethcore/src/state_db.rs +++ b/ethcore/src/state_db.rs @@ -18,11 +18,19 @@ use lru_cache::LruCache; use util::journaldb::JournalDB; use util::hash::{H256}; use util::hashdb::HashDB; -use util::{Arc, Address, DBTransaction, UtilError, Mutex}; use state::Account; +use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable}; +use bloom_journal::{Bloom, BloomJournal}; +use db::COL_ACCOUNT_BLOOM; +use byteorder::{LittleEndian, ByteOrder}; const STATE_CACHE_ITEMS: usize = 65536; +pub const 
ACCOUNT_BLOOM_SPACE: usize = 1048576; +pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000; + +pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count"; + struct AccountCache { /// DB Account cache. `None` indicates that account is known to be missing. accounts: LruCache>, @@ -39,22 +47,83 @@ pub struct StateDB { account_cache: Arc>, cache_overlay: Vec<(Address, Option)>, is_canon: bool, + account_bloom: Arc>, } impl StateDB { + /// Create a new instance wrapping `JournalDB` pub fn new(db: Box) -> StateDB { + let bloom = Self::load_bloom(db.backing()); StateDB { db: db, account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(STATE_CACHE_ITEMS) })), cache_overlay: Vec::new(), is_canon: false, + account_bloom: Arc::new(Mutex::new(bloom)), } } + /// Loads accounts bloom from the database + /// This bloom is used to handle request for the non-existant account fast + pub fn load_bloom(db: &Database) -> Bloom { + let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY) + .expect("Low-level database error"); + + if hash_count_entry.is_none() { + return Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET); + } + let hash_count_bytes = hash_count_entry.unwrap(); + assert_eq!(hash_count_bytes.len(), 1); + let hash_count = hash_count_bytes[0]; + + let mut bloom_parts = vec![0u64; ACCOUNT_BLOOM_SPACE / 8]; + let mut key = [0u8; 8]; + for i in 0..ACCOUNT_BLOOM_SPACE / 8 { + LittleEndian::write_u64(&mut key, i as u64); + bloom_parts[i] = db.get(COL_ACCOUNT_BLOOM, &key).expect("low-level database error") + .and_then(|val| Some(LittleEndian::read_u64(&val[..]))) + .unwrap_or(0u64); + } + + let bloom = Bloom::from_parts(&bloom_parts, hash_count as u32); + trace!(target: "account_bloom", "Bloom is {:?} full, hash functions count = {:?}", bloom.saturation(), hash_count); + bloom + } + + pub fn check_account_bloom(&self, address: &Address) -> bool { + trace!(target: "account_bloom", "Check account bloom: {:?}", address); + 
let bloom = self.account_bloom.lock(); + bloom.check(&*address.sha3()) + } + + pub fn note_account_bloom(&self, address: &Address) { + trace!(target: "account_bloom", "Note account bloom: {:?}", address); + let mut bloom = self.account_bloom.lock(); + bloom.set(&*address.sha3()); + } + + pub fn commit_bloom(batch: &mut DBTransaction, journal: BloomJournal) -> Result<(), UtilError> { + assert!(journal.hash_functions <= 255); + batch.put(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY, &vec![journal.hash_functions as u8]); + let mut key = [0u8; 8]; + let mut val = [0u8; 8]; + + for (bloom_part_index, bloom_part_value) in journal.entries { + LittleEndian::write_u64(&mut key, bloom_part_index as u64); + LittleEndian::write_u64(&mut val, bloom_part_value); + batch.put(COL_ACCOUNT_BLOOM, &key, &val); + } + Ok(()) + } + /// Commit all recent insert operations and canonical historical commits' removals from the /// old era to the backing database, reverting any non-canonical historical commit's inserts. 
pub fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + { + let mut bloom_lock = self.account_bloom.lock(); + try!(Self::commit_bloom(batch, bloom_lock.drain_journal())); + } let records = try!(self.db.commit(batch, now, id, end)); if self.is_canon { self.commit_cache(); @@ -81,6 +150,7 @@ impl StateDB { account_cache: self.account_cache.clone(), cache_overlay: Vec::new(), is_canon: false, + account_bloom: self.account_bloom.clone(), } } @@ -91,6 +161,7 @@ impl StateDB { account_cache: self.account_cache.clone(), cache_overlay: Vec::new(), is_canon: true, + account_bloom: self.account_bloom.clone(), } } diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 6504ef8a9..acbf4e641 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -29,6 +29,7 @@ use ethereum; use devtools::*; use miner::Miner; use rlp::{self, RlpStream, Stream}; +use db::COL_STATE; #[cfg(feature = "json-tests")] pub enum ChainEra { @@ -344,7 +345,7 @@ pub fn get_temp_state() -> GuardedTempResult { pub fn get_temp_state_db_in(path: &Path) -> StateDB { let db = new_db(path.to_str().expect("Only valid utf8 paths for tests.")); - let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, None); + let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, COL_STATE); StateDB::new(journal_db) } diff --git a/parity/migration.rs b/parity/migration.rs index 084ade676..26bb606bc 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -30,7 +30,7 @@ use ethcore::migrations::Extract; /// Database is assumed to be at default version, when no version file is found. const DEFAULT_VERSION: u32 = 5; /// Current version of database models. -const CURRENT_VERSION: u32 = 9; +const CURRENT_VERSION: u32 = 10; /// First version of the consolidated database. const CONSOLIDATION_VERSION: u32 = 9; /// Defines how many items are migrated to the new version of database at once. 
@@ -144,7 +144,8 @@ pub fn default_migration_settings(compaction_profile: &CompactionProfile) -> Mig /// Migrations on the consolidated database. fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result { - let manager = MigrationManager::new(default_migration_settings(compaction_profile)); + let mut manager = MigrationManager::new(default_migration_settings(compaction_profile)); + try!(manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)); Ok(manager) } diff --git a/util/src/migration/mod.rs b/util/src/migration/mod.rs index cd2b7fae1..80cfa29b6 100644 --- a/util/src/migration/mod.rs +++ b/util/src/migration/mod.rs @@ -115,6 +115,12 @@ impl From<::std::io::Error> for Error { } } +impl From for Error { + fn from(e: String) -> Self { + Error::Custom(e) + } +} + /// A generalized migration from the given db to a destination db. pub trait Migration: 'static { /// Number of columns in the database before the migration. @@ -222,10 +228,12 @@ impl Manager { pub fn execute(&mut self, old_path: &Path, version: u32) -> Result { let config = self.config.clone(); let migrations = self.migrations_from(version); + trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len()); if migrations.is_empty() { return Err(Error::MigrationImpossible) }; - let columns = migrations.iter().find(|m| m.version() == version).and_then(|m| m.pre_columns()); + let columns = migrations.iter().nth(0).and_then(|m| m.pre_columns()); + trace!(target: "migration", "Expecting database to contain {:?} columns", columns); let mut db_config = DatabaseConfig { max_open_files: 64, cache_sizes: Default::default(),