Accounts bloom in master (#2426)

* bloom crate link

* database layout and outdated tests

* state db alterations

* v10 migration run

* using arc

* bloom migration

* migration fixes and mess

* fix tests
This commit is contained in:
Nikolay Volf 2016-10-03 14:02:43 +04:00 committed by Gav Wood
parent 06fe768ac2
commit e1d3b3fff8
14 changed files with 255 additions and 16 deletions

8
Cargo.lock generated
View File

@ -120,6 +120,11 @@ name = "bloomchain"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "byteorder"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bytes"
version = "0.3.0"
@ -270,10 +275,12 @@ version = "1.4.0"
dependencies = [
"bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethash 1.4.0",
"ethcore-bloom-journal 0.1.0",
"ethcore-devtools 1.4.0",
"ethcore-io 1.4.0",
"ethcore-ipc 1.4.0",
@ -1901,6 +1908,7 @@ dependencies = [
"checksum bitflags 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4f67931368edf3a9a51d29886d245f1c3db2f1ef0dcc9e35ff70341b78c10d23"
"checksum blastfig 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09640e0509d97d5cdff03a9f5daf087a8e04c735c3b113a75139634a19cfc7b2"
"checksum bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f421095d2a76fc24cd3fb3f912b90df06be7689912b1bdb423caefae59c258d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27"
"checksum bytes 0.4.0-dev (git+https://github.com/carllerche/bytes)" = "<none>"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"

View File

@ -38,6 +38,8 @@ ethcore-ipc-nano = { path = "../ipc/nano" }
rlp = { path = "../util/rlp" }
rand = "0.3"
lru-cache = "0.0.7"
ethcore-bloom-journal = { path = "../util/bloom" }
byteorder = "0.5"
[dependencies.hyper]
git = "https://github.com/ethcore/hyper"

View File

@ -25,8 +25,9 @@ use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute;
use client::{
BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID,
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError,
};
use db::{NUM_COLUMNS, COL_STATE};
use header::{Header as BlockHeader, BlockNumber};
use filter::Filter;
use log_entry::LocalizedLogEntry;
@ -286,8 +287,8 @@ impl TestBlockChainClient {
pub fn get_temp_state_db() -> GuardedTempResult<StateDB> {
let temp = RandomTempPath::new();
let db = Database::open_default(temp.as_str()).unwrap();
let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, None);
let db = Database::open(&DatabaseConfig::with_columns(NUM_COLUMNS), temp.as_str()).unwrap();
let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, COL_STATE);
let state_db = StateDB::new(journal_db);
GuardedTempResult {
_temp: temp,

View File

@ -34,8 +34,10 @@ pub const COL_BODIES: Option<u32> = Some(2);
pub const COL_EXTRA: Option<u32> = Some(3);
/// Column for Traces
pub const COL_TRACE: Option<u32> = Some(4);
/// Column for Traces
pub const COL_ACCOUNT_BLOOM: Option<u32> = Some(5);
/// Number of columns in DB
pub const NUM_COLUMNS: Option<u32> = Some(5);
pub const NUM_COLUMNS: Option<u32> = Some(6);
/// Modes for updating caches.
#[derive(Clone, Copy)]

View File

@ -99,6 +99,8 @@ extern crate ethcore_devtools as devtools;
extern crate rand;
extern crate bit_set;
extern crate rlp;
extern crate ethcore_bloom_journal as bloom_journal;
extern crate byteorder;
#[macro_use]
extern crate log;

View File

@ -23,3 +23,6 @@ pub mod extras;
mod v9;
pub use self::v9::ToV9;
pub use self::v9::Extract;
mod v10;
pub use self::v10::ToV10;

View File

@ -0,0 +1,117 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Bloom upgrade
use std::sync::Arc;
use db::{COL_EXTRA, COL_HEADERS, COL_STATE};
use state_db::{ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET, StateDB};
use util::trie::TrieDB;
use views::HeaderView;
use bloom_journal::Bloom;
use util::migration::{Error, Migration, Progress, Batch, Config};
use util::journaldb;
use util::{H256, FixedHash, Trie};
use util::{Database, DBTransaction};
/// Account bloom upgrade routine. If bloom already present, does nothing.
/// If database empty (no best block), does nothing.
/// Can be called on upgraded database with no issues (will do nothing).
pub fn generate_bloom(source: Arc<Database>, dest: &mut Database) -> Result<(), Error> {
trace!(target: "migration", "Account bloom upgrade started");
let best_block_hash = match try!(source.get(COL_EXTRA, b"best")) {
// no migration needed
None => {
trace!(target: "migration", "No best block hash, skipping");
return Ok(());
},
Some(hash) => hash,
};
let best_block_header = match try!(source.get(COL_HEADERS, &best_block_hash)) {
// no best block, nothing to do
None => {
trace!(target: "migration", "No best block header, skipping");
return Ok(())
},
Some(x) => x,
};
let state_root = HeaderView::new(&best_block_header).state_root();
trace!("Adding accounts bloom (one-time upgrade)");
let bloom_journal = {
let mut bloom = Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET);
// no difference what algorithm is passed, since there will be no writes
let state_db = journaldb::new(
source.clone(),
journaldb::Algorithm::OverlayRecent,
COL_STATE);
let account_trie = try!(TrieDB::new(state_db.as_hashdb(), &state_root).map_err(|e| Error::Custom(format!("Cannot open trie: {:?}", e))));
for item in try!(account_trie.iter().map_err(|_| Error::MigrationImpossible)) {
let (ref account_key, _) = try!(item.map_err(|_| Error::MigrationImpossible));
let account_key_hash = H256::from_slice(&account_key);
bloom.set(&*account_key_hash);
}
bloom.drain_journal()
};
trace!(target: "migration", "Generated {} bloom updates", bloom_journal.entries.len());
let mut batch = DBTransaction::new(dest);
try!(StateDB::commit_bloom(&mut batch, bloom_journal).map_err(|_| Error::Custom("Failed to commit bloom".to_owned())));
try!(dest.write(batch));
trace!(target: "migration", "Finished bloom update");
Ok(())
}
/// Account bloom migration.
#[derive(Default)]
pub struct ToV10 {
progress: Progress,
}
impl ToV10 {
/// New v10 migration
pub fn new() -> ToV10 { ToV10 { progress: Progress::default() } }
}
impl Migration for ToV10 {
fn version(&self) -> u32 {
10
}
fn pre_columns(&self) -> Option<u32> { Some(5) }
fn columns(&self) -> Option<u32> { Some(6) }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col) {
self.progress.tick();
try!(batch.insert(key.to_vec(), value.to_vec(), dest));
}
try!(batch.commit(dest));
if col == COL_STATE {
try!(generate_bloom(source, dest));
}
Ok(())
}
}

View File

@ -43,6 +43,8 @@ use self::account::Account;
use self::block::AbridgedBlock;
use self::io::SnapshotWriter;
use super::state_db::StateDB;
use crossbeam::{scope, ScopedJoinHandle};
use rand::{Rng, OsRng};
@ -454,6 +456,10 @@ impl StateRebuilder {
self.code_map.insert(code_hash, code);
}
let backing = self.db.backing().clone();
// bloom has to be updated
let mut bloom = StateDB::load_bloom(&backing);
// batch trie writes
{
@ -464,12 +470,14 @@ impl StateRebuilder {
};
for (hash, thin_rlp) in pairs {
bloom.set(&*hash);
try!(account_trie.insert(&hash, &thin_rlp));
}
}
let backing = self.db.backing().clone();
let bloom_journal = bloom.drain_journal();
let mut batch = backing.transaction();
try!(StateDB::commit_bloom(&mut batch, bloom_journal));
try!(self.db.inject(&mut batch));
try!(backing.write(batch).map_err(::util::UtilError::SimpleString));
trace!(target: "snapshot", "current state root: {:?}", self.state_root);

View File

@ -247,6 +247,7 @@ impl Spec {
}
trace!(target: "spec", "ensure_db_good: Populated sec trie; root is {}", root);
for (address, account) in self.genesis_state.get().iter() {
db.note_account_bloom(address);
account.insert_additional(&mut AccountDBMut::new(db.as_hashdb_mut(), address));
}
assert!(db.as_hashdb().contains(&self.state_root()));

View File

@ -301,6 +301,9 @@ impl State {
}
}
// check bloom before any requests to trie
if !self.db.check_account_bloom(address) { return H256::zero() }
// account is not found in the global cache, get from the DB and insert into local
let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
let maybe_acc = match db.get(address) {
@ -405,6 +408,7 @@ impl State {
for (address, ref mut a) in accounts.iter_mut() {
match a {
&mut&mut AccountEntry::Cached(ref mut account) if account.is_dirty() => {
db.note_account_bloom(&address);
let addr_hash = account.address_hash(address);
let mut account_db = factories.accountdb.create(db.as_hashdb_mut(), addr_hash);
account.commit_storage(&factories.trie, account_db.as_hashdb_mut());
@ -468,6 +472,7 @@ impl State {
pub fn populate_from(&mut self, accounts: PodState) {
assert!(self.snapshots.borrow().is_empty());
for (add, acc) in accounts.drain().into_iter() {
self.db.note_account_bloom(&add);
self.cache.borrow_mut().insert(add, AccountEntry::Cached(Account::from_pod(acc)));
}
}
@ -543,6 +548,9 @@ impl State {
match result {
Some(r) => r,
None => {
// first check bloom if it is not in database for sure
if !self.db.check_account_bloom(a) { return f(None); }
// not found in the global cache, get from the DB and insert into local
let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
let mut maybe_acc = match db.get(a) {
@ -579,11 +587,17 @@ impl State {
Some(Some(acc)) => self.insert_cache(a, AccountEntry::Cached(acc)),
Some(None) => self.insert_cache(a, AccountEntry::Missing),
None => {
let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
let maybe_acc = match db.get(a) {
Ok(Some(acc)) => AccountEntry::Cached(Account::from_rlp(acc)),
Ok(None) => AccountEntry::Missing,
Err(e) => panic!("Potential DB corruption encountered: {}", e),
let maybe_acc = if self.db.check_account_bloom(a) {
let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
let maybe_acc = match db.get(a) {
Ok(Some(acc)) => AccountEntry::Cached(Account::from_rlp(acc)),
Ok(None) => AccountEntry::Missing,
Err(e) => panic!("Potential DB corruption encountered: {}", e),
};
maybe_acc
}
else {
AccountEntry::Missing
};
self.insert_cache(a, maybe_acc);
}

View File

@ -18,11 +18,19 @@ use lru_cache::LruCache;
use util::journaldb::JournalDB;
use util::hash::{H256};
use util::hashdb::HashDB;
use util::{Arc, Address, DBTransaction, UtilError, Mutex};
use state::Account;
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
use bloom_journal::{Bloom, BloomJournal};
use db::COL_ACCOUNT_BLOOM;
use byteorder::{LittleEndian, ByteOrder};
const STATE_CACHE_ITEMS: usize = 65536;
pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
struct AccountCache {
/// DB Account cache. `None` indicates that account is known to be missing.
accounts: LruCache<Address, Option<Account>>,
@ -39,22 +47,83 @@ pub struct StateDB {
account_cache: Arc<Mutex<AccountCache>>,
cache_overlay: Vec<(Address, Option<Account>)>,
is_canon: bool,
account_bloom: Arc<Mutex<Bloom>>,
}
impl StateDB {
/// Create a new instance wrapping `JournalDB`
pub fn new(db: Box<JournalDB>) -> StateDB {
let bloom = Self::load_bloom(db.backing());
StateDB {
db: db,
account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(STATE_CACHE_ITEMS) })),
cache_overlay: Vec::new(),
is_canon: false,
account_bloom: Arc::new(Mutex::new(bloom)),
}
}
/// Loads accounts bloom from the database
/// This bloom is used to handle request for the non-existant account fast
pub fn load_bloom(db: &Database) -> Bloom {
let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY)
.expect("Low-level database error");
if hash_count_entry.is_none() {
return Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET);
}
let hash_count_bytes = hash_count_entry.unwrap();
assert_eq!(hash_count_bytes.len(), 1);
let hash_count = hash_count_bytes[0];
let mut bloom_parts = vec![0u64; ACCOUNT_BLOOM_SPACE / 8];
let mut key = [0u8; 8];
for i in 0..ACCOUNT_BLOOM_SPACE / 8 {
LittleEndian::write_u64(&mut key, i as u64);
bloom_parts[i] = db.get(COL_ACCOUNT_BLOOM, &key).expect("low-level database error")
.and_then(|val| Some(LittleEndian::read_u64(&val[..])))
.unwrap_or(0u64);
}
let bloom = Bloom::from_parts(&bloom_parts, hash_count as u32);
trace!(target: "account_bloom", "Bloom is {:?} full, hash functions count = {:?}", bloom.saturation(), hash_count);
bloom
}
pub fn check_account_bloom(&self, address: &Address) -> bool {
trace!(target: "account_bloom", "Check account bloom: {:?}", address);
let bloom = self.account_bloom.lock();
bloom.check(&*address.sha3())
}
pub fn note_account_bloom(&self, address: &Address) {
trace!(target: "account_bloom", "Note account bloom: {:?}", address);
let mut bloom = self.account_bloom.lock();
bloom.set(&*address.sha3());
}
pub fn commit_bloom(batch: &mut DBTransaction, journal: BloomJournal) -> Result<(), UtilError> {
assert!(journal.hash_functions <= 255);
batch.put(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY, &vec![journal.hash_functions as u8]);
let mut key = [0u8; 8];
let mut val = [0u8; 8];
for (bloom_part_index, bloom_part_value) in journal.entries {
LittleEndian::write_u64(&mut key, bloom_part_index as u64);
LittleEndian::write_u64(&mut val, bloom_part_value);
batch.put(COL_ACCOUNT_BLOOM, &key, &val);
}
Ok(())
}
/// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
pub fn commit(&mut self, batch: &mut DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
{
let mut bloom_lock = self.account_bloom.lock();
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
}
let records = try!(self.db.commit(batch, now, id, end));
if self.is_canon {
self.commit_cache();
@ -81,6 +150,7 @@ impl StateDB {
account_cache: self.account_cache.clone(),
cache_overlay: Vec::new(),
is_canon: false,
account_bloom: self.account_bloom.clone(),
}
}
@ -91,6 +161,7 @@ impl StateDB {
account_cache: self.account_cache.clone(),
cache_overlay: Vec::new(),
is_canon: true,
account_bloom: self.account_bloom.clone(),
}
}

View File

@ -29,6 +29,7 @@ use ethereum;
use devtools::*;
use miner::Miner;
use rlp::{self, RlpStream, Stream};
use db::COL_STATE;
#[cfg(feature = "json-tests")]
pub enum ChainEra {
@ -344,7 +345,7 @@ pub fn get_temp_state() -> GuardedTempResult<State> {
pub fn get_temp_state_db_in(path: &Path) -> StateDB {
let db = new_db(path.to_str().expect("Only valid utf8 paths for tests."));
let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, None);
let journal_db = journaldb::new(db.clone(), journaldb::Algorithm::EarlyMerge, COL_STATE);
StateDB::new(journal_db)
}

View File

@ -30,7 +30,7 @@ use ethcore::migrations::Extract;
/// Database is assumed to be at default version, when no version file is found.
const DEFAULT_VERSION: u32 = 5;
/// Current version of database models.
const CURRENT_VERSION: u32 = 9;
const CURRENT_VERSION: u32 = 10;
/// First version of the consolidated database.
const CONSOLIDATION_VERSION: u32 = 9;
/// Defines how many items are migrated to the new version of database at once.
@ -144,7 +144,8 @@ pub fn default_migration_settings(compaction_profile: &CompactionProfile) -> Mig
/// Migrations on the consolidated database.
fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> Result<MigrationManager, Error> {
let manager = MigrationManager::new(default_migration_settings(compaction_profile));
let mut manager = MigrationManager::new(default_migration_settings(compaction_profile));
try!(manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible));
Ok(manager)
}

View File

@ -115,6 +115,12 @@ impl From<::std::io::Error> for Error {
}
}
impl From<String> for Error {
fn from(e: String) -> Self {
Error::Custom(e)
}
}
/// A generalized migration from the given db to a destination db.
pub trait Migration: 'static {
/// Number of columns in the database before the migration.
@ -222,10 +228,12 @@ impl Manager {
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf, Error> {
let config = self.config.clone();
let migrations = self.migrations_from(version);
trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
if migrations.is_empty() { return Err(Error::MigrationImpossible) };
let columns = migrations.iter().find(|m| m.version() == version).and_then(|m| m.pre_columns());
let columns = migrations.iter().nth(0).and_then(|m| m.pre_columns());
trace!(target: "migration", "Expecting database to contain {:?} columns", columns);
let mut db_config = DatabaseConfig {
max_open_files: 64,
cache_sizes: Default::default(),