From 5354a0905ebec1d4fd2e1b36db48dee30691d5ad Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 7 Oct 2016 14:10:53 +0300 Subject: [PATCH 1/2] terminate after 30 seconds (#2513) --- Cargo.lock | 1 + ipc/hypervisor/Cargo.toml | 1 + ipc/hypervisor/src/lib.rs | 28 +++++++++++++++++++++++----- ipc/hypervisor/src/service.rs.in | 5 ++++- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04bc46c9c..3781bc589 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -399,6 +399,7 @@ dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/ipc/hypervisor/Cargo.toml b/ipc/hypervisor/Cargo.toml index a4c462bd0..d730b9bcf 100644 --- a/ipc/hypervisor/Cargo.toml +++ b/ipc/hypervisor/Cargo.toml @@ -13,6 +13,7 @@ nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } ethcore-ipc-nano = { path = "../nano" } semver = "0.2" log = "0.3" +time = "0.1" [build-dependencies] ethcore-ipc-codegen = { path = "../codegen" } diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 78b8b04ce..c7543ca91 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -22,6 +22,7 @@ extern crate ethcore_ipc as ipc; extern crate ethcore_ipc_nano as nanoipc; extern crate semver; #[macro_use] extern crate log; +extern crate time; pub mod service; @@ -187,23 +188,40 @@ impl Hypervisor { } /// Waits for every required module to check in - pub fn wait_for_shutdown(&self) { + pub fn wait_for_shutdown(&self) -> bool { + use time::{PreciseTime, Duration}; + let mut worker = self.ipc_worker.write().unwrap(); + let start = PreciseTime::now(); while !self.modules_shutdown() { - worker.poll() + worker.poll(); + if start.to(PreciseTime::now()) > Duration::seconds(30) { + warn!("Some modules failed to shutdown gracefully, they will be terminated."); + break; + } } + self.modules_shutdown() } /// Shutdown the ipc and all managed child processes pub fn shutdown(&self) { let mut childs = self.processes.write().unwrap(); - for (ref mut module, _) in childs.iter_mut() { + for (ref module, _) in childs.iter() { trace!(target: "hypervisor", "Stopping process module: {}", module); self.service.send_shutdown(**module); } trace!(target: "hypervisor", "Waiting for shutdown..."); - self.wait_for_shutdown(); - trace!(target: "hypervisor", "All modules reported shutdown"); + if self.wait_for_shutdown() { + trace!(target: "hypervisor", "All modules reported shutdown"); + return; + } + + for (ref module, ref mut process) in childs.iter_mut() { + if self.service.is_running(**module) { + process.kill().unwrap(); + trace!("Terminated {}", module); + } + } } } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 6996765ec..e80a1ec30 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -39,7 +39,6 @@ pub struct ModuleState { shutdown: bool, } - #[ipc] pub trait ControlService { fn shutdown(&self) -> bool; @@ -106,6 +105,10 @@ impl HypervisorService { self.modules.read().unwrap().iter().filter(|&(_, module)| module.started && !module.shutdown).count() } + pub fn is_running(&self, id: IpcModuleId) -> bool { + self.modules.read().unwrap().get(&id).map(|module| module.started && !module.shutdown).unwrap_or(false) + } + pub fn send_shutdown(&self, module_id: 
IpcModuleId) { let modules = self.modules.read().unwrap(); modules.get(&module_id).map(|module| { From 72ec9366ad45526d16f6c78b5dfb32d16ba5aa6c Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Fri, 7 Oct 2016 13:34:32 +0200 Subject: [PATCH 2/2] Handle reorganizations in the state cache (#2490) * Handle reorganizations in the state cache * Renamed and documented a few methods * Basic test * Renamed pending to buffered * Updated cache on sealed block * More renames and updated documentation * Minor doc tweaks --- ethcore/src/client/client.rs | 12 +- ethcore/src/state/mod.rs | 11 +- ethcore/src/state_db.rs | 370 +++++++++++++++++++++++++++++------ 3 files changed, 323 insertions(+), 70 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index a5a353702..a5bc63922 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -304,8 +304,7 @@ impl Client { // Enact Verified Block let parent = chain_has_parent.unwrap(); let last_hashes = self.build_last_hashes(header.parent_hash().clone()); - let is_canon = header.parent_hash() == &chain.best_block_hash(); - let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() }; + let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash()); let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone()); if let Err(e) = enact_result { @@ -459,6 +458,8 @@ impl Client { enacted: route.enacted.clone(), retracted: route.retracted.len() }); + let is_canon = route.enacted.last().map_or(false, |h| h == hash); + state.sync_cache(&route.enacted, &route.retracted, is_canon); // Final commit to the DB self.db.read().write_buffered(batch); chain.commit(); @@ -533,9 +534,11 @@ impl Client { /// Get a copy of the best block's state. pub fn state(&self) -> State { + let header = self.best_block_header(); + let header = HeaderView::new(&header); State::from_existing( - self.state_db.lock().boxed_clone(), - HeaderView::new(&self.best_block_header()).state_root(), + self.state_db.lock().boxed_clone_canon(&header.hash()), + header.state_root(), self.engine.account_start_nonce(), self.factories.clone()) .expect("State root of best block header always valid.") @@ -1128,6 +1131,7 @@ impl MiningBlockChainClient for Client { let block_data = block.rlp_bytes(); let route = self.commit_block(block, &h, &block_data); trace!(target: "client", "Imported sealed block #{} ({})", number, h); + self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false); let (enacted, retracted) = self.calculate_enacted_retracted(&[route]); self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted); diff --git a/ethcore/src/state/mod.rs b/ethcore/src/state/mod.rs index 03f6d9fad..39c8bbc11 100644 --- a/ethcore/src/state/mod.rs +++ b/ethcore/src/state/mod.rs @@ -164,8 +164,8 @@ impl AccountEntry { /// use that. /// **************************************************************************** /// -/// Upon destruction all the local cache data merged into the global cache. -/// The merge might be rejected if current state is non-canonical. +/// Upon destruction all the local cache data propagated into the global cache. +/// Propagated items might be rejected if current state is non-canonical. /// /// State snapshotting. /// @@ -318,7 +318,7 @@ impl State { /// Destroy the current object and return root and database. 
 	pub fn drop(mut self) -> (H256, StateDB) {
-		self.commit_cache();
+		self.propagate_to_global_cache();
 		(self.root, self.db)
 	}
 
@@ -533,11 +533,12 @@ impl State {
 		Ok(())
 	}
 
-	fn commit_cache(&mut self) {
+	/// Propagate local cache into shared canonical state cache.
+	fn propagate_to_global_cache(&mut self) {
 		let mut addresses = self.cache.borrow_mut();
 		trace!("Committing cache {:?} entries", addresses.len());
 		for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
-			self.db.cache_account(address, a.account);
+			self.db.add_to_account_cache(address, a.account, a.state == AccountState::Committed);
 		}
 	}
 
diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs
index 8ba50c3d6..04db274c4 100644
--- a/ethcore/src/state_db.rs
+++ b/ethcore/src/state_db.rs
@@ -14,56 +14,94 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
+use std::collections::{VecDeque, HashSet};
 use lru_cache::LruCache;
 use util::journaldb::JournalDB;
 use util::hash::{H256};
 use util::hashdb::HashDB;
 use state::Account;
+use header::BlockNumber;
 use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
 use bloom_journal::{Bloom, BloomJournal};
 use db::COL_ACCOUNT_BLOOM;
 use byteorder::{LittleEndian, ByteOrder};
 
 const STATE_CACHE_ITEMS: usize = 256000;
+const STATE_CACHE_BLOCKS: usize = 8;
 
 pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
 pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
 
 pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
 
+/// Shared canonical state cache.
 struct AccountCache {
 	/// DB Account cache. `None` indicates that account is known to be missing.
 	accounts: LruCache<Address, Option<Account>>,
+	/// Information on the modifications in recently committed blocks; specifically which addresses
+	/// changed in which block. Ordered by block number.
+	modifications: VecDeque<BlockChanges>,
+}
+
+/// Buffered account cache item.
+struct CacheQueueItem {
+	/// Account address.
+	address: Address,
+	/// Account data or `None` if account does not exist.
+	account: Option<Account>,
+	/// Indicates that the account was modified before being
+	/// added to the cache.
+	modified: bool,
+}
+
+#[derive(Debug)]
+/// Accumulates a list of accounts changed in a block.
+struct BlockChanges {
+	/// Block number.
+	number: BlockNumber,
+	/// Block hash.
+	hash: H256,
+	/// Parent block hash.
+	parent: H256,
+	/// A set of modified account addresses.
+	accounts: HashSet<Address>,
+	/// Block is part of the canonical chain.
+	is_canon: bool,
 }
 
 /// State database abstraction.
-/// Manages shared global state cache.
+/// Manages shared global state cache which reflects the canonical
+/// state as it is on the disk. All the entries in the cache are clean.
 /// A clone of `StateDB` may be created as canonical or not.
-/// For canonical clones cache changes are accumulated and applied
-/// on commit.
-/// For non-canonical clones cache is cleared on commit.
+/// For canonical clones the local cache is accumulated and applied
+/// in `sync_cache`.
+/// For non-canonical clones the local cache is dropped.
+///
+/// Global cache propagation.
+/// After a `State` object has been committed to the trie it
+/// propagates its local cache into the `StateDB` local cache
+/// using the `add_to_account_cache` function.
+/// Then, after the block has been added to the chain the local cache in the
+/// `StateDB` is propagated into the global cache.
 pub struct StateDB {
+	/// Backing database.
 	db: Box<JournalDB>,
+	/// Shared canonical state cache.
 	account_cache: Arc<Mutex<AccountCache>>,
-	cache_overlay: Vec<(Address, Option<Account>)>,
-	is_canon: bool,
+	/// Local dirty cache.
+	local_cache: Vec<CacheQueueItem>,
+	/// Shared account bloom. Does not handle chain reorganizations.
 	account_bloom: Arc<Mutex<Bloom>>,
+	/// Hash of the block on top of which this instance was created or
+	/// `None` if the cache is disabled.
+	parent_hash: Option<H256>,
+	/// Hash of the committing block or `None` if not committed yet.
+	commit_hash: Option<H256>,
+	/// Number of the committing block or `None` if not committed yet.
+	commit_number: Option<BlockNumber>,
 }
 
 impl StateDB {
-
-	/// Create a new instance wrapping `JournalDB`
-	pub fn new(db: Box<JournalDB>) -> StateDB {
-		let bloom = Self::load_bloom(db.backing());
-		StateDB {
-			db: db,
-			account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(STATE_CACHE_ITEMS) })),
-			cache_overlay: Vec::new(),
-			is_canon: false,
-			account_bloom: Arc::new(Mutex::new(bloom)),
-		}
-	}
-
 	/// Loads accounts bloom from the database
 	/// This bloom is used to handle request for the non-existant account fast
 	pub fn load_bloom(db: &Database) -> Bloom {
@@ -91,6 +129,23 @@ impl StateDB {
 		bloom
 	}
 
+	/// Create a new instance wrapping `JournalDB`
+	pub fn new(db: Box<JournalDB>) -> StateDB {
+		let bloom = Self::load_bloom(db.backing());
+		StateDB {
+			db: db,
+			account_cache: Arc::new(Mutex::new(AccountCache {
+				accounts: LruCache::new(STATE_CACHE_ITEMS),
+				modifications: VecDeque::new(),
+			})),
+			local_cache: Vec::new(),
+			account_bloom: Arc::new(Mutex::new(bloom)),
+			parent_hash: None,
+			commit_hash: None,
+			commit_number: None,
+		}
+	}
+
 	pub fn check_account_bloom(&self, address: &Address) -> bool {
 		trace!(target: "account_bloom", "Check account bloom: {:?}", address);
 		let bloom = self.account_bloom.lock();
@@ -125,14 +180,107 @@ impl StateDB {
 			try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
 		}
 		let records = try!(self.db.commit(batch, now, id, end));
-		if self.is_canon {
-			self.commit_cache();
-		} else {
-			self.clear_cache();
-		}
+		self.commit_hash = Some(id.clone());
+		self.commit_number = Some(now);
 		Ok(records)
 	}
 
+	/// Propagate local cache into the global cache and synchronize
+	/// the global cache with the best block state.
+	/// This function updates the global cache by removing entries
+	/// that are invalidated by chain reorganization. `sync_cache`
+	/// should be called after the block has been committed and the
+	/// blockchain route has been calculated.
+	pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
+		trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
+		let mut cache = self.account_cache.lock();
+		let mut cache = &mut *cache;
+
+		// Purge changes from re-enacted and retracted blocks.
+		// Filter out the committing block if any.
+		let mut clear = false;
+		for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
+			clear = clear || {
+				if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+					trace!("Reverting enacted block {:?}", block);
+					m.is_canon = true;
+					for a in &m.accounts {
+						trace!("Reverting enacted address {:?}", a);
+						cache.accounts.remove(a);
+					}
+					false
+				} else {
+					true
+				}
+			};
+		}
+
+		for block in retracted {
+			clear = clear || {
+				if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
+					trace!("Retracting block {:?}", block);
+					m.is_canon = false;
+					for a in &m.accounts {
+						trace!("Retracted address {:?}", a);
+						cache.accounts.remove(a);
+					}
+					false
+				} else {
+					true
+				}
+			};
+		}
+		if clear {
+			// We don't know anything about the block; clear everything
+			trace!("Wiping cache");
+			cache.accounts.clear();
+			cache.modifications.clear();
+		}
+
+		// Propagate cache only if committing on top of the latest canonical state
+		// blocks are ordered by number and only one block with a given number is marked as canonical
+		// (contributed to canonical state cache)
+		if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
+			if cache.modifications.len() == STATE_CACHE_BLOCKS {
+				cache.modifications.pop_back();
+			}
+			let mut modifications = HashSet::new();
+			trace!("committing {} cache entries", self.local_cache.len());
+			for account in self.local_cache.drain(..) {
+				if account.modified {
+					modifications.insert(account.address.clone());
+				}
+				if is_best {
+					if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
+						if let Some(new) = account.account {
+							if account.modified {
+								existing.overwrite_with(new);
+							}
+							continue;
+						}
+					}
+					cache.accounts.insert(account.address, account.account);
+				}
+			}
+
+			// Save modified accounts. These are ordered by the block number.
+			let block_changes = BlockChanges {
+				accounts: modifications,
+				number: *number,
+				hash: hash.clone(),
+				is_canon: is_best,
+				parent: parent.clone(),
+			};
+			let insert_at = cache.modifications.iter().enumerate().find(|&(_, ref m)| m.number < *number).map(|(i, _)| i);
+			trace!("inserting modifications at {:?}", insert_at);
+			if let Some(insert_at) = insert_at {
+				cache.modifications.insert(insert_at, block_changes);
+			} else {
+				cache.modifications.push_back(block_changes);
+			}
+		}
+	}
+
 	/// Returns an interface to HashDB.
 	pub fn as_hashdb(&self) -> &HashDB {
 		self.db.as_hashdb()
@@ -148,20 +296,24 @@
 		StateDB {
 			db: self.db.boxed_clone(),
 			account_cache: self.account_cache.clone(),
-			cache_overlay: Vec::new(),
-			is_canon: false,
+			local_cache: Vec::new(),
 			account_bloom: self.account_bloom.clone(),
+			parent_hash: None,
+			commit_hash: None,
+			commit_number: None,
 		}
 	}
 
 	/// Clone the database for a canonical state.
-	pub fn boxed_clone_canon(&self) -> StateDB {
+	pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
 		StateDB {
 			db: self.db.boxed_clone(),
 			account_cache: self.account_cache.clone(),
-			cache_overlay: Vec::new(),
-			is_canon: true,
+			local_cache: Vec::new(),
 			account_bloom: self.account_bloom.clone(),
+			parent_hash: Some(parent.clone()),
+			commit_hash: None,
+			commit_number: None,
 		}
 	}
 
@@ -180,53 +332,149 @@ impl StateDB {
 		&*self.db
 	}
 
-	/// Enqueue cache change.
-	pub fn cache_account(&mut self, addr: Address, data: Option<Account>) {
-		self.cache_overlay.push((addr, data));
-	}
-
-	/// Apply pending cache changes.
-	fn commit_cache(&mut self) {
-		let mut cache = self.account_cache.lock();
-		for (address, account) in self.cache_overlay.drain(..) {
-			if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&address) {
-				if let Some(new) = account {
-					existing.overwrite_with(new);
-					continue;
-				}
-			}
-			cache.accounts.insert(address, account);
-		}
-	}
-
-	/// Clear the cache.
-	pub fn clear_cache(&mut self) {
-		self.cache_overlay.clear();
-		let mut cache = self.account_cache.lock();
-		cache.accounts.clear();
+	/// Add a local cache entry.
+	/// The entry will be propagated to the global cache in `sync_cache`.
+	/// `modified` indicates that the entry was changed since being read from disk or global cache.
+	/// `data` can be set to an existing (`Some`), or non-existing account (`None`).
+	pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
+		self.local_cache.push(CacheQueueItem {
+			address: addr,
+			account: data,
+			modified: modified,
+		})
 	}
 
 	/// Get basic copy of the cached account. Does not include storage.
-	/// Returns 'None' if the state is non-canonical and cache is disabled
-	/// or if the account is not cached.
+	/// Returns 'None' if cache is disabled or if the account is not cached.
 	pub fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
-		if !self.is_canon {
+		let mut cache = self.account_cache.lock();
+		if !Self::is_allowed(addr, &self.parent_hash, &cache.modifications) {
 			return None;
 		}
-		let mut cache = self.account_cache.lock();
 		cache.accounts.get_mut(&addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
 	}
 
 	/// Get value from a cached account.
-	/// Returns 'None' if the state is non-canonical and cache is disabled
-	/// or if the account is not cached.
+	/// Returns 'None' if cache is disabled or if the account is not cached.
 	pub fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U> where F: FnOnce(Option<&mut Account>) -> U {
-		if !self.is_canon {
+		let mut cache = self.account_cache.lock();
+		if !Self::is_allowed(a, &self.parent_hash, &cache.modifications) {
 			return None;
 		}
-		let mut cache = self.account_cache.lock();
 		cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
 	}
+
+	/// Check if an account can be returned from the cache by matching the current block's parent hash against the
+	/// canonical state and filtering out accounts modified in later blocks.
+	fn is_allowed(addr: &Address, parent_hash: &Option<H256>, modifications: &VecDeque<BlockChanges>) -> bool {
+		let mut parent = match *parent_hash {
+			None => {
+				trace!("Cache lookup skipped for {:?}: no parent hash", addr);
+				return false;
+			}
+			Some(ref parent) => parent,
+		};
+		if modifications.is_empty() {
+			return true;
+		}
+		// Ignore all accounts modified in later blocks.
+		// Modifications contains blocks ordered by number.
+		// We search for our parent in that list first and then for
+		// all its parents until we hit the canonical block,
+		// checking against all the intermediate modifications.
+ let mut iter = modifications.iter(); + while let Some(ref m) = iter.next() { + if &m.hash == parent { + if m.is_canon { + return true; + } + parent = &m.parent; + } + if m.accounts.contains(addr) { + trace!("Cache lookup skipped for {:?}: modified in a later block", addr); + return false; + } + } + trace!("Cache lookup skipped for {:?}: parent hash is unknown", addr); + return false; + } +} + +#[cfg(test)] +mod tests { + +use util::{U256, H256, FixedHash, Address, DBTransaction}; +use tests::helpers::*; +use state::Account; +use util::log::init_log; + +#[test] +fn state_db_smoke() { + init_log(); + + let mut state_db_result = get_temp_state_db(); + let state_db = state_db_result.take(); + let root_parent = H256::random(); + let address = Address::random(); + let h0 = H256::random(); + let h1a = H256::random(); + let h1b = H256::random(); + let h2a = H256::random(); + let h2b = H256::random(); + let h3a = H256::random(); + let h3b = H256::random(); + let mut batch = DBTransaction::new(state_db.journal_db().backing()); + + // blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ] + // balance [ 5 5 4 3 2 2 ] + let mut s = state_db.boxed_clone_canon(&root_parent); + s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false); + s.commit(&mut batch, 0, &h0, None).unwrap(); + s.sync_cache(&[], &[], true); + + let mut s = state_db.boxed_clone_canon(&h0); + s.commit(&mut batch, 1, &h1a, None).unwrap(); + s.sync_cache(&[], &[], true); + + let mut s = state_db.boxed_clone_canon(&h0); + s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true); + s.commit(&mut batch, 1, &h1b, None).unwrap(); + s.sync_cache(&[], &[], false); + + let mut s = state_db.boxed_clone_canon(&h1b); + s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true); + s.commit(&mut batch, 2, &h2b, None).unwrap(); + s.sync_cache(&[], &[], false); + + let mut s = state_db.boxed_clone_canon(&h1a); + s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true); + s.commit(&mut batch, 2, &h2a, None).unwrap(); + s.sync_cache(&[], &[], true); + + let mut s = state_db.boxed_clone_canon(&h2a); + s.commit(&mut batch, 3, &h3a, None).unwrap(); + s.sync_cache(&[], &[], true); + + let s = state_db.boxed_clone_canon(&h3a); + assert_eq!(s.get_cached_account(&address).unwrap().unwrap().balance(), &U256::from(5)); + + let s = state_db.boxed_clone_canon(&h1a); + assert!(s.get_cached_account(&address).is_none()); + + let s = state_db.boxed_clone_canon(&h2b); + assert!(s.get_cached_account(&address).is_none()); + + let s = state_db.boxed_clone_canon(&h1b); + assert!(s.get_cached_account(&address).is_none()); + + // reorg to 3b + // blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ] + let mut s = state_db.boxed_clone_canon(&h2b); + s.commit(&mut batch, 3, &h3b, None).unwrap(); + s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true); + let s = state_db.boxed_clone_canon(&h3a); + assert!(s.get_cached_account(&address).is_none()); +} }
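
For readers skimming PATCH 1/2: the change bounds `wait_for_shutdown` to roughly 30 seconds and reports whether every module actually checked in, so the caller can fall back to force-killing the remaining child processes. Below is a minimal standalone sketch of that pattern, not part of the patch: it uses `std::time` instead of the `time` crate's `PreciseTime`, folds the nanomsg worker polling into an `is_done` closure, and the `wait_with_timeout` name and 50 ms poll interval are illustrative only.

use std::time::{Duration, Instant};

/// Poll `is_done` until it reports completion or `timeout` elapses.
/// Returns `true` if everything shut down in time; `false` means the
/// caller should fall back to terminating whatever is still running.
fn wait_with_timeout<F: FnMut() -> bool>(mut is_done: F, timeout: Duration) -> bool {
    let start = Instant::now();
    while !is_done() {
        if start.elapsed() > timeout {
            // Same idea as the hypervisor's warning before it kills modules.
            eprintln!("graceful shutdown timed out after {:?}", timeout);
            return false;
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    true
}

fn main() {
    // Toy stand-in for `modules_shutdown()`: reports done after ~200 ms.
    let deadline = Instant::now() + Duration::from_millis(200);
    assert!(wait_with_timeout(|| Instant::now() >= deadline, Duration::from_secs(30)));
}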
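
For PATCH 2/2, the heart of the reorg handling is the `is_allowed` walk: a cached account may only be served if no block between the reader's parent and the canonical chain has touched that address. The sketch below mirrors that walk under simplified, assumed types (u64 block ids and `&str` addresses instead of `H256`/`Address`; this `BlockChanges` is a stand-in for the patch's struct, kept newest-first as in the patch); it is an illustration, not the patch's code.

use std::collections::{HashSet, VecDeque};

/// Which accounts a recently committed block touched; newest block first.
struct BlockChanges {
    hash: u64,
    parent: u64,
    accounts: HashSet<&'static str>,
    is_canon: bool,
}

/// Can a cached value for `addr` be served to a state built on `parent_hash`?
fn is_allowed(addr: &str, parent_hash: Option<u64>, modifications: &VecDeque<BlockChanges>) -> bool {
    let mut parent = match parent_hash {
        Some(p) => p,
        None => return false, // cache disabled for this clone
    };
    if modifications.is_empty() {
        return true;
    }
    for m in modifications {
        if m.hash == parent {
            if m.is_canon {
                return true; // reached canonical state with no conflicting change
            }
            parent = m.parent; // keep walking down the side chain
        }
        if m.accounts.contains(addr) {
            return false; // touched by a block newer than the reader's parent
        }
    }
    false // parent is not among the tracked blocks: play it safe
}

fn main() {
    let modifications: VecDeque<BlockChanges> = vec![
        // Block 2 (canonical tip) changed "alice".
        BlockChanges { hash: 2, parent: 1, accounts: vec!["alice"].into_iter().collect(), is_canon: true },
        // Block 1 (canonical) changed nothing of interest.
        BlockChanges { hash: 1, parent: 0, accounts: HashSet::new(), is_canon: true },
    ].into();

    // Building on the tip: the cached "alice" entry reflects block 2, so it is valid.
    assert!(is_allowed("alice", Some(2), &modifications));
    // Building on block 1: "alice" was modified in the later block 2, so skip the cache.
    assert!(!is_allowed("alice", Some(1), &modifications));
    // "bob" was never touched, so the cache is usable from block 1 as well.
    assert!(is_allowed("bob", Some(1), &modifications));
}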