Merge pull request #7425 from paritytech/td-state-lock

Use RwLock for state DB
This commit is contained in:
Marek Kotewicz 2018-01-02 11:57:29 +01:00 committed by GitHub
commit 7deeb26e21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 21 deletions

View File

@ -152,7 +152,7 @@ pub struct Client {
config: ClientConfig, config: ClientConfig,
pruning: journaldb::Algorithm, pruning: journaldb::Algorithm,
db: RwLock<Arc<KeyValueDB>>, db: RwLock<Arc<KeyValueDB>>,
state_db: Mutex<StateDB>, state_db: RwLock<StateDB>,
block_queue: BlockQueue, block_queue: BlockQueue,
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
import_lock: Mutex<()>, import_lock: Mutex<()>,
@ -166,7 +166,6 @@ pub struct Client {
last_hashes: RwLock<VecDeque<H256>>, last_hashes: RwLock<VecDeque<H256>>,
factories: Factories, factories: Factories,
history: u64, history: u64,
rng: Mutex<OsRng>,
ancient_verifier: Mutex<Option<AncientVerifier>>, ancient_verifier: Mutex<Option<AncientVerifier>>,
on_user_defaults_change: Mutex<Option<Box<FnMut(Option<Mode>) + 'static + Send>>>, on_user_defaults_change: Mutex<Option<Box<FnMut(Option<Mode>) + 'static + Send>>>,
registrar: Mutex<Option<Registry>>, registrar: Mutex<Option<Registry>>,
@ -242,7 +241,7 @@ impl Client {
verifier: verification::new(config.verifier_type.clone()), verifier: verification::new(config.verifier_type.clone()),
config: config, config: config,
db: RwLock::new(db), db: RwLock::new(db),
state_db: Mutex::new(state_db), state_db: RwLock::new(state_db),
block_queue: block_queue, block_queue: block_queue,
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
import_lock: Mutex::new(()), import_lock: Mutex::new(()),
@ -253,7 +252,6 @@ impl Client {
last_hashes: RwLock::new(VecDeque::new()), last_hashes: RwLock::new(VecDeque::new()),
factories: factories, factories: factories,
history: history, history: history,
rng: Mutex::new(OsRng::new().map_err(UtilError::from)?),
ancient_verifier: Mutex::new(None), ancient_verifier: Mutex::new(None),
on_user_defaults_change: Mutex::new(None), on_user_defaults_change: Mutex::new(None),
registrar: Mutex::new(None), registrar: Mutex::new(None),
@ -262,7 +260,7 @@ impl Client {
// prune old states. // prune old states.
{ {
let state_db = client.state_db.lock().boxed_clone(); let state_db = client.state_db.read().boxed_clone();
let chain = client.chain.read(); let chain = client.chain.read();
client.prune_ancient(state_db, &chain)?; client.prune_ancient(state_db, &chain)?;
} }
@ -452,7 +450,7 @@ impl Client {
// Enact Verified Block // Enact Verified Block
let last_hashes = self.build_last_hashes(header.parent_hash().clone()); let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.lock().boxed_clone_canon(header.parent_hash()); let db = self.state_db.read().boxed_clone_canon(header.parent_hash());
let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some(); let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some();
let enact_result = enact_verified(block, let enact_result = enact_verified(block,
@ -611,7 +609,8 @@ impl Client {
let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> { let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> {
// verify the block, passing the chain for updating the epoch // verify the block, passing the chain for updating the epoch
// verifier. // verifier.
verifier.verify(&mut *self.rng.lock(), &header, &chain) let mut rng = OsRng::new().map_err(UtilError::from)?;
verifier.verify(&mut rng, &header, &chain)
}; };
// initialize the ancient block verifier if we don't have one already. // initialize the ancient block verifier if we don't have one already.
@ -937,7 +936,7 @@ impl Client {
}; };
self.block_header(id).and_then(|header| { self.block_header(id).and_then(|header| {
let db = self.state_db.lock().boxed_clone(); let db = self.state_db.read().boxed_clone();
// early exit for pruned blocks // early exit for pruned blocks
if db.is_pruned() && self.pruning_info().earliest_state > block_number { if db.is_pruned() && self.pruning_info().earliest_state > block_number {
@ -968,7 +967,7 @@ impl Client {
pub fn state(&self) -> State<StateDB> { pub fn state(&self) -> State<StateDB> {
let header = self.best_block_header(); let header = self.best_block_header();
State::from_existing( State::from_existing(
self.state_db.lock().boxed_clone_canon(&header.hash()), self.state_db.read().boxed_clone_canon(&header.hash()),
header.state_root(), header.state_root(),
self.engine.account_start_nonce(header.number()), self.engine.account_start_nonce(header.number()),
self.factories.clone()) self.factories.clone())
@ -983,7 +982,7 @@ impl Client {
/// Get the report. /// Get the report.
pub fn report(&self) -> ClientReport { pub fn report(&self) -> ClientReport {
let mut report = self.report.read().clone(); let mut report = self.report.read().clone();
report.state_db_mem = self.state_db.lock().mem_used(); report.state_db_mem = self.state_db.read().mem_used();
report report
} }
@ -1039,7 +1038,7 @@ impl Client {
/// Take a snapshot at the given block. /// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind. /// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> { pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> {
let db = self.state_db.lock().journal_db().boxed_clone(); let db = self.state_db.read().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number; let best_block_number = self.chain_info().best_block_number;
let block_number = self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))?; let block_number = self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))?;
@ -1187,7 +1186,7 @@ impl snapshot::DatabaseRestore for Client {
trace!(target: "snapshot", "Replacing client database with {:?}", new_db); trace!(target: "snapshot", "Replacing client database with {:?}", new_db);
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
let mut state_db = self.state_db.lock(); let mut state_db = self.state_db.write();
let mut chain = self.chain.write(); let mut chain = self.chain.write();
let mut tracedb = self.tracedb.write(); let mut tracedb = self.tracedb.write();
self.miner.clear(); self.miner.clear();
@ -1608,7 +1607,7 @@ impl BlockChainClient for Client {
} }
fn state_data(&self, hash: &H256) -> Option<Bytes> { fn state_data(&self, hash: &H256) -> Option<Bytes> {
self.state_db.lock().journal_db().state(hash) self.state_db.read().journal_db().state(hash)
} }
fn block_receipts(&self, hash: &H256) -> Option<Bytes> { fn block_receipts(&self, hash: &H256) -> Option<Bytes> {
@ -1790,7 +1789,7 @@ impl BlockChainClient for Client {
fn pruning_info(&self) -> PruningInfo { fn pruning_info(&self) -> PruningInfo {
PruningInfo { PruningInfo {
earliest_chain: self.chain.read().first_block_number().unwrap_or(1), earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0), earliest_state: self.state_db.read().journal_db().earliest_era().unwrap_or(0),
} }
} }
@ -1858,7 +1857,7 @@ impl MiningBlockChainClient for Client {
engine, engine,
self.factories.clone(), self.factories.clone(),
self.tracedb.read().tracing_enabled(), self.tracedb.read().tracing_enabled(),
self.state_db.lock().boxed_clone_canon(&h), self.state_db.read().boxed_clone_canon(&h),
best_header, best_header,
self.build_last_hashes(h.clone()), self.build_last_hashes(h.clone()),
author, author,
@ -1943,7 +1942,7 @@ impl MiningBlockChainClient for Client {
let route = self.commit_block(block, &header, &block_data); let route = self.commit_block(block, &header, &block_data);
trace!(target: "client", "Imported sealed block #{} ({})", number, h); trace!(target: "client", "Imported sealed block #{} ({})", number, h);
self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false); self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route route
}; };
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]); let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
@ -2012,7 +2011,7 @@ impl ProvingBlockChainClient for Client {
}; };
env_info.gas_limit = transaction.gas.clone(); env_info.gas_limit = transaction.gas.clone();
let mut jdb = self.state_db.lock().journal_db().boxed_clone(); let mut jdb = self.state_db.read().journal_db().boxed_clone();
state::prove_transaction( state::prove_transaction(
jdb.as_hashdb_mut(), jdb.as_hashdb_mut(),

View File

@ -58,7 +58,7 @@ struct CacheQueueItem {
/// Account address. /// Account address.
address: Address, address: Address,
/// Acccount data or `None` if account does not exist. /// Acccount data or `None` if account does not exist.
account: Option<Account>, account: Mutex<Option<Account>>,
/// Indicates that the account was modified before being /// Indicates that the account was modified before being
/// added to the cache. /// added to the cache.
modified: bool, modified: bool,
@ -268,15 +268,16 @@ impl StateDB {
modifications.insert(account.address.clone()); modifications.insert(account.address.clone());
} }
if is_best { if is_best {
let acc = account.account.lock().take();
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) { if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
if let Some(new) = account.account { if let Some(new) = acc {
if account.modified { if account.modified {
existing.overwrite_with(new); existing.overwrite_with(new);
} }
continue; continue;
} }
} }
cache.accounts.insert(account.address, account.account); cache.accounts.insert(account.address, acc);
} }
} }
@ -408,7 +409,7 @@ impl state::Backend for StateDB {
fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) { fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
self.local_cache.push(CacheQueueItem { self.local_cache.push(CacheQueueItem {
address: addr, address: addr,
account: data, account: Mutex::new(data),
modified: modified, modified: modified,
}) })
} }