diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index 192bff1c8..8835af184 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -303,9 +303,8 @@ impl Client {
 		// Enact Verified Block
 		let parent = chain_has_parent.unwrap();
-		let last_hashes = self.build_last_hashes(header.parent_hash.clone());
-		let is_canon = header.parent_hash == self.chain.best_block_hash();
-		let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };
+		let last_hashes = self.build_last_hashes(header.parent_hash().clone());
+		let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash);
 
 		let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone());
 		if let Err(e) = enact_result {
@@ -442,7 +441,6 @@ impl Client {
 			.collect();
 
 		//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
-		let batch = DBTransaction::new(&self.db);
 
 		// CHECK! I *think* this is fine, even if the state_root is equal to another
 		// already-imported block of the same number.
@@ -458,6 +456,8 @@ impl Client {
 			enacted: route.enacted.clone(),
 			retracted: route.retracted.len()
 		});
+		let is_canon = route.enacted.last().map_or(false, |h| h == hash);
+		state.sync_cache(&route.enacted, &route.retracted, is_canon);
 		// Final commit to the DB
 		self.db.write_buffered(batch).expect("DB write failed.");
 		self.chain.commit();
@@ -532,9 +532,11 @@ impl Client {
 
 	/// Get a copy of the best block's state.
 	pub fn state(&self) -> State {
+		let header = self.best_block_header();
+		let header = HeaderView::new(&header);
 		State::from_existing(
-			self.state_db.lock().boxed_clone(),
-			HeaderView::new(&self.best_block_header()).state_root(),
+			self.state_db.lock().boxed_clone_canon(&header.hash()),
+			header.state_root(),
 			self.engine.account_start_nonce(),
 			self.trie_factory.clone())
 		.expect("State root of best block header always valid.")
@@ -1085,9 +1087,9 @@ impl MiningBlockChainClient for Client {
 		let block_data = block.rlp_bytes();
 
 		// Clear canonical state cache
-		self.state_db.lock().clear_cache();
 		let route = self.commit_block(block, &h, &block_data);
 		trace!(target: "client", "Imported sealed block #{} ({})", number, h);
+		self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
 
 		let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
 		self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
diff --git a/ethcore/src/evm/interpreter/shared_cache.rs b/ethcore/src/evm/interpreter/shared_cache.rs
index 76360138b..ce383bae8 100644
--- a/ethcore/src/evm/interpreter/shared_cache.rs
+++ b/ethcore/src/evm/interpreter/shared_cache.rs
@@ -21,7 +21,7 @@ use util::sha3::*;
 use bit_set::BitSet;
 use super::super::instructions;
 
-const CACHE_CODE_ITEMS: usize = 4096;
+const CACHE_CODE_ITEMS: usize = 65536;
 
 /// GLobal cache for EVM interpreter
 pub struct SharedCache {
diff --git a/ethcore/src/state.rs b/ethcore/src/state.rs
index 0fe5931d3..b2d370512 100644
--- a/ethcore/src/state.rs
+++ b/ethcore/src/state.rs
@@ -42,14 +42,16 @@ pub type ApplyResult = Result<ApplyOutcome, Error>;
 /// Account modification state. Used to check if the account was
 /// Modified in between commits and overall.
 enum AccountState {
-	/// Account was never modified in this state object.
-	Clean,
+	/// Account was loaded from disk and never modified in this state object.
+	CleanFresh,
+	/// Account was loaded from the global cache and never modified.
+	CleanCached,
 	/// Account has been modified and is not committed to the trie yet.
-	/// This is set than any of the account data is changed, including
+	/// This is set if any of the account data is changed, including
 	/// storage and code.
 	Dirty,
 	/// Account was modified and committed to the trie.
-	Commited,
+	Committed,
 }
 
 #[derive(Debug)]
@@ -100,7 +102,15 @@ impl AccountEntry {
 	fn new_clean(account: Option<Account>) -> AccountEntry {
 		AccountEntry {
 			account: account,
-			state: AccountState::Clean,
+			state: AccountState::CleanFresh,
+		}
+	}
+
+	// Create a new account entry and mark it as clean and cached.
+	fn new_clean_cached(account: Option<Account>) -> AccountEntry {
+		AccountEntry {
+			account: account,
+			state: AccountState::CleanCached,
 		}
 	}
 
@@ -303,7 +313,7 @@ impl State {
 	/// Destroy the current object and return root and database.
 	pub fn drop(mut self) -> (H256, StateDB) {
-		self.commit_cache();
+		self.update_shared_cache();
 		(self.root, self.db)
 	}
 
@@ -494,7 +504,7 @@ impl State {
 		{
 			let mut trie = trie_factory.from_existing(db.as_hashdb_mut(), root).unwrap();
 			for (address, ref mut a) in accounts.iter_mut().filter(|&(_, ref a)| a.is_dirty()) {
-				a.state = AccountState::Commited;
+				a.state = AccountState::Committed;
 				match a.account {
 					Some(ref mut account) => {
 						try!(trie.insert(address, &account.rlp()));
@@ -509,11 +519,12 @@ impl State {
 		Ok(())
 	}
 
-	fn commit_cache(&mut self) {
+	/// Merge local cache into shared canonical state cache.
+	fn update_shared_cache(&mut self) {
 		let mut addresses = self.cache.borrow_mut();
 		trace!("Committing cache {:?} entries", addresses.len());
-		for (address, a) in addresses.drain().filter(|&(_, ref a)| !a.is_dirty()) {
-			self.db.cache_account(address, a.account);
+		for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
+			self.db.add_to_account_cache(address, a.account, a.state == AccountState::Committed);
 		}
 	}
 
@@ -621,10 +632,7 @@ impl State {
 				Self::update_account_cache(require, account, a, self.db.as_hashdb());
 			}
 			let r = f(maybe_acc.as_ref());
-			match maybe_acc {
-				Some(account) => self.insert_cache(a, AccountEntry::new_clean(Some(account))),
-				None => self.insert_cache(a, AccountEntry::new_clean(None)),
-			}
+			self.insert_cache(a, AccountEntry::new_clean(maybe_acc));
 			r
 		}
 	}
@@ -643,8 +651,7 @@ impl State {
 		let contains_key = self.cache.borrow().contains_key(a);
 		if !contains_key {
 			match self.db.get_cached_account(a) {
-				Some(Some(acc)) => self.insert_cache(a, AccountEntry::new_clean(Some(acc))),
-				Some(None) => self.insert_cache(a, AccountEntry::new_clean(None)),
+				Some(acc) => self.insert_cache(a, AccountEntry::new_clean_cached(acc)),
 				None => {
 					let maybe_acc = if self.db.check_account_bloom(a) {
 						let db = self.trie_factory.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs
index bae461af5..a1c756b32 100644
--- a/ethcore/src/state_db.rs
+++ b/ethcore/src/state_db.rs
@@ -14,22 +14,53 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
+use std::collections::{VecDeque, HashSet};
 use lru_cache::LruCache;
 use util::journaldb::JournalDB;
 use util::hash::{H256};
 use util::hashdb::HashDB;
-use util::{Arc, Address, DBTransaction, UtilError, Mutex, Hashable, BytesConvertable};
 use account::Account;
+use header::BlockNumber;
+use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable, BytesConvertable};
 use bloomfilter::{Bloom, BloomJournal};
-use util::Database;
 use client::DB_COL_ACCOUNT_BLOOM;
 use byteorder::{LittleEndian, ByteOrder};
 
-const STATE_CACHE_ITEMS: usize = 65536;
+const STATE_CACHE_ITEMS: usize = 256000;
+const STATE_CACHE_BLOCKS: usize = 8;
 
+/// Shared canonical state cache.
 struct AccountCache {
 	/// DB Account cache. `None` indicates that account is known to be missing.
 	accounts: LruCache<Address, Option<Account>>,
+	/// Accounts changed in recently committed blocks. Ordered by block number.
+	modifications: VecDeque<BlockChanges>,
+}
+
+/// Buffered account cache item.
+struct CacheQueueItem {
+	/// Account address.
+	address: Address,
+	/// Account data or `None` if account does not exist.
+	account: Option<Account>,
+	/// Indicates that the account was modified before being
+	/// added to the cache.
+	modified: bool,
+}
+
+#[derive(Debug)]
+/// Accumulates a list of accounts changed in a block.
+struct BlockChanges {
+	/// Block number.
+	number: BlockNumber,
+	/// Block hash.
+	hash: H256,
+	/// Parent block hash.
+	parent: H256,
+	/// A set of modified account addresses.
+	accounts: HashSet<Address>,
+	/// Block is part of the canonical chain.
+	is_canon: bool,
 }
 
 /// State database abstraction.
@@ -39,11 +70,21 @@ struct AccountCache {
 /// on commit.
 /// For non-canonical clones cache is cleared on commit.
 pub struct StateDB {
+	/// Backing database.
 	db: Box<JournalDB>,
+	/// Shared canonical state cache.
 	account_cache: Arc<Mutex<AccountCache>>,
-	cache_overlay: Vec<(Address, Option<Account>)>,
-	is_canon: bool,
+	/// Local cache buffer.
+	cache_buffer: Vec<CacheQueueItem>,
+	/// Shared account bloom. Does not handle chain reorganizations.
 	account_bloom: Arc<Mutex<Bloom>>,
+	/// Hash of the block on top of which this instance was created or
+	/// `None` if cache is disabled.
+	parent_hash: Option<H256>,
+	/// Hash of the committing block or `None` if not committed yet.
+	commit_hash: Option<H256>,
+	/// Number of the committing block or `None` if not committed yet.
+	commit_number: Option<BlockNumber>,
 }
 
 pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
@@ -52,6 +93,8 @@ pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
 pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
 
 impl StateDB {
+	/// Loads accounts bloom from the database.
+	/// This bloom is used to handle requests for non-existent accounts quickly.
 	pub fn load_bloom(db: &Database) -> Bloom {
 		let hash_count_entry = db.get(DB_COL_ACCOUNT_BLOOM, ACCOUNT_BLOOM_HASHCOUNT_KEY)
 			.expect("Low-level database error");
@@ -82,10 +125,15 @@ impl StateDB {
 		let bloom = Self::load_bloom(db.backing());
 		StateDB {
 			db: db,
-			account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(STATE_CACHE_ITEMS) })),
-			cache_overlay: Vec::new(),
-			is_canon: false,
+			account_cache: Arc::new(Mutex::new(AccountCache {
+				accounts: LruCache::new(STATE_CACHE_ITEMS),
+				modifications: VecDeque::new(),
+			})),
+			cache_buffer: Vec::new(),
 			account_bloom: Arc::new(Mutex::new(bloom)),
+			parent_hash: None,
+			commit_hash: None,
+			commit_number: None,
 		}
 	}
 
@@ -124,14 +172,106 @@ impl StateDB {
 		}
 
 		let records = try!(self.db.commit(batch, now, id, end));
-		if self.is_canon {
-			self.commit_cache();
-		} else {
-			self.clear_cache();
-		}
+		self.commit_hash = Some(id.clone());
+		self.commit_number = Some(now);
 		Ok(records)
 	}
 
+	/// Apply buffered cache changes and synchronize canonical
+	/// state cache with the best block state.
+	/// This function updates the cache by removing entries that are
+	/// invalidated by chain reorganization. `sync_cache` should be
+	/// called after the block has been committed and the blockchain
+	/// route has been calculated.
+	pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
+		trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
+		let mut cache = self.account_cache.lock();
+		let mut cache = &mut *cache;
+
+		// Clean changes from re-enacted and retracted blocks
+		let mut clear = false;
+		for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
+			clear = clear || {
+				if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+					trace!("Reverting enacted block {:?}", block);
+					m.is_canon = true;
+					for a in &m.accounts {
+						trace!("Reverting enacted address {:?}", a);
+						cache.accounts.remove(a);
+					}
+					false
+				} else {
+					true
+				}
+			};
+		}
+
+		for block in retracted {
+			clear = clear || {
+				if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+					trace!("Retracting block {:?}", block);
+					m.is_canon = false;
+					for a in &m.accounts {
+						trace!("Retracted address {:?}", a);
+						cache.accounts.remove(a);
+					}
+					false
+				} else {
+					true
+				}
+			};
+		}
+		if clear {
+			// We don't know anything about the block; clear everything
+			trace!("Wiping cache");
+			cache.accounts.clear();
+			cache.modifications.clear();
+		}
+
+		// Apply cache changes only if committing on top of the latest canonical state
+		// blocks are ordered by number and only one block with a given number is marked as canonical
+		// (contributed to canonical state cache)
+		if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
+			if cache.modifications.len() == STATE_CACHE_BLOCKS {
+				cache.modifications.pop_back();
+			}
+			let mut modifications = HashSet::new();
+			trace!("committing {} cache entries", self.cache_buffer.len());
+			for account in self.cache_buffer.drain(..) {
+				if account.modified {
+					modifications.insert(account.address.clone());
+				}
+				if is_best {
+					if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
+						if let Some(new) = account.account {
+							if account.modified {
+								existing.overwrite_with(new);
+							}
+							continue;
+						}
+					}
+					cache.accounts.insert(account.address, account.account);
+				}
+			}
+
+			// Save modified accounts. These are ordered by the block number.
+			let block_changes = BlockChanges {
+				accounts: modifications,
+				number: *number,
+				hash: hash.clone(),
+				is_canon: is_best,
+				parent: parent.clone(),
+			};
+			let insert_at = cache.modifications.iter().enumerate().find(|&(_, ref m)| m.number < *number).map(|(i, _)| i);
+			trace!("inserting modifications at {:?}", insert_at);
+			if let Some(insert_at) = insert_at {
+				cache.modifications.insert(insert_at, block_changes);
+			} else {
+				cache.modifications.push_back(block_changes);
+			}
+		}
+	}
+
 	/// Returns an interface to HashDB.
 	pub fn as_hashdb(&self) -> &HashDB {
 		self.db.as_hashdb()
@@ -147,20 +287,24 @@ impl StateDB {
 		StateDB {
 			db: self.db.boxed_clone(),
 			account_cache: self.account_cache.clone(),
-			cache_overlay: Vec::new(),
-			is_canon: false,
+			cache_buffer: Vec::new(),
 			account_bloom: self.account_bloom.clone(),
+			parent_hash: None,
+			commit_hash: None,
+			commit_number: None,
 		}
 	}
 
 	/// Clone the database for a canonical state.
-	pub fn boxed_clone_canon(&self) -> StateDB {
+	pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
 		StateDB {
 			db: self.db.boxed_clone(),
 			account_cache: self.account_cache.clone(),
-			cache_overlay: Vec::new(),
-			is_canon: true,
+			cache_buffer: Vec::new(),
 			account_bloom: self.account_bloom.clone(),
+			parent_hash: Some(parent.clone()),
+			commit_hash: None,
+			commit_number: None,
 		}
 	}
 
@@ -179,53 +323,147 @@ impl StateDB {
 		&*self.db
 	}
 
-	/// Enqueue cache change.
-	pub fn cache_account(&mut self, addr: Address, data: Option<Account>) {
-		self.cache_overlay.push((addr, data));
-	}
-
-	/// Apply pending cache changes.
-	fn commit_cache(&mut self) {
-		let mut cache = self.account_cache.lock();
-		for (address, account) in self.cache_overlay.drain(..) {
-			if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&address) {
-				if let Some(new) = account {
-					existing.overwrite_with(new);
-					continue;
-				}
-			}
-			cache.accounts.insert(address, account);
-		}
-	}
-
-	/// Clear the cache.
-	pub fn clear_cache(&mut self) {
-		self.cache_overlay.clear();
-		let mut cache = self.account_cache.lock();
-		cache.accounts.clear();
+	/// Add pending cache change.
+	/// The change is queued to be applied in `commit`.
+	pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
+		self.cache_buffer.push(CacheQueueItem {
+			address: addr,
+			account: data,
+			modified: modified,
+		})
 	}
 
 	/// Get basic copy of the cached account. Does not include storage.
-	/// Returns 'None' if the state is non-canonical and cache is disabled
-	/// or if the account is not cached.
+	/// Returns 'None' if cache is disabled or if the account is not cached.
 	pub fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
-		if !self.is_canon {
+		let mut cache = self.account_cache.lock();
+		if !Self::is_allowed(addr, &self.parent_hash, &cache.modifications) {
 			return None;
 		}
-		let mut cache = self.account_cache.lock();
 		cache.accounts.get_mut(&addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
 	}
 
 	/// Get value from a cached account.
-	/// Returns 'None' if the state is non-canonical and cache is disabled
-	/// or if the account is not cached.
+	/// Returns 'None' if cache is disabled or if the account is not cached.
 	pub fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U> where F: FnOnce(Option<&mut Account>) -> U {
-		if !self.is_canon {
+		let mut cache = self.account_cache.lock();
+		if !Self::is_allowed(a, &self.parent_hash, &cache.modifications) {
 			return None;
 		}
-		let mut cache = self.account_cache.lock();
 		cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
 	}
+
+	/// Check if the account can be returned from cache by matching current block parent hash against canonical
+	/// state and filtering out accounts modified in later blocks.
+	fn is_allowed(addr: &Address, parent_hash: &Option<H256>, modifications: &VecDeque<BlockChanges>) -> bool {
+		let mut parent = match *parent_hash {
+			None => {
+				trace!("Cache lookup skipped for {:?}: no parent hash", addr);
+				return false;
+			}
+			Some(ref parent) => parent,
+		};
+		if modifications.is_empty() {
+			return true;
+		}
+		// Ignore all accounts modified in later blocks.
+		// Modifications are ordered by block number.
+		// We search for our parent in that list first and then for
+		// all its parents until we hit the canonical block,
+		// checking against all the intermediate modifications.
+		let mut iter = modifications.iter();
+		while let Some(ref m) = iter.next() {
+			if &m.hash == parent {
+				if m.is_canon {
+					return true;
+				}
+				parent = &m.parent;
+			}
+			if m.accounts.contains(addr) {
+				trace!("Cache lookup skipped for {:?}: modified in a later block", addr);
+				return false;
+			}
+		}
+		trace!("Cache lookup skipped for {:?}: parent hash is unknown", addr);
+		return false;
+	}
+}
+
+#[cfg(test)]
+mod tests {
+
+use util::{U256, H256, FixedHash, Address, DBTransaction};
+use tests::helpers::*;
+use account::Account;
+use util::log::init_log;
+
+#[test]
+fn state_db_smoke() {
+	init_log();
+
+	let mut state_db_result = get_temp_state_db();
+	let state_db = state_db_result.take();
+	let root_parent = H256::random();
+	let address = Address::random();
+	let h0 = H256::random();
+	let h1a = H256::random();
+	let h1b = H256::random();
+	let h2a = H256::random();
+	let h2b = H256::random();
+	let h3a = H256::random();
+	let h3b = H256::random();
+	let mut batch = DBTransaction::new(state_db.journal_db().backing());
+
+	// blocks  [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
+	// balance [ 5     5     4  3  2     2 ]
+	let mut s = state_db.boxed_clone_canon(&root_parent);
+	s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false);
+	s.commit(&mut batch, 0, &h0, None).unwrap();
+	s.sync_cache(&[], &[], true);
+
+	let mut s = state_db.boxed_clone_canon(&h0);
+	s.commit(&mut batch, 1, &h1a, None).unwrap();
+	s.sync_cache(&[], &[], true);
+
+	let mut s = state_db.boxed_clone_canon(&h0);
+	s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true);
+	s.commit(&mut batch, 1, &h1b, None).unwrap();
+	s.sync_cache(&[], &[], false);
+
+	let mut s = state_db.boxed_clone_canon(&h1b);
+	s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true);
+	s.commit(&mut batch, 2, &h2b, None).unwrap();
+	s.sync_cache(&[], &[], false);
+
+	let mut s = state_db.boxed_clone_canon(&h1a);
+	s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true);
+	s.commit(&mut batch, 2, &h2a, None).unwrap();
+	s.sync_cache(&[], &[], true);
+
+	let mut s = state_db.boxed_clone_canon(&h2a);
+	s.commit(&mut batch, 3, &h3a, None).unwrap();
+	s.sync_cache(&[], &[], true);
+
+	let s = state_db.boxed_clone_canon(&h3a);
+	assert_eq!(s.get_cached_account(&address).unwrap().unwrap().balance(), &U256::from(5));
+
+	let s = state_db.boxed_clone_canon(&h1a);
+	assert!(s.get_cached_account(&address).is_none());
+
+	let s = state_db.boxed_clone_canon(&h2b);
+	assert!(s.get_cached_account(&address).is_none());
+
+	let s = state_db.boxed_clone_canon(&h1b);
+	assert!(s.get_cached_account(&address).is_none());
+
+	// reorg to 3b
+	// blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ]
+	let mut s = state_db.boxed_clone_canon(&h2b);
+	s.commit(&mut batch, 3, &h3b, None).unwrap();
+	s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true);
+	let s = state_db.boxed_clone_canon(&h3a);
+	assert!(s.get_cached_account(&address).is_none());
+}
 }
diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs
index 4312c7c41..a5851878a 100644
--- a/rpc/src/v1/tests/eth.rs
+++ b/rpc/src/v1/tests/eth.rs
@@ -58,7 +58,6 @@ fn miner_service(spec: &Spec, accounts: Arc<AccountProvider>) -> Arc<Miner> {
 		reseal_on_external_tx: true,
 		reseal_on_own_tx: true,
 		tx_queue_size: 1024,
-		tx_queue_strategy: PrioritizationStrategy::GasPriceOnly,
 		tx_gas_limit: !U256::zero(),
 		tx_queue_strategy: PrioritizationStrategy::GasPriceOnly,
 		pending_set: PendingSet::SealingOrElseQueue,
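// Reviewer note: the sketch below is not part of the change set. It is a minimal
// illustration of the intended life cycle of a canonical StateDB clone under the
// reworked cache API, modeled on the `state_db_smoke` test and the `client.rs`
// hunks above. `import_one_block`, the `touched` parameter and the explicit
// `enacted`/`retracted` slices are hypothetical stand-ins for the block-enactment
// machinery and the route normally produced by `commit_block`.

use util::{H256, Address, DBTransaction};
use account::Account;
use header::BlockNumber;
use state_db::StateDB;

/// Hypothetical helper: clone the canonical state against the parent hash,
/// buffer the account changes produced while enacting the block, commit, and
/// only then merge the buffered entries into the shared cache.
fn import_one_block(
	state_db: &StateDB,
	parent: &H256,
	number: BlockNumber,
	hash: &H256,
	touched: Vec<(Address, Option<Account>, bool)>, // (address, new value, was modified)
	enacted: &[H256],
	retracted: &[H256],
	is_best: bool,
) {
	// Cache lookups through this clone are only answered while `parent` is on the
	// canonical chain and the account was not modified in a later block (`is_allowed`).
	let mut state = state_db.boxed_clone_canon(parent);

	// Account changes are buffered locally instead of being written to the shared
	// cache immediately.
	for (addr, account, modified) in touched {
		state.add_to_account_cache(addr, account, modified);
	}

	// `commit` records the committing block hash and number; the batch still has to
	// be written to the backing database (the client uses `write_buffered` for this).
	let mut batch = DBTransaction::new(state.journal_db().backing());
	state.commit(&mut batch, number, hash, None).expect("DB commit failed");

	// The buffered entries reach the shared cache only once the blockchain route is
	// known; on a reorg, `enacted`/`retracted` invalidate the entries touched by the
	// affected blocks.
	state.sync_cache(enacted, retracted, is_best);
}

// Because `AccountCache::modifications` keeps at most STATE_CACHE_BLOCKS (8) blocks
// of changes, a reorg deeper than that falls back to wiping the whole shared cache
// rather than invalidating individual entries.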