diff --git a/Cargo.lock b/Cargo.lock
index b335df4af..1fd8fabb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -399,6 +399,7 @@ dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index c6f26aab3..7c9e71ee0 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -304,8 +304,7 @@ impl Client {
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
- let is_canon = header.parent_hash() == &chain.best_block_hash();
- let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };
+ let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash());
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
if let Err(e) = enact_result {
@@ -459,6 +458,8 @@ impl Client {
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
+ let is_canon = route.enacted.last().map_or(false, |h| h == hash);
+ state.sync_cache(&route.enacted, &route.retracted, is_canon);
// Final commit to the DB
self.db.read().write_buffered(batch);
chain.commit();
@@ -533,9 +534,11 @@ impl Client {
/// Get a copy of the best block's state.
pub fn state(&self) -> State {
+ let header = self.best_block_header();
+ let header = HeaderView::new(&header);
State::from_existing(
- self.state_db.lock().boxed_clone(),
- HeaderView::new(&self.best_block_header()).state_root(),
+ self.state_db.lock().boxed_clone_canon(&header.hash()),
+ header.state_root(),
self.engine.account_start_nonce(),
self.factories.clone())
.expect("State root of best block header always valid.")
@@ -1129,6 +1132,7 @@ impl MiningBlockChainClient for Client {
let block_data = block.rlp_bytes();
let route = self.commit_block(block, &h, &block_data);
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
+ self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
diff --git a/ethcore/src/state/mod.rs b/ethcore/src/state/mod.rs
index 03f6d9fad..39c8bbc11 100644
--- a/ethcore/src/state/mod.rs
+++ b/ethcore/src/state/mod.rs
@@ -164,8 +164,8 @@ impl AccountEntry {
/// use that.
/// ****************************************************************************
///
-/// Upon destruction all the local cache data merged into the global cache.
-/// The merge might be rejected if current state is non-canonical.
+/// Upon destruction all the local cache data propagated into the global cache.
+/// Propagated items might be rejected if current state is non-canonical.
///
/// State snapshotting.
///
@@ -318,7 +318,7 @@ impl State {
/// Destroy the current object and return root and database.
pub fn drop(mut self) -> (H256, StateDB) {
- self.commit_cache();
+ self.propagate_to_global_cache();
(self.root, self.db)
}
@@ -533,11 +533,12 @@ impl State {
Ok(())
}
- fn commit_cache(&mut self) {
+ /// Propagate local cache into shared canonical state cache.
+ fn propagate_to_global_cache(&mut self) {
let mut addresses = self.cache.borrow_mut();
trace!("Committing cache {:?} entries", addresses.len());
for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
- self.db.cache_account(address, a.account);
+ self.db.add_to_account_cache(address, a.account, a.state == AccountState::Committed);
}
}
diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs
index 2b67dc290..8724a5d73 100644
--- a/ethcore/src/state_db.rs
+++ b/ethcore/src/state_db.rs
@@ -14,11 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
+use std::collections::{VecDeque, HashSet};
use lru_cache::LruCache;
use util::journaldb::JournalDB;
use util::hash::{H256};
use util::hashdb::HashDB;
use state::Account;
+use header::BlockNumber;
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
use bloom_journal::{Bloom, BloomJournal};
use db::COL_ACCOUNT_BLOOM;
@@ -29,26 +31,77 @@ pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
+const STATE_CACHE_BLOCKS: usize = 8;
+
+
+/// Shared canonical state cache.
struct AccountCache {
/// DB Account cache. `None` indicates that account is known to be missing.
// When changing the type of the values here, be sure to update `mem_used` and
// `new`.
accounts: LruCache
>,
+ /// Information on the modifications in recently committed blocks; specifically which addresses
+ /// changed in which block. Ordered by block number.
+ modifications: VecDeque,
+}
+
+/// Buffered account cache item.
+struct CacheQueueItem {
+ /// Account address.
+ address: Address,
+ /// Acccount data or `None` if account does not exist.
+ account: Option,
+ /// Indicates that the account was modified before being
+ /// added to the cache.
+ modified: bool,
+}
+
+#[derive(Debug)]
+/// Accumulates a list of accounts changed in a block.
+struct BlockChanges {
+ /// Block number.
+ number: BlockNumber,
+ /// Block hash.
+ hash: H256,
+ /// Parent block hash.
+ parent: H256,
+ /// A set of modified account addresses.
+ accounts: HashSet,
+ /// Block is part of the canonical chain.
+ is_canon: bool,
}
/// State database abstraction.
-/// Manages shared global state cache.
+/// Manages shared global state cache which reflects the canonical
+/// state as it is on the disk. All the entries in the cache are clean.
/// A clone of `StateDB` may be created as canonical or not.
-/// For canonical clones cache changes are accumulated and applied
-/// on commit.
-/// For non-canonical clones cache is cleared on commit.
+/// For canonical clones local cache is accumulated and applied
+/// in `sync_cache`
+/// For non-canonical clones local cache is dropped.
+///
+/// Global cache propagation.
+/// After a `State` object has been committed to the trie it
+/// propagates its local cache into the `StateDB` local cache
+/// using `add_to_account_cache` function.
+/// Then, after the block has been added to the chain the local cache in the
+/// `StateDB` is propagated into the global cache.
pub struct StateDB {
+ /// Backing database.
db: Box,
+ /// Shared canonical state cache.
account_cache: Arc>,
- cache_overlay: Vec<(Address, Option)>,
- is_canon: bool,
+ /// Local dirty cache.
+ local_cache: Vec,
+ /// Shared account bloom. Does not handle chain reorganizations.
account_bloom: Arc>,
cache_size: usize,
+ /// Hash of the block on top of which this instance was created or
+ /// `None` if cache is disabled
+ parent_hash: Option,
+ /// Hash of the committing block or `None` if not committed yet.
+ commit_hash: Option,
+ /// Number of the committing block or `None` if not committed yet.
+ commit_number: Option,
}
impl StateDB {
@@ -63,11 +116,16 @@ impl StateDB {
StateDB {
db: db,
- account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(cache_items) })),
- cache_overlay: Vec::new(),
- is_canon: false,
+ account_cache: Arc::new(Mutex::new(AccountCache {
+ accounts: LruCache::new(cache_items),
+ modifications: VecDeque::new(),
+ })),
+ local_cache: Vec::new(),
account_bloom: Arc::new(Mutex::new(bloom)),
cache_size: cache_size,
+ parent_hash: None,
+ commit_hash: None,
+ commit_number: None,
}
}
@@ -132,14 +190,107 @@ impl StateDB {
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
}
let records = try!(self.db.commit(batch, now, id, end));
- if self.is_canon {
- self.commit_cache();
- } else {
- self.clear_cache();
- }
+ self.commit_hash = Some(id.clone());
+ self.commit_number = Some(now);
Ok(records)
}
+ /// Propagate local cache into the global cache and synchonize
+ /// the global cache with the best block state.
+ /// This function updates the global cache by removing entries
+ /// that are invalidated by chain reorganization. `sync_cache`
+ /// should be called after the block has been committed and the
+ /// blockchain route has ben calculated.
+ pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
+ trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
+ let mut cache = self.account_cache.lock();
+ let mut cache = &mut *cache;
+
+ // Purge changes from re-enacted and retracted blocks.
+ // Filter out commiting block if any.
+ let mut clear = false;
+ for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
+ clear = clear || {
+ if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+ trace!("Reverting enacted block {:?}", block);
+ m.is_canon = true;
+ for a in &m.accounts {
+ trace!("Reverting enacted address {:?}", a);
+ cache.accounts.remove(a);
+ }
+ false
+ } else {
+ true
+ }
+ };
+ }
+
+ for block in retracted {
+ clear = clear || {
+ if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+ trace!("Retracting block {:?}", block);
+ m.is_canon = false;
+ for a in &m.accounts {
+ trace!("Retracted address {:?}", a);
+ cache.accounts.remove(a);
+ }
+ false
+ } else {
+ true
+ }
+ };
+ }
+ if clear {
+ // We don't know anything about the block; clear everything
+ trace!("Wiping cache");
+ cache.accounts.clear();
+ cache.modifications.clear();
+ }
+
+ // Propagate cache only if committing on top of the latest canonical state
+ // blocks are ordered by number and only one block with a given number is marked as canonical
+ // (contributed to canonical state cache)
+ if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
+ if cache.modifications.len() == STATE_CACHE_BLOCKS {
+ cache.modifications.pop_back();
+ }
+ let mut modifications = HashSet::new();
+ trace!("committing {} cache entries", self.local_cache.len());
+ for account in self.local_cache.drain(..) {
+ if account.modified {
+ modifications.insert(account.address.clone());
+ }
+ if is_best {
+ if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
+ if let Some(new) = account.account {
+ if account.modified {
+ existing.overwrite_with(new);
+ }
+ continue;
+ }
+ }
+ cache.accounts.insert(account.address, account.account);
+ }
+ }
+
+ // Save modified accounts. These are ordered by the block number.
+ let block_changes = BlockChanges {
+ accounts: modifications,
+ number: *number,
+ hash: hash.clone(),
+ is_canon: is_best,
+ parent: parent.clone(),
+ };
+ let insert_at = cache.modifications.iter().enumerate().find(|&(_, ref m)| m.number < *number).map(|(i, _)| i);
+ trace!("inserting modifications at {:?}", insert_at);
+ if let Some(insert_at) = insert_at {
+ cache.modifications.insert(insert_at, block_changes);
+ } else {
+ cache.modifications.push_back(block_changes);
+ }
+ }
+ }
+
/// Returns an interface to HashDB.
pub fn as_hashdb(&self) -> &HashDB {
self.db.as_hashdb()
@@ -155,22 +306,26 @@ impl StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
- cache_overlay: Vec::new(),
- is_canon: false,
+ local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
+ parent_hash: None,
+ commit_hash: None,
+ commit_number: None,
}
}
/// Clone the database for a canonical state.
- pub fn boxed_clone_canon(&self) -> StateDB {
+ pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
- cache_overlay: Vec::new(),
- is_canon: true,
+ local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
cache_size: self.cache_size,
+ parent_hash: Some(parent.clone()),
+ commit_hash: None,
+ commit_number: None,
}
}
@@ -190,52 +345,36 @@ impl StateDB {
&*self.db
}
- /// Enqueue cache change.
- pub fn cache_account(&mut self, addr: Address, data: Option) {
- self.cache_overlay.push((addr, data));
- }
-
- /// Apply pending cache changes.
- fn commit_cache(&mut self) {
- let mut cache = self.account_cache.lock();
- for (address, account) in self.cache_overlay.drain(..) {
- if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&address) {
- if let Some(new) = account {
- existing.overwrite_with(new);
- continue;
- }
- }
- cache.accounts.insert(address, account);
- }
- }
-
- /// Clear the cache.
- pub fn clear_cache(&mut self) {
- self.cache_overlay.clear();
- let mut cache = self.account_cache.lock();
- cache.accounts.clear();
+ /// Add a local cache entry.
+ /// The entry will be propagated to the global cache in `sync_cache`.
+ /// `modified` indicates that the entry was changed since being read from disk or global cache.
+ /// `data` can be set to an existing (`Some`), or non-existing account (`None`).
+ pub fn add_to_account_cache(&mut self, addr: Address, data: Option, modified: bool) {
+ self.local_cache.push(CacheQueueItem {
+ address: addr,
+ account: data,
+ modified: modified,
+ })
}
/// Get basic copy of the cached account. Does not include storage.
- /// Returns 'None' if the state is non-canonical and cache is disabled
- /// or if the account is not cached.
+ /// Returns 'None' if cache is disabled or if the account is not cached.
pub fn get_cached_account(&self, addr: &Address) -> Option