Remove accounts bloom (#11589)

This commit is contained in:
David 2020-04-22 10:04:18 +02:00 committed by GitHub
parent b7dd06b1ff
commit c85300ca6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 76 additions and 462 deletions

19
Cargo.lock generated
View File

@ -40,13 +40,6 @@ dependencies = [
"trie-vm-factories",
]
[[package]]
name = "accounts-bloom"
version = "0.1.0"
dependencies = [
"siphasher",
]
[[package]]
name = "aes"
version = "0.3.2"
@ -4558,12 +4551,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2"
[[package]]
name = "siphasher"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e88f89a550c01e4cd809f3df4f52dc9e939f3273a2017eabd5c6d12fd98bb23"
[[package]]
name = "slab"
version = "0.4.2"
@ -4590,8 +4577,6 @@ name = "snapshot"
version = "0.1.0"
dependencies = [
"account-db",
"account-state",
"accounts-bloom",
"client-traits",
"common-types",
"criterion",
@ -4629,7 +4614,6 @@ dependencies = [
"scopeguard 1.1.0",
"snapshot-tests",
"spec",
"state-db",
"tempfile",
"trie-db",
"trie-standardmap",
@ -4747,15 +4731,12 @@ name = "state-db"
version = "0.1.0"
dependencies = [
"account-state",
"accounts-bloom",
"common-types",
"env_logger 0.5.13",
"ethcore",
"ethcore-db",
"ethereum-types",
"hash-db",
"journaldb",
"keccak-hash",
"keccak-hasher 0.1.1",
"kvdb",
"log",

View File

@ -62,13 +62,6 @@ pub trait Backend: Send {
/// Get cached code based on hash.
fn get_cached_code(&self, hash: &H256) -> Option<Arc<Vec<u8>>>;
/// Note that an account with the given address is non-null.
fn note_non_null_account(&self, address: &Address);
/// Check whether an account is known to be empty. Returns true if known to be
/// empty, false otherwise.
fn is_known_null(&self, address: &Address) -> bool;
}
/// A raw backend used to check proofs of execution.
@ -126,8 +119,6 @@ impl Backend for ProofCheck {
None
}
fn get_cached_code(&self, _hash: &H256) -> Option<Arc<Vec<u8>>> { None }
fn note_non_null_account(&self, _address: &Address) {}
fn is_known_null(&self, _address: &Address) -> bool { false }
}
/// Proving state backend.
@ -195,8 +186,6 @@ impl<H: AsHashDB<KeccakHasher, DBValue> + Send + Sync> Backend for Proving<H> {
}
fn get_cached_code(&self, _: &H256) -> Option<Arc<Vec<u8>>> { None }
fn note_non_null_account(&self, _: &Address) { }
fn is_known_null(&self, _: &Address) -> bool { false }
}
impl<H: AsHashDB<KeccakHasher, DBValue>> Proving<H> {
@ -253,6 +242,4 @@ impl<H: AsHashDB<KeccakHasher, DBValue> + Send + Sync> Backend for Basic<H> {
}
fn get_cached_code(&self, _: &H256) -> Option<Arc<Vec<u8>>> { None }
fn note_non_null_account(&self, _: &Address) { }
fn is_known_null(&self, _: &Address) -> bool { false }
}

View File

@ -411,48 +411,48 @@ impl<B: Backend> State<B> {
pub fn exists(&self, a: &Address) -> TrieResult<bool> {
// Bloom filter does not contain empty accounts, so it is important here to
// check if account exists in the database directly before EIP-161 is in effect.
self.ensure_cached(a, RequireCache::None, false, |a| a.is_some())
self.ensure_cached(a, RequireCache::None, |a| a.is_some())
}
/// Determine whether an account exists and if not empty.
pub fn exists_and_not_null(&self, a: &Address) -> TrieResult<bool> {
self.ensure_cached(a, RequireCache::None, false, |a| a.map_or(false, |a| !a.is_null()))
self.ensure_cached(a, RequireCache::None, |a| a.map_or(false, |a| !a.is_null()))
}
/// Determine whether an account exists and has code or non-zero nonce.
pub fn exists_and_has_code_or_nonce(&self, a: &Address) -> TrieResult<bool> {
self.ensure_cached(a, RequireCache::CodeSize, false,
self.ensure_cached(a, RequireCache::CodeSize,
|a| a.map_or(false, |a| a.code_hash() != KECCAK_EMPTY || *a.nonce() != self.account_start_nonce))
}
/// Get the balance of account `a`.
pub fn balance(&self, a: &Address) -> TrieResult<U256> {
self.ensure_cached(a, RequireCache::None, true,
self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().map_or(U256::zero(), |account| *account.balance()))
}
/// Get the nonce of account `a`.
pub fn nonce(&self, a: &Address) -> TrieResult<U256> {
self.ensure_cached(a, RequireCache::None, true,
self.ensure_cached(a, RequireCache::None,
|a| a.map_or(self.account_start_nonce, |account| *account.nonce()))
}
/// Whether the base storage root of an account remains unchanged.
pub fn is_base_storage_root_unchanged(&self, a: &Address) -> TrieResult<bool> {
Ok(self.ensure_cached(a, RequireCache::None, true,
Ok(self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().map(|account| account.is_base_storage_root_unchanged()))?
.unwrap_or(true))
}
/// Get the storage root of account `a`.
pub fn storage_root(&self, a: &Address) -> TrieResult<Option<H256>> {
self.ensure_cached(a, RequireCache::None, true,
self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().and_then(|account| account.storage_root()))
}
/// Get the original storage root since last commit of account `a`.
pub fn original_storage_root(&self, a: &Address) -> TrieResult<H256> {
Ok(self.ensure_cached(a, RequireCache::None, true,
Ok(self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().map(|account| account.original_storage_root()))?
.unwrap_or(KECCAK_NULL_RLP))
}
@ -579,9 +579,6 @@ impl<B: Backend> State<B> {
}
}
// check if the account could exist before any requests to trie
if self.db.is_known_null(address) { return Ok(H256::zero()) }
// account is not found in the global cache, get from the DB and insert into local
let db = &self.db.as_hash_db();
let db = self.factories.trie.readonly(db, &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
@ -617,25 +614,25 @@ impl<B: Backend> State<B> {
/// Get accounts' code.
pub fn code(&self, a: &Address) -> TrieResult<Option<Arc<Bytes>>> {
self.ensure_cached(a, RequireCache::Code, true,
self.ensure_cached(a, RequireCache::Code,
|a| a.as_ref().map_or(None, |a| a.code().clone()))
}
/// Get an account's code hash.
pub fn code_hash(&self, a: &Address) -> TrieResult<Option<H256>> {
self.ensure_cached(a, RequireCache::None, true,
self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().map(|a| a.code_hash()))
}
/// Get an account's code version.
pub fn code_version(&self, a: &Address) -> TrieResult<U256> {
self.ensure_cached(a, RequireCache::None, true,
self.ensure_cached(a, RequireCache::None,
|a| a.as_ref().map(|a| *a.code_version()).unwrap_or(U256::zero()))
}
/// Get accounts' code size.
pub fn code_size(&self, a: &Address) -> TrieResult<Option<usize>> {
self.ensure_cached(a, RequireCache::CodeSize, true,
self.ensure_cached(a, RequireCache::CodeSize,
|a| a.as_ref().and_then(|a| a.code_size()))
}
@ -719,9 +716,6 @@ impl<B: Backend> State<B> {
account.commit_storage(&self.factories.trie, account_db.as_hash_db_mut())?;
account.commit_code(account_db.as_hash_db_mut());
}
if !account.is_empty() {
self.db.note_non_null_account(address);
}
}
}
@ -877,7 +871,7 @@ impl<B: Backend> State<B> {
Ok(PodState::from(all_addresses.into_iter().fold(Ok(BTreeMap::new()), |m: TrieResult<_>, address| {
let mut m = m?;
let account = self.ensure_cached(&address, RequireCache::Code, true, |acc| {
let account = self.ensure_cached(&address, RequireCache::Code, |acc| {
acc.map(|acc| {
// Merge all modified storage keys.
let all_keys = {
@ -966,7 +960,7 @@ impl<B: Backend> State<B> {
/// Check caches for required data
/// First searches for account in the local, then the shared cache.
/// Populates local cache if nothing found.
fn ensure_cached<F, U>(&self, a: &Address, require: RequireCache, check_null: bool, f: F) -> TrieResult<U>
fn ensure_cached<F, U>(&self, a: &Address, require: RequireCache, f: F) -> TrieResult<U>
where F: Fn(Option<&Account>) -> U {
// check local cache first
if let Some(ref mut maybe_acc) = self.cache.borrow_mut().get_mut(a) {
@ -993,9 +987,6 @@ impl<B: Backend> State<B> {
match result {
Some(r) => Ok(r?),
None => {
// first check if it is not in database for sure
if check_null && self.db.is_known_null(a) { return Ok(f(None)); }
// not found in the global cache, get from the DB and insert into local
let db = &self.db.as_hash_db();
let db = self.factories.trie.readonly(db, &self.root)?;
@ -1029,15 +1020,11 @@ impl<B: Backend> State<B> {
match self.db.get_cached_account(a) {
Some(acc) => self.insert_cache(a, AccountEntry::new_clean_cached(acc)),
None => {
let maybe_acc = if !self.db.is_known_null(a) {
let db = &self.db.as_hash_db();
let db = self.factories.trie.readonly(db, &self.root)?;
let from_rlp = |b:&[u8]| { Account::from_rlp(b).expect("decoding db value failed") };
AccountEntry::new_clean(db.get_with(a.as_bytes(), from_rlp)?)
} else {
AccountEntry::new_clean(None)
};
self.insert_cache(a, maybe_acc);
let db = &self.db.as_hash_db();
let db = self.factories.trie.readonly(db, &self.root)?;
let from_rlp = |b:&[u8]| { Account::from_rlp(b).expect("decoding db value failed") };
let maybe_account = db.get_with(a.as_bytes(), from_rlp)?;
self.insert_cache(a, AccountEntry::new_clean(maybe_account));
}
}
}

View File

@ -35,7 +35,8 @@ pub const COL_BODIES: u32 = 2;
pub const COL_EXTRA: u32 = 3;
/// Column for Traces
pub const COL_TRACE: u32 = 4;
/// Column for the empty accounts bloom filter.
/// Column for the accounts existence bloom filter.
#[deprecated(since = "3.0.0", note = "Accounts bloom column is deprecated")]
pub const COL_ACCOUNT_BLOOM: u32 = 5;
/// Column for general information from the local node which can persist.
pub const COL_NODE_INFO: u32 = 6;

View File

@ -12,9 +12,7 @@ harness = false
[dependencies]
account-db = { path = "../account-db" }
account-state = { path = "../account-state" }
blockchain = { package = "ethcore-blockchain", path = "../blockchain" }
bloom-journal = { package = "accounts-bloom", path = "../../util/bloom" }
bytes = { package = "parity-bytes", version = "0.1.0" }
client-traits = { path = "../client-traits" }
common-types = { path = "../types" }
@ -39,7 +37,6 @@ rlp = "0.4.5"
rlp-derive = "0.1"
scopeguard = "1.1.0"
snappy = { package = "parity-snappy", version ="0.1.0" }
state-db = { path = "../state-db" }
trie-db = "0.20.0"
triehash = { package = "triehash-ethereum", version = "0.2", path = "../../util/triehash-ethereum" }

View File

@ -27,9 +27,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use keccak_hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
use account_db::{AccountDB, AccountDBMut};
use account_state::Account as StateAccount;
use blockchain::{BlockChain, BlockProvider};
use bloom_journal::Bloom;
use bytes::Bytes;
use common_types::{
ids::BlockId,
@ -39,7 +37,7 @@ use common_types::{
};
use crossbeam_utils::thread;
use engine::Engine;
use ethereum_types::{H256, U256};
use ethereum_types::H256;
use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB;
use journaldb::{self, Algorithm, JournalDB};
@ -51,7 +49,6 @@ use num_cpus;
use rand::{Rng, rngs::OsRng};
use rlp::{RlpStream, Rlp};
use snappy;
use state_db::StateDB;
use trie_db::{Trie, TrieMut};
pub use self::consensus::*;
@ -378,7 +375,6 @@ pub struct StateRebuilder {
state_root: H256,
known_code: HashMap<H256, H256>, // code hashes mapped to first account with this code.
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
bloom: Bloom,
known_storage_roots: HashMap<H256, H256>, // maps account hashes to last known storage root. Only filled for last account per chunk.
}
@ -390,7 +386,6 @@ impl StateRebuilder {
state_root: KECCAK_NULL_RLP,
known_code: HashMap::new(),
missing_code: HashMap::new(),
bloom: StateDB::load_bloom(&*db),
known_storage_roots: HashMap::new(),
}
}
@ -398,7 +393,6 @@ impl StateRebuilder {
/// Feed an uncompressed state chunk into the rebuilder.
pub fn feed(&mut self, chunk: &[u8], flag: &AtomicBool) -> Result<(), EthcoreError> {
let rlp = Rlp::new(chunk);
let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp();
let mut pairs = Vec::with_capacity(rlp.item_count()?);
// initialize the pairs vector with empty values so we have slots to write into.
@ -427,8 +421,6 @@ impl StateRebuilder {
self.known_code.insert(code_hash, first_with);
}
let backing = self.db.backing().clone();
// batch trie writes
{
let mut account_trie = if self.state_root != KECCAK_NULL_RLP {
@ -439,19 +431,10 @@ impl StateRebuilder {
for (hash, thin_rlp) in pairs {
if !flag.load(Ordering::SeqCst) { return Err(Error::RestorationAborted.into()) }
if &thin_rlp[..] != &empty_rlp[..] {
self.bloom.set(hash.as_bytes());
}
account_trie.insert(hash.as_bytes(), &thin_rlp)?;
}
}
let bloom_journal = self.bloom.drain_journal();
let mut batch = backing.transaction();
StateDB::commit_bloom(&mut batch, bloom_journal)?;
self.db.inject(&mut batch)?;
backing.write_buffered(batch);
Ok(())
}

View File

@ -121,7 +121,6 @@ fn run_constructors<T: Backend>(
}
for (address, account) in genesis_state.get().iter() {
db.note_non_null_account(address);
account.insert_additional(
&mut *factories.accountdb.create(
db.as_hash_db_mut(),

View File

@ -8,12 +8,9 @@ edition = "2018"
[dependencies]
account-state = { path = "../account-state" }
bloom_journal = { package = "accounts-bloom", path = "../../util/bloom" }
common-types = { path = "../types"}
ethcore-db = { path = "../db" }
ethereum-types = "0.9.0"
hash-db = "0.15.0"
keccak-hash = "0.5.0"
keccak-hasher = { path = "../../util/keccak-hasher" }
journaldb = { path = "../../util/journaldb" }
kvdb = "0.5.0"

View File

@ -22,33 +22,17 @@ use std::sync::Arc;
use ethereum_types::{Address, H256};
use hash_db::HashDB;
use keccak_hash::keccak;
use kvdb::{DBTransaction, DBValue, KeyValueDB};
use kvdb::{DBTransaction, DBValue};
use log::trace;
use lru_cache::LruCache;
use parking_lot::Mutex;
use account_state::{self, Account};
use bloom_journal::{Bloom, BloomJournal};
use common_types::BlockNumber;
use ethcore_db::COL_ACCOUNT_BLOOM;
use journaldb::JournalDB;
use keccak_hasher::KeccakHasher;
use memory_cache::MemoryLruCache;
/// Value used to initialize bloom bitmap size.
///
/// Bitmap size is the size in bytes (not bits) that will be allocated in memory.
pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
/// Value used to initialize bloom items count.
///
/// Items count is an estimation of the maximum number of items to store.
pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
/// Key for a value storing amount of hashes
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
const STATE_CACHE_BLOCKS: usize = 12;
// The percentage of supplied cache size to go to accounts.
@ -114,8 +98,6 @@ pub struct StateDB {
code_cache: Arc<Mutex<MemoryLruCache<H256, Arc<Vec<u8>>>>>,
/// Local dirty cache.
local_cache: Vec<CacheQueueItem>,
/// Shared account bloom. Does not handle chain reorganizations.
account_bloom: Arc<Mutex<Bloom>>,
cache_size: usize,
/// Hash of the block on top of which this instance was created or
/// `None` if cache is disabled
@ -138,7 +120,6 @@ impl StateDB {
// TODO: make the cache size actually accurate by moving the account storage cache
// into the `AccountCache` structure as its own `LruCache<(Address, H256), H256>`.
pub fn new(db: Box<dyn JournalDB>, cache_size: usize) -> StateDB {
let bloom = Self::load_bloom(&**db.backing());
let acc_cache_size = cache_size * ACCOUNT_CACHE_RATIO / 100;
let code_cache_size = cache_size - acc_cache_size;
let cache_items = acc_cache_size / ::std::mem::size_of::<Option<Account>>();
@ -151,7 +132,6 @@ impl StateDB {
})),
code_cache: Arc::new(Mutex::new(MemoryLruCache::new(code_cache_size))),
local_cache: Vec::new(),
account_bloom: Arc::new(Mutex::new(bloom)),
cache_size,
parent_hash: None,
commit_hash: None,
@ -159,57 +139,8 @@ impl StateDB {
}
}
/// Loads accounts bloom from the database
/// This bloom is used to handle request for the non-existent account fast
pub fn load_bloom(db: &dyn KeyValueDB) -> Bloom {
let hash_count_entry = db.get(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY)
.expect("Low-level database error");
let hash_count_bytes = match hash_count_entry {
Some(bytes) => bytes,
None => return Bloom::new(ACCOUNT_BLOOM_SPACE, DEFAULT_ACCOUNT_PRESET),
};
assert_eq!(hash_count_bytes.len(), 1);
let hash_count = hash_count_bytes[0];
let mut bloom_parts = vec![0u64; ACCOUNT_BLOOM_SPACE / 8];
for i in 0..ACCOUNT_BLOOM_SPACE / 8 {
let key: [u8; 8] = (i as u64).to_le_bytes();
bloom_parts[i] = db.get(COL_ACCOUNT_BLOOM, &key).expect("low-level database error")
.map(|val| {
assert_eq!(val.len(), 8, "low-level database error");
let mut buff = [0u8; 8];
buff.copy_from_slice(&*val);
u64::from_le_bytes(buff)
})
.unwrap_or(0u64);
}
let bloom = Bloom::from_parts(&bloom_parts, hash_count as u32);
trace!(target: "account_bloom", "Bloom is {:?} full, hash functions count = {:?}", bloom.saturation(), hash_count);
bloom
}
/// Commit blooms journal to the database transaction
pub fn commit_bloom(batch: &mut DBTransaction, journal: BloomJournal) -> io::Result<()> {
assert!(journal.hash_functions <= 255);
batch.put(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY, &[journal.hash_functions as u8]);
for (bloom_part_index, bloom_part_value) in journal.entries {
let key: [u8; 8] = (bloom_part_index as u64).to_le_bytes();
let val: [u8; 8] = bloom_part_value.to_le_bytes();
batch.put(COL_ACCOUNT_BLOOM, &key, &val);
}
Ok(())
}
/// Journal all recent operations under the given era and ID.
pub fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32> {
{
let mut bloom_lock = self.account_bloom.lock();
Self::commit_bloom(batch, bloom_lock.drain_journal())?;
}
let records = self.db.journal_under(batch, now, id)?;
self.commit_hash = Some(id.clone());
self.commit_number = Some(now);
@ -336,7 +267,6 @@ impl StateDB {
account_cache: self.account_cache.clone(),
code_cache: self.code_cache.clone(),
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
parent_hash: None,
commit_hash: None,
@ -351,7 +281,6 @@ impl StateDB {
account_cache: self.account_cache.clone(),
code_cache: self.code_cache.clone(),
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
parent_hash: Some(parent.clone()),
commit_hash: None,
@ -461,19 +390,6 @@ impl account_state::Backend for StateDB {
cache.get_mut(hash).map(|code| code.clone())
}
fn note_non_null_account(&self, address: &Address) {
trace!(target: "account_bloom", "Note account bloom: {:?}", address);
let mut bloom = self.account_bloom.lock();
bloom.set(keccak(address).as_bytes());
}
fn is_known_null(&self, address: &Address) -> bool {
trace!(target: "account_bloom", "Check account bloom: {:?}", address);
let bloom = self.account_bloom.lock();
let is_null = !bloom.check(keccak(address).as_bytes());
is_null
}
}
/// Sync wrapper for the account.

View File

@ -18,9 +18,10 @@ use std::fs;
use std::io::{Read, Write, Error as IoError, ErrorKind};
use std::path::{Path, PathBuf};
use std::fmt::{Display, Formatter, Error as FmtError};
use super::migration_rocksdb::{Manager as MigrationManager, Config as MigrationConfig, ChangeColumns};
use super::migration_rocksdb::{Manager as MigrationManager, Config as MigrationConfig, ChangeColumns, VacuumAccountsBloom};
use super::kvdb_rocksdb::{CompactionProfile, DatabaseConfig};
use ethcore::client::DatabaseCompactionProfile;
use ethcore_db::NUM_COLUMNS;
use types::errors::EthcoreError;
use super::helpers;
@ -50,11 +51,20 @@ pub const TO_V14: ChangeColumns = ChangeColumns {
version: 14,
};
/// The migration from v14 to v15.
/// Removes all entries from the COL_ACCOUNTS_BLOOM column
/// NOTE: column 5 is still there, but has no data.
pub const TO_V15: VacuumAccountsBloom = VacuumAccountsBloom {
column_to_vacuum: 5,
columns: NUM_COLUMNS,
version: 15,
};
/// Database is assumed to be at default version, when no version file is found.
const DEFAULT_VERSION: u32 = 5;
/// Current version of database models.
const CURRENT_VERSION: u32 = 14;
/// A version of database at which blooms-db was introduced
const CURRENT_VERSION: u32 = 15;
/// A version of database at which blooms-db was introduced for header and trace blooms.
const BLOOMS_DB_VERSION: u32 = 13;
/// Defines how many items are migrated to the new version of database at once.
const BATCH_SIZE: usize = 1024;
@ -156,6 +166,7 @@ fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> R
manager.add_migration(TO_V11).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(TO_V12).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(TO_V14).map_err(|_| Error::MigrationImpossible)?;
manager.add_migration(TO_V15).map_err(|_| Error::MigrationImpossible)?;
Ok(manager)
}

View File

@ -41,7 +41,9 @@ pub use self::migration::migrate;
struct AppDB {
key_value: Arc<dyn KeyValueDB>,
// Header bloom
blooms: blooms_db::Database,
// Trace bloom
trace_blooms: blooms_db::Database,
}

View File

@ -1,13 +0,0 @@
[package]
name = "accounts-bloom"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Journaling bloom filter"
license = "GPL3"
edition = "2018"
[lib]
path = "src/lib.rs"
[dependencies]
siphasher = "0.3"

View File

@ -1,270 +0,0 @@
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Open Ethereum.
// Open Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Open Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Open Ethereum. If not, see <http://www.gnu.org/licenses/>.
use std::{cmp, mem, f64};
use std::hash::{Hash, Hasher};
use std::collections::HashSet;
use siphasher::sip::SipHasher;
/// BitVec structure with journalling
/// Every time any of the blocks is getting set it's index is tracked
/// and can be then drained by `drain` method
struct BitVecJournal {
elems: Vec<u64>,
journal: HashSet<usize>,
}
impl BitVecJournal {
pub fn new(size: usize) -> BitVecJournal {
let extra = if size % 64 > 0 { 1 } else { 0 };
BitVecJournal {
elems: vec![0u64; size / 64 + extra],
journal: HashSet::new(),
}
}
pub fn from_parts(parts: &[u64]) -> BitVecJournal {
BitVecJournal {
elems: parts.to_vec(),
journal: HashSet::new(),
}
}
pub fn set(&mut self, index: usize) {
let e_index = index / 64;
let bit_index = index % 64;
let val = self.elems.get_mut(e_index).unwrap();
*val |= 1u64 << bit_index;
self.journal.insert(e_index);
}
pub fn get(&self, index: usize) -> bool {
let e_index = index / 64;
let bit_index = index % 64;
self.elems[e_index] & (1 << bit_index) != 0
}
pub fn drain(&mut self) -> Vec<(usize, u64)> {
let journal = mem::replace(&mut self.journal, HashSet::new()).into_iter();
journal.map(|idx| (idx, self.elems[idx])).collect::<Vec<(usize, u64)>>()
}
pub fn saturation(&self) -> f64 {
self.elems.iter().fold(0u64, |acc, e| acc + e.count_ones() as u64) as f64 / (self.elems.len() * 64) as f64
}
}
/// Bloom filter structure
pub struct Bloom {
bitmap: BitVecJournal,
bitmap_bits: u64,
k_num: u32,
}
impl Bloom {
/// Create a new bloom filter structure.
/// bitmap_size is the size in bytes (not bits) that will be allocated in memory
/// items_count is an estimation of the maximum number of items to store.
pub fn new(bitmap_size: usize, items_count: usize) -> Bloom {
assert!(bitmap_size > 0 && items_count > 0);
let bitmap_bits = (bitmap_size as u64) * 8u64;
let k_num = Bloom::optimal_k_num(bitmap_bits, items_count);
let bitmap = BitVecJournal::new(bitmap_bits as usize);
Bloom {
bitmap,
bitmap_bits,
k_num,
}
}
/// Initializes bloom filter from saved state
pub fn from_parts(parts: &[u64], k_num: u32) -> Bloom {
let bitmap_size = parts.len() * 8;
let bitmap_bits = (bitmap_size as u64) * 8u64;
let bitmap = BitVecJournal::from_parts(parts);
Bloom {
bitmap,
bitmap_bits,
k_num,
}
}
/// Create a new bloom filter structure.
/// items_count is an estimation of the maximum number of items to store.
/// fp_p is the wanted rate of false positives, in ]0.0, 1.0[
pub fn new_for_fp_rate(items_count: usize, fp_p: f64) -> Bloom {
let bitmap_size = Bloom::compute_bitmap_size(items_count, fp_p);
Bloom::new(bitmap_size, items_count)
}
/// Compute a recommended bitmap size for items_count items
/// and a fp_p rate of false positives.
/// fp_p obviously has to be within the ]0.0, 1.0[ range.
pub fn compute_bitmap_size(items_count: usize, fp_p: f64) -> usize {
assert!(items_count > 0);
assert!(fp_p > 0.0 && fp_p < 1.0);
let log2 = f64::consts::LN_2;
let log2_2 = log2 * log2;
((items_count as f64) * f64::ln(fp_p) / (-8.0 * log2_2)).ceil() as usize
}
/// Records the presence of an item.
pub fn set<T>(&mut self, item: T)
where T: Hash
{
let base_hash = Bloom::sip_hash(&item);
for k_i in 0..self.k_num {
let bit_offset = (Bloom::bloom_hash(base_hash, k_i) % self.bitmap_bits) as usize;
self.bitmap.set(bit_offset);
}
}
/// Check if an item is present in the set.
/// There can be false positives, but no false negatives.
pub fn check<T>(&self, item: T) -> bool
where T: Hash
{
let base_hash = Bloom::sip_hash(&item);
for k_i in 0..self.k_num {
let bit_offset = (Bloom::bloom_hash(base_hash, k_i) % self.bitmap_bits) as usize;
if !self.bitmap.get(bit_offset) {
return false;
}
}
true
}
/// Return the number of bits in the filter
pub fn number_of_bits(&self) -> u64 {
self.bitmap_bits
}
/// Return the number of hash functions used for `check` and `set`
pub fn number_of_hash_functions(&self) -> u32 {
self.k_num
}
fn optimal_k_num(bitmap_bits: u64, items_count: usize) -> u32 {
let m = bitmap_bits as f64;
let n = items_count as f64;
let k_num = (m / n * f64::ln(2.0f64)).ceil() as u32;
cmp::max(k_num, 1)
}
fn sip_hash<T>(item: &T) -> u64
where T: Hash
{
let mut sip = SipHasher::new();
item.hash(&mut sip);
sip.finish()
}
fn bloom_hash(base_hash: u64, k_i: u32) -> u64 {
if k_i < 2 {
base_hash
} else {
base_hash.wrapping_add((k_i as u64).wrapping_mul(base_hash) % 0xffff_ffff_ffff_ffc5)
}
}
/// Drains the bloom journal returning the updated bloom part
pub fn drain_journal(&mut self) -> BloomJournal {
BloomJournal {
entries: self.bitmap.drain(),
hash_functions: self.k_num,
}
}
/// Returns the ratio of set bits in the bloom filter to the total bits
pub fn saturation(&self) -> f64 {
self.bitmap.saturation()
}
}
/// Bloom journal
/// Returns the tuple of (bloom part index, bloom part value) where each one is representing
/// an index of bloom parts that was updated since the last drain
pub struct BloomJournal {
pub hash_functions: u32,
pub entries: Vec<(usize, u64)>,
}
#[cfg(test)]
mod tests {
use super::Bloom;
use std::collections::HashSet;
#[test]
fn get_set() {
let mut bloom = Bloom::new(10, 80);
let key = vec![115u8, 99];
assert!(!bloom.check(&key));
bloom.set(&key);
assert!(bloom.check(&key));
}
#[test]
fn journalling() {
let initial = vec![0u64; 8];
let mut bloom = Bloom::from_parts(&initial, 3);
bloom.set(&vec![5u8, 4]);
let drain = bloom.drain_journal();
assert_eq!(2, drain.entries.len())
}
#[test]
fn saturation() {
let initial = vec![0u64; 8];
let mut bloom = Bloom::from_parts(&initial, 3);
bloom.set(&vec![5u8, 4]);
let full = bloom.saturation();
// 2/8/64 = 0.00390625
assert!(full >= 0.0039f64 && full <= 0.004f64);
}
#[test]
fn hash_backward_compatibility_for_new() {
let ss = vec!["you", "should", "not", "break", "hash", "backward", "compatibility"];
let mut bloom = Bloom::new(16, 8);
for s in ss.iter() {
bloom.set(&s);
}
let drained_elems: HashSet<u64> = bloom.drain_journal().entries.into_iter().map(|t| t.1).collect();
let expected: HashSet<u64> = [2094615114573771027u64, 244675582389208413u64].iter().cloned().collect();
assert_eq!(drained_elems, expected);
assert_eq!(bloom.k_num, 12);
}
#[test]
fn hash_backward_compatibility_for_from_parts() {
let stored_state = vec![2094615114573771027u64, 244675582389208413u64];
let k_num = 12;
let bloom = Bloom::from_parts(&stored_state, k_num);
let ss = vec!["you", "should", "not", "break", "hash", "backward", "compatibility"];
let tt = vec!["this", "doesnot", "exist"];
for s in ss.iter() {
assert!(bloom.check(&s));
}
for s in tt.iter() {
assert!(!bloom.check(&s));
}
}
}

View File

@ -21,7 +21,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fs, io, error};
use log::trace;
use log::{info, trace};
use kvdb::DBTransaction;
use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
@ -161,6 +161,42 @@ impl Migration for ChangeColumns {
}
}
pub struct VacuumAccountsBloom {
pub column_to_vacuum: u32,
pub columns: u32,
pub version: u32,
}
impl Migration for VacuumAccountsBloom {
fn pre_columns(&self) -> u32 { self.columns }
fn columns(&self) -> u32 { self.columns }
fn alters_existing(&self) -> bool { true }
fn version(&self) -> u32 { self.version }
fn migrate(&mut self, db: Arc<Database>, _config: &Config, _dest: &mut Database, col: u32) -> io::Result<()> {
if col != self.column_to_vacuum {
return Ok(())
}
let num_keys = db.num_keys(COL_ACCOUNT_BLOOM)?;
info!(target: "migration", "Removing accounts existence bloom ({} keys)", num_keys + 1);
let mut batch = DBTransaction::with_capacity(num_keys as usize);
const COL_ACCOUNT_BLOOM: u32 = 5;
const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
for (n, (k,_)) in db.iter(COL_ACCOUNT_BLOOM).enumerate() {
batch.delete(COL_ACCOUNT_BLOOM, &k);
if n > 0 && n % 10_000 == 0 {
info!(target: "migration", " Account Bloom entries queued for deletion: {}", n);
}
}
batch.delete(COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY);
let deletions = batch.ops.len();
db.write(batch)?;
db.flush()?;
info!(target: "migration", "Deleted {} account existence bloom items from the DB", deletions);
Ok(())
}
}
/// Get the path where all databases reside.
fn database_path(path: &Path) -> PathBuf {
let mut temp_path = path.to_owned();