Merge branch 'master' into canon-cache-size
This commit is contained in:
commit
19e5bede7f
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -399,6 +399,7 @@ dependencies = [
|
|||||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
|
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
|
||||||
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -304,8 +304,7 @@ impl Client {
|
|||||||
// Enact Verified Block
|
// Enact Verified Block
|
||||||
let parent = chain_has_parent.unwrap();
|
let parent = chain_has_parent.unwrap();
|
||||||
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
|
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
|
||||||
let is_canon = header.parent_hash() == &chain.best_block_hash();
|
let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash());
|
||||||
let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };
|
|
||||||
|
|
||||||
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
|
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
|
||||||
if let Err(e) = enact_result {
|
if let Err(e) = enact_result {
|
||||||
@ -459,6 +458,8 @@ impl Client {
|
|||||||
enacted: route.enacted.clone(),
|
enacted: route.enacted.clone(),
|
||||||
retracted: route.retracted.len()
|
retracted: route.retracted.len()
|
||||||
});
|
});
|
||||||
|
let is_canon = route.enacted.last().map_or(false, |h| h == hash);
|
||||||
|
state.sync_cache(&route.enacted, &route.retracted, is_canon);
|
||||||
// Final commit to the DB
|
// Final commit to the DB
|
||||||
self.db.read().write_buffered(batch);
|
self.db.read().write_buffered(batch);
|
||||||
chain.commit();
|
chain.commit();
|
||||||
@ -533,9 +534,11 @@ impl Client {
|
|||||||
|
|
||||||
/// Get a copy of the best block's state.
|
/// Get a copy of the best block's state.
|
||||||
pub fn state(&self) -> State {
|
pub fn state(&self) -> State {
|
||||||
|
let header = self.best_block_header();
|
||||||
|
let header = HeaderView::new(&header);
|
||||||
State::from_existing(
|
State::from_existing(
|
||||||
self.state_db.lock().boxed_clone(),
|
self.state_db.lock().boxed_clone_canon(&header.hash()),
|
||||||
HeaderView::new(&self.best_block_header()).state_root(),
|
header.state_root(),
|
||||||
self.engine.account_start_nonce(),
|
self.engine.account_start_nonce(),
|
||||||
self.factories.clone())
|
self.factories.clone())
|
||||||
.expect("State root of best block header always valid.")
|
.expect("State root of best block header always valid.")
|
||||||
@ -1129,6 +1132,7 @@ impl MiningBlockChainClient for Client {
|
|||||||
let block_data = block.rlp_bytes();
|
let block_data = block.rlp_bytes();
|
||||||
let route = self.commit_block(block, &h, &block_data);
|
let route = self.commit_block(block, &h, &block_data);
|
||||||
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
|
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
|
||||||
|
self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
|
||||||
|
|
||||||
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
|
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
|
||||||
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
|
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
|
||||||
|
@ -164,8 +164,8 @@ impl AccountEntry {
|
|||||||
/// use that.
|
/// use that.
|
||||||
/// ****************************************************************************
|
/// ****************************************************************************
|
||||||
///
|
///
|
||||||
/// Upon destruction all the local cache data merged into the global cache.
|
/// Upon destruction all the local cache data propagated into the global cache.
|
||||||
/// The merge might be rejected if current state is non-canonical.
|
/// Propagated items might be rejected if current state is non-canonical.
|
||||||
///
|
///
|
||||||
/// State snapshotting.
|
/// State snapshotting.
|
||||||
///
|
///
|
||||||
@ -318,7 +318,7 @@ impl State {
|
|||||||
|
|
||||||
/// Destroy the current object and return root and database.
|
/// Destroy the current object and return root and database.
|
||||||
pub fn drop(mut self) -> (H256, StateDB) {
|
pub fn drop(mut self) -> (H256, StateDB) {
|
||||||
self.commit_cache();
|
self.propagate_to_global_cache();
|
||||||
(self.root, self.db)
|
(self.root, self.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -533,11 +533,12 @@ impl State {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commit_cache(&mut self) {
|
/// Propagate local cache into shared canonical state cache.
|
||||||
|
fn propagate_to_global_cache(&mut self) {
|
||||||
let mut addresses = self.cache.borrow_mut();
|
let mut addresses = self.cache.borrow_mut();
|
||||||
trace!("Committing cache {:?} entries", addresses.len());
|
trace!("Committing cache {:?} entries", addresses.len());
|
||||||
for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
|
for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
|
||||||
self.db.cache_account(address, a.account);
|
self.db.add_to_account_cache(address, a.account, a.state == AccountState::Committed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,11 +14,13 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use std::collections::{VecDeque, HashSet};
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use util::journaldb::JournalDB;
|
use util::journaldb::JournalDB;
|
||||||
use util::hash::{H256};
|
use util::hash::{H256};
|
||||||
use util::hashdb::HashDB;
|
use util::hashdb::HashDB;
|
||||||
use state::Account;
|
use state::Account;
|
||||||
|
use header::BlockNumber;
|
||||||
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
|
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
|
||||||
use bloom_journal::{Bloom, BloomJournal};
|
use bloom_journal::{Bloom, BloomJournal};
|
||||||
use db::COL_ACCOUNT_BLOOM;
|
use db::COL_ACCOUNT_BLOOM;
|
||||||
@ -29,26 +31,77 @@ pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
|
|||||||
|
|
||||||
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
|
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
|
||||||
|
|
||||||
|
const STATE_CACHE_BLOCKS: usize = 8;
|
||||||
|
|
||||||
|
|
||||||
|
/// Shared canonical state cache.
|
||||||
struct AccountCache {
|
struct AccountCache {
|
||||||
/// DB Account cache. `None` indicates that account is known to be missing.
|
/// DB Account cache. `None` indicates that account is known to be missing.
|
||||||
// When changing the type of the values here, be sure to update `mem_used` and
|
// When changing the type of the values here, be sure to update `mem_used` and
|
||||||
// `new`.
|
// `new`.
|
||||||
accounts: LruCache<Address, Option<Account>>,
|
accounts: LruCache<Address, Option<Account>>,
|
||||||
|
/// Information on the modifications in recently committed blocks; specifically which addresses
|
||||||
|
/// changed in which block. Ordered by block number.
|
||||||
|
modifications: VecDeque<BlockChanges>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Buffered account cache item.
|
||||||
|
struct CacheQueueItem {
|
||||||
|
/// Account address.
|
||||||
|
address: Address,
|
||||||
|
/// Acccount data or `None` if account does not exist.
|
||||||
|
account: Option<Account>,
|
||||||
|
/// Indicates that the account was modified before being
|
||||||
|
/// added to the cache.
|
||||||
|
modified: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Accumulates a list of accounts changed in a block.
|
||||||
|
struct BlockChanges {
|
||||||
|
/// Block number.
|
||||||
|
number: BlockNumber,
|
||||||
|
/// Block hash.
|
||||||
|
hash: H256,
|
||||||
|
/// Parent block hash.
|
||||||
|
parent: H256,
|
||||||
|
/// A set of modified account addresses.
|
||||||
|
accounts: HashSet<Address>,
|
||||||
|
/// Block is part of the canonical chain.
|
||||||
|
is_canon: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// State database abstraction.
|
/// State database abstraction.
|
||||||
/// Manages shared global state cache.
|
/// Manages shared global state cache which reflects the canonical
|
||||||
|
/// state as it is on the disk. All the entries in the cache are clean.
|
||||||
/// A clone of `StateDB` may be created as canonical or not.
|
/// A clone of `StateDB` may be created as canonical or not.
|
||||||
/// For canonical clones cache changes are accumulated and applied
|
/// For canonical clones local cache is accumulated and applied
|
||||||
/// on commit.
|
/// in `sync_cache`
|
||||||
/// For non-canonical clones cache is cleared on commit.
|
/// For non-canonical clones local cache is dropped.
|
||||||
|
///
|
||||||
|
/// Global cache propagation.
|
||||||
|
/// After a `State` object has been committed to the trie it
|
||||||
|
/// propagates its local cache into the `StateDB` local cache
|
||||||
|
/// using `add_to_account_cache` function.
|
||||||
|
/// Then, after the block has been added to the chain the local cache in the
|
||||||
|
/// `StateDB` is propagated into the global cache.
|
||||||
pub struct StateDB {
|
pub struct StateDB {
|
||||||
|
/// Backing database.
|
||||||
db: Box<JournalDB>,
|
db: Box<JournalDB>,
|
||||||
|
/// Shared canonical state cache.
|
||||||
account_cache: Arc<Mutex<AccountCache>>,
|
account_cache: Arc<Mutex<AccountCache>>,
|
||||||
cache_overlay: Vec<(Address, Option<Account>)>,
|
/// Local dirty cache.
|
||||||
is_canon: bool,
|
local_cache: Vec<CacheQueueItem>,
|
||||||
|
/// Shared account bloom. Does not handle chain reorganizations.
|
||||||
account_bloom: Arc<Mutex<Bloom>>,
|
account_bloom: Arc<Mutex<Bloom>>,
|
||||||
cache_size: usize,
|
cache_size: usize,
|
||||||
|
/// Hash of the block on top of which this instance was created or
|
||||||
|
/// `None` if cache is disabled
|
||||||
|
parent_hash: Option<H256>,
|
||||||
|
/// Hash of the committing block or `None` if not committed yet.
|
||||||
|
commit_hash: Option<H256>,
|
||||||
|
/// Number of the committing block or `None` if not committed yet.
|
||||||
|
commit_number: Option<BlockNumber>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StateDB {
|
impl StateDB {
|
||||||
@ -63,11 +116,16 @@ impl StateDB {
|
|||||||
|
|
||||||
StateDB {
|
StateDB {
|
||||||
db: db,
|
db: db,
|
||||||
account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(cache_items) })),
|
account_cache: Arc::new(Mutex::new(AccountCache {
|
||||||
cache_overlay: Vec::new(),
|
accounts: LruCache::new(cache_items),
|
||||||
is_canon: false,
|
modifications: VecDeque::new(),
|
||||||
|
})),
|
||||||
|
local_cache: Vec::new(),
|
||||||
account_bloom: Arc::new(Mutex::new(bloom)),
|
account_bloom: Arc::new(Mutex::new(bloom)),
|
||||||
cache_size: cache_size,
|
cache_size: cache_size,
|
||||||
|
parent_hash: None,
|
||||||
|
commit_hash: None,
|
||||||
|
commit_number: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,14 +190,107 @@ impl StateDB {
|
|||||||
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
|
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
|
||||||
}
|
}
|
||||||
let records = try!(self.db.commit(batch, now, id, end));
|
let records = try!(self.db.commit(batch, now, id, end));
|
||||||
if self.is_canon {
|
self.commit_hash = Some(id.clone());
|
||||||
self.commit_cache();
|
self.commit_number = Some(now);
|
||||||
} else {
|
|
||||||
self.clear_cache();
|
|
||||||
}
|
|
||||||
Ok(records)
|
Ok(records)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Propagate local cache into the global cache and synchonize
|
||||||
|
/// the global cache with the best block state.
|
||||||
|
/// This function updates the global cache by removing entries
|
||||||
|
/// that are invalidated by chain reorganization. `sync_cache`
|
||||||
|
/// should be called after the block has been committed and the
|
||||||
|
/// blockchain route has ben calculated.
|
||||||
|
pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
|
||||||
|
trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
|
||||||
|
let mut cache = self.account_cache.lock();
|
||||||
|
let mut cache = &mut *cache;
|
||||||
|
|
||||||
|
// Purge changes from re-enacted and retracted blocks.
|
||||||
|
// Filter out commiting block if any.
|
||||||
|
let mut clear = false;
|
||||||
|
for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
|
||||||
|
clear = clear || {
|
||||||
|
if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
|
||||||
|
trace!("Reverting enacted block {:?}", block);
|
||||||
|
m.is_canon = true;
|
||||||
|
for a in &m.accounts {
|
||||||
|
trace!("Reverting enacted address {:?}", a);
|
||||||
|
cache.accounts.remove(a);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
for block in retracted {
|
||||||
|
clear = clear || {
|
||||||
|
if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
|
||||||
|
trace!("Retracting block {:?}", block);
|
||||||
|
m.is_canon = false;
|
||||||
|
for a in &m.accounts {
|
||||||
|
trace!("Retracted address {:?}", a);
|
||||||
|
cache.accounts.remove(a);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if clear {
|
||||||
|
// We don't know anything about the block; clear everything
|
||||||
|
trace!("Wiping cache");
|
||||||
|
cache.accounts.clear();
|
||||||
|
cache.modifications.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Propagate cache only if committing on top of the latest canonical state
|
||||||
|
// blocks are ordered by number and only one block with a given number is marked as canonical
|
||||||
|
// (contributed to canonical state cache)
|
||||||
|
if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
|
||||||
|
if cache.modifications.len() == STATE_CACHE_BLOCKS {
|
||||||
|
cache.modifications.pop_back();
|
||||||
|
}
|
||||||
|
let mut modifications = HashSet::new();
|
||||||
|
trace!("committing {} cache entries", self.local_cache.len());
|
||||||
|
for account in self.local_cache.drain(..) {
|
||||||
|
if account.modified {
|
||||||
|
modifications.insert(account.address.clone());
|
||||||
|
}
|
||||||
|
if is_best {
|
||||||
|
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
|
||||||
|
if let Some(new) = account.account {
|
||||||
|
if account.modified {
|
||||||
|
existing.overwrite_with(new);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cache.accounts.insert(account.address, account.account);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save modified accounts. These are ordered by the block number.
|
||||||
|
let block_changes = BlockChanges {
|
||||||
|
accounts: modifications,
|
||||||
|
number: *number,
|
||||||
|
hash: hash.clone(),
|
||||||
|
is_canon: is_best,
|
||||||
|
parent: parent.clone(),
|
||||||
|
};
|
||||||
|
let insert_at = cache.modifications.iter().enumerate().find(|&(_, ref m)| m.number < *number).map(|(i, _)| i);
|
||||||
|
trace!("inserting modifications at {:?}", insert_at);
|
||||||
|
if let Some(insert_at) = insert_at {
|
||||||
|
cache.modifications.insert(insert_at, block_changes);
|
||||||
|
} else {
|
||||||
|
cache.modifications.push_back(block_changes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns an interface to HashDB.
|
/// Returns an interface to HashDB.
|
||||||
pub fn as_hashdb(&self) -> &HashDB {
|
pub fn as_hashdb(&self) -> &HashDB {
|
||||||
self.db.as_hashdb()
|
self.db.as_hashdb()
|
||||||
@ -155,22 +306,26 @@ impl StateDB {
|
|||||||
StateDB {
|
StateDB {
|
||||||
db: self.db.boxed_clone(),
|
db: self.db.boxed_clone(),
|
||||||
account_cache: self.account_cache.clone(),
|
account_cache: self.account_cache.clone(),
|
||||||
cache_overlay: Vec::new(),
|
local_cache: Vec::new(),
|
||||||
is_canon: false,
|
|
||||||
account_bloom: self.account_bloom.clone(),
|
account_bloom: self.account_bloom.clone(),
|
||||||
cache_size: self.cache_size,
|
cache_size: self.cache_size,
|
||||||
|
parent_hash: None,
|
||||||
|
commit_hash: None,
|
||||||
|
commit_number: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clone the database for a canonical state.
|
/// Clone the database for a canonical state.
|
||||||
pub fn boxed_clone_canon(&self) -> StateDB {
|
pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
|
||||||
StateDB {
|
StateDB {
|
||||||
db: self.db.boxed_clone(),
|
db: self.db.boxed_clone(),
|
||||||
account_cache: self.account_cache.clone(),
|
account_cache: self.account_cache.clone(),
|
||||||
cache_overlay: Vec::new(),
|
local_cache: Vec::new(),
|
||||||
is_canon: true,
|
|
||||||
account_bloom: self.account_bloom.clone(),
|
account_bloom: self.account_bloom.clone(),
|
||||||
cache_size: self.cache_size,
|
cache_size: self.cache_size,
|
||||||
|
parent_hash: Some(parent.clone()),
|
||||||
|
commit_hash: None,
|
||||||
|
commit_number: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,52 +345,36 @@ impl StateDB {
|
|||||||
&*self.db
|
&*self.db
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enqueue cache change.
|
/// Add a local cache entry.
|
||||||
pub fn cache_account(&mut self, addr: Address, data: Option<Account>) {
|
/// The entry will be propagated to the global cache in `sync_cache`.
|
||||||
self.cache_overlay.push((addr, data));
|
/// `modified` indicates that the entry was changed since being read from disk or global cache.
|
||||||
}
|
/// `data` can be set to an existing (`Some`), or non-existing account (`None`).
|
||||||
|
pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
|
||||||
/// Apply pending cache changes.
|
self.local_cache.push(CacheQueueItem {
|
||||||
fn commit_cache(&mut self) {
|
address: addr,
|
||||||
let mut cache = self.account_cache.lock();
|
account: data,
|
||||||
for (address, account) in self.cache_overlay.drain(..) {
|
modified: modified,
|
||||||
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&address) {
|
})
|
||||||
if let Some(new) = account {
|
|
||||||
existing.overwrite_with(new);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cache.accounts.insert(address, account);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clear the cache.
|
|
||||||
pub fn clear_cache(&mut self) {
|
|
||||||
self.cache_overlay.clear();
|
|
||||||
let mut cache = self.account_cache.lock();
|
|
||||||
cache.accounts.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get basic copy of the cached account. Does not include storage.
|
/// Get basic copy of the cached account. Does not include storage.
|
||||||
/// Returns 'None' if the state is non-canonical and cache is disabled
|
/// Returns 'None' if cache is disabled or if the account is not cached.
|
||||||
/// or if the account is not cached.
|
|
||||||
pub fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
|
pub fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
|
||||||
if !self.is_canon {
|
let mut cache = self.account_cache.lock();
|
||||||
|
if !Self::is_allowed(addr, &self.parent_hash, &cache.modifications) {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut cache = self.account_cache.lock();
|
|
||||||
cache.accounts.get_mut(&addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
|
cache.accounts.get_mut(&addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get value from a cached account.
|
/// Get value from a cached account.
|
||||||
/// Returns 'None' if the state is non-canonical and cache is disabled
|
/// Returns 'None' if cache is disabled or if the account is not cached.
|
||||||
/// or if the account is not cached.
|
|
||||||
pub fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U>
|
pub fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U>
|
||||||
where F: FnOnce(Option<&mut Account>) -> U {
|
where F: FnOnce(Option<&mut Account>) -> U {
|
||||||
if !self.is_canon {
|
let mut cache = self.account_cache.lock();
|
||||||
|
if !Self::is_allowed(a, &self.parent_hash, &cache.modifications) {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut cache = self.account_cache.lock();
|
|
||||||
cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
|
cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,5 +382,117 @@ impl StateDB {
|
|||||||
pub fn cache_size(&self) -> usize {
|
pub fn cache_size(&self) -> usize {
|
||||||
self.cache_size
|
self.cache_size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the account can be returned from cache by matching current block parent hash against canonical
|
||||||
|
/// state and filtering out account modified in later blocks.
|
||||||
|
fn is_allowed(addr: &Address, parent_hash: &Option<H256>, modifications: &VecDeque<BlockChanges>) -> bool {
|
||||||
|
let mut parent = match *parent_hash {
|
||||||
|
None => {
|
||||||
|
trace!("Cache lookup skipped for {:?}: no parent hash", addr);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Some(ref parent) => parent,
|
||||||
|
};
|
||||||
|
if modifications.is_empty() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Ignore all accounts modified in later blocks
|
||||||
|
// Modifications contains block ordered by the number
|
||||||
|
// We search for our parent in that list first and then for
|
||||||
|
// all its parent until we hit the canonical block,
|
||||||
|
// checking against all the intermediate modifications.
|
||||||
|
let mut iter = modifications.iter();
|
||||||
|
while let Some(ref m) = iter.next() {
|
||||||
|
if &m.hash == parent {
|
||||||
|
if m.is_canon {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
parent = &m.parent;
|
||||||
|
}
|
||||||
|
if m.accounts.contains(addr) {
|
||||||
|
trace!("Cache lookup skipped for {:?}: modified in a later block", addr);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
trace!("Cache lookup skipped for {:?}: parent hash is unknown", addr);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
|
||||||
|
use util::{U256, H256, FixedHash, Address, DBTransaction};
|
||||||
|
use tests::helpers::*;
|
||||||
|
use state::Account;
|
||||||
|
use util::log::init_log;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn state_db_smoke() {
|
||||||
|
init_log();
|
||||||
|
|
||||||
|
let mut state_db_result = get_temp_state_db();
|
||||||
|
let state_db = state_db_result.take();
|
||||||
|
let root_parent = H256::random();
|
||||||
|
let address = Address::random();
|
||||||
|
let h0 = H256::random();
|
||||||
|
let h1a = H256::random();
|
||||||
|
let h1b = H256::random();
|
||||||
|
let h2a = H256::random();
|
||||||
|
let h2b = H256::random();
|
||||||
|
let h3a = H256::random();
|
||||||
|
let h3b = H256::random();
|
||||||
|
let mut batch = DBTransaction::new(state_db.journal_db().backing());
|
||||||
|
|
||||||
|
// blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
|
||||||
|
// balance [ 5 5 4 3 2 2 ]
|
||||||
|
let mut s = state_db.boxed_clone_canon(&root_parent);
|
||||||
|
s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false);
|
||||||
|
s.commit(&mut batch, 0, &h0, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], true);
|
||||||
|
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h0);
|
||||||
|
s.commit(&mut batch, 1, &h1a, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], true);
|
||||||
|
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h0);
|
||||||
|
s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true);
|
||||||
|
s.commit(&mut batch, 1, &h1b, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], false);
|
||||||
|
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h1b);
|
||||||
|
s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true);
|
||||||
|
s.commit(&mut batch, 2, &h2b, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], false);
|
||||||
|
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h1a);
|
||||||
|
s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true);
|
||||||
|
s.commit(&mut batch, 2, &h2a, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], true);
|
||||||
|
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h2a);
|
||||||
|
s.commit(&mut batch, 3, &h3a, None).unwrap();
|
||||||
|
s.sync_cache(&[], &[], true);
|
||||||
|
|
||||||
|
let s = state_db.boxed_clone_canon(&h3a);
|
||||||
|
assert_eq!(s.get_cached_account(&address).unwrap().unwrap().balance(), &U256::from(5));
|
||||||
|
|
||||||
|
let s = state_db.boxed_clone_canon(&h1a);
|
||||||
|
assert!(s.get_cached_account(&address).is_none());
|
||||||
|
|
||||||
|
let s = state_db.boxed_clone_canon(&h2b);
|
||||||
|
assert!(s.get_cached_account(&address).is_none());
|
||||||
|
|
||||||
|
let s = state_db.boxed_clone_canon(&h1b);
|
||||||
|
assert!(s.get_cached_account(&address).is_none());
|
||||||
|
|
||||||
|
// reorg to 3b
|
||||||
|
// blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ]
|
||||||
|
let mut s = state_db.boxed_clone_canon(&h2b);
|
||||||
|
s.commit(&mut batch, 3, &h3b, None).unwrap();
|
||||||
|
s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true);
|
||||||
|
let s = state_db.boxed_clone_canon(&h3a);
|
||||||
|
assert!(s.get_cached_account(&address).is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@ nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" }
|
|||||||
ethcore-ipc-nano = { path = "../nano" }
|
ethcore-ipc-nano = { path = "../nano" }
|
||||||
semver = "0.2"
|
semver = "0.2"
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
|
time = "0.1"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
ethcore-ipc-codegen = { path = "../codegen" }
|
ethcore-ipc-codegen = { path = "../codegen" }
|
||||||
|
@ -22,6 +22,7 @@ extern crate ethcore_ipc as ipc;
|
|||||||
extern crate ethcore_ipc_nano as nanoipc;
|
extern crate ethcore_ipc_nano as nanoipc;
|
||||||
extern crate semver;
|
extern crate semver;
|
||||||
#[macro_use] extern crate log;
|
#[macro_use] extern crate log;
|
||||||
|
extern crate time;
|
||||||
|
|
||||||
pub mod service;
|
pub mod service;
|
||||||
|
|
||||||
@ -187,23 +188,40 @@ impl Hypervisor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Waits for every required module to check in
|
/// Waits for every required module to check in
|
||||||
pub fn wait_for_shutdown(&self) {
|
pub fn wait_for_shutdown(&self) -> bool {
|
||||||
|
use time::{PreciseTime, Duration};
|
||||||
|
|
||||||
let mut worker = self.ipc_worker.write().unwrap();
|
let mut worker = self.ipc_worker.write().unwrap();
|
||||||
|
let start = PreciseTime::now();
|
||||||
while !self.modules_shutdown() {
|
while !self.modules_shutdown() {
|
||||||
worker.poll()
|
worker.poll();
|
||||||
|
if start.to(PreciseTime::now()) > Duration::seconds(30) {
|
||||||
|
warn!("Some modules failed to shutdown gracefully, they will be terminated.");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.modules_shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
/// Shutdown the ipc and all managed child processes
|
/// Shutdown the ipc and all managed child processes
|
||||||
pub fn shutdown(&self) {
|
pub fn shutdown(&self) {
|
||||||
let mut childs = self.processes.write().unwrap();
|
let mut childs = self.processes.write().unwrap();
|
||||||
for (ref mut module, _) in childs.iter_mut() {
|
for (ref module, _) in childs.iter() {
|
||||||
trace!(target: "hypervisor", "Stopping process module: {}", module);
|
trace!(target: "hypervisor", "Stopping process module: {}", module);
|
||||||
self.service.send_shutdown(**module);
|
self.service.send_shutdown(**module);
|
||||||
}
|
}
|
||||||
trace!(target: "hypervisor", "Waiting for shutdown...");
|
trace!(target: "hypervisor", "Waiting for shutdown...");
|
||||||
self.wait_for_shutdown();
|
if self.wait_for_shutdown() {
|
||||||
trace!(target: "hypervisor", "All modules reported shutdown");
|
trace!(target: "hypervisor", "All modules reported shutdown");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ref module, ref mut process) in childs.iter_mut() {
|
||||||
|
if self.service.is_running(**module) {
|
||||||
|
process.kill().unwrap();
|
||||||
|
trace!("Terminated {}", module);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,6 @@ pub struct ModuleState {
|
|||||||
shutdown: bool,
|
shutdown: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[ipc]
|
#[ipc]
|
||||||
pub trait ControlService {
|
pub trait ControlService {
|
||||||
fn shutdown(&self) -> bool;
|
fn shutdown(&self) -> bool;
|
||||||
@ -106,6 +105,10 @@ impl HypervisorService {
|
|||||||
self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count()
|
self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_running(&self, id: IpcModuleId) -> bool {
|
||||||
|
self.modules.read().unwrap().get(&id).map(|module| module.started && !module.shutdown).unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn send_shutdown(&self, module_id: IpcModuleId) {
|
pub fn send_shutdown(&self, module_id: IpcModuleId) {
|
||||||
let modules = self.modules.read().unwrap();
|
let modules = self.modules.read().unwrap();
|
||||||
modules.get(&module_id).map(|module| {
|
modules.get(&module_id).map(|module| {
|
||||||
|
Loading…
Reference in New Issue
Block a user