fixed cache_manager lock order

This commit is contained in:
debris 2016-08-08 16:14:37 +02:00
parent 35451e31d4
commit c5a0024eeb
3 changed files with 89 additions and 51 deletions

View File

@ -133,8 +133,9 @@ enum CacheID {
impl bc::group::BloomGroupDatabase for BlockChain { impl bc::group::BloomGroupDatabase for BlockChain {
fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option<bc::group::BloomGroup> { fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option<bc::group::BloomGroup> {
let position = LogGroupPosition::from(position.clone()); let position = LogGroupPosition::from(position.clone());
self.note_used(CacheID::BlocksBlooms(position.clone())); let result = self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into);
self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into) self.note_used(CacheID::BlocksBlooms(position));
result
} }
} }
@ -211,9 +212,7 @@ impl BlockProvider for BlockChain {
let opt = self.db.get(DB_COL_HEADERS, hash) let opt = self.db.get(DB_COL_HEADERS, hash)
.expect("Low level database error. Some issue with disk?"); .expect("Low level database error. Some issue with disk?");
self.note_used(CacheID::BlockHeader(hash.clone())); let result = match opt {
match opt {
Some(b) => { Some(b) => {
let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec(); let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec();
let mut write = self.block_headers.write(); let mut write = self.block_headers.write();
@ -221,7 +220,10 @@ impl BlockProvider for BlockChain {
Some(bytes) Some(bytes)
}, },
None => None None => None
} };
self.note_used(CacheID::BlockHeader(hash.clone()));
result
} }
/// Get block body data /// Get block body data
@ -246,9 +248,7 @@ impl BlockProvider for BlockChain {
let opt = self.db.get(DB_COL_BODIES, hash) let opt = self.db.get(DB_COL_BODIES, hash)
.expect("Low level database error. Some issue with disk?"); .expect("Low level database error. Some issue with disk?");
self.note_used(CacheID::BlockBody(hash.clone())); let result = match opt {
match opt {
Some(b) => { Some(b) => {
let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec(); let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec();
let mut write = self.block_bodies.write(); let mut write = self.block_bodies.write();
@ -256,31 +256,39 @@ impl BlockProvider for BlockChain {
Some(bytes) Some(bytes)
}, },
None => None None => None
} };
self.note_used(CacheID::BlockBody(hash.clone()));
result
} }
/// Get the familial details concerning a block. /// Get the familial details concerning a block.
fn block_details(&self, hash: &H256) -> Option<BlockDetails> { fn block_details(&self, hash: &H256) -> Option<BlockDetails> {
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash);
self.note_used(CacheID::BlockDetails(hash.clone())); self.note_used(CacheID::BlockDetails(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash) result
} }
/// Get the hash of given block's number. /// Get the hash of given block's number.
fn block_hash(&self, index: BlockNumber) -> Option<H256> { fn block_hash(&self, index: BlockNumber) -> Option<H256> {
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index);
self.note_used(CacheID::BlockHashes(index)); self.note_used(CacheID::BlockHashes(index));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index) result
} }
/// Get the address of transaction with given hash. /// Get the address of transaction with given hash.
fn transaction_address(&self, hash: &H256) -> Option<TransactionAddress> { fn transaction_address(&self, hash: &H256) -> Option<TransactionAddress> {
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash);
self.note_used(CacheID::TransactionAddresses(hash.clone())); self.note_used(CacheID::TransactionAddresses(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash) result
} }
/// Get receipts of block with given hash. /// Get receipts of block with given hash.
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> { fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash);
self.note_used(CacheID::BlockReceipts(hash.clone())); self.note_used(CacheID::BlockReceipts(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash) result
} }
/// Returns numbers of blocks containing given bloom. /// Returns numbers of blocks containing given bloom.
@ -635,11 +643,12 @@ impl BlockChain {
let mut update = HashMap::new(); let mut update = HashMap::new();
update.insert(block_hash, parent_details); update.insert(block_hash, parent_details);
self.note_used(CacheID::BlockDetails(block_hash));
let mut write_details = self.block_details.write(); let mut write_details = self.block_details.write();
batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite);
self.note_used(CacheID::BlockDetails(block_hash));
self.db.write(batch).unwrap(); self.db.write(batch).unwrap();
} }
@ -730,12 +739,14 @@ impl BlockChain {
/// Prepares extras update. /// Prepares extras update.
fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) { fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) {
{ {
for hash in update.block_details.keys().cloned() { let block_hashes: Vec<_> = update.block_details.keys().cloned().collect();
self.note_used(CacheID::BlockDetails(hash));
}
let mut write_details = self.block_details.write(); let mut write_details = self.block_details.write();
batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update.block_details, CacheUpdatePolicy::Overwrite); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update.block_details, CacheUpdatePolicy::Overwrite);
for hash in block_hashes.into_iter() {
self.note_used(CacheID::BlockDetails(hash));
}
} }
{ {
@ -779,13 +790,6 @@ impl BlockChain {
let mut pending_write_hashes = self.pending_block_hashes.write(); let mut pending_write_hashes = self.pending_block_hashes.write();
let mut pending_write_txs = self.pending_transaction_addresses.write(); let mut pending_write_txs = self.pending_transaction_addresses.write();
for n in pending_write_hashes.keys() {
self.note_used(CacheID::BlockHashes(*n));
}
for hash in pending_write_txs.keys() {
self.note_used(CacheID::TransactionAddresses(hash.clone()));
}
let mut best_block = self.best_block.write(); let mut best_block = self.best_block.write();
let mut write_hashes = self.block_hashes.write(); let mut write_hashes = self.block_hashes.write();
let mut write_txs = self.transaction_addresses.write(); let mut write_txs = self.transaction_addresses.write();
@ -794,8 +798,19 @@ impl BlockChain {
*best_block = block; *best_block = block;
} }
let pending_hashes_keys: Vec<_> = pending_write_hashes.keys().cloned().collect();
let pending_txs_keys: Vec<_> = pending_write_txs.keys().cloned().collect();
write_hashes.extend(mem::replace(&mut *pending_write_hashes, HashMap::new())); write_hashes.extend(mem::replace(&mut *pending_write_hashes, HashMap::new()));
write_txs.extend(mem::replace(&mut *pending_write_txs, HashMap::new())); write_txs.extend(mem::replace(&mut *pending_write_txs, HashMap::new()));
for n in pending_hashes_keys.into_iter() {
self.note_used(CacheID::BlockHashes(n));
}
for hash in pending_txs_keys.into_iter() {
self.note_used(CacheID::TransactionAddresses(hash));
}
} }
/// Iterator that lists `first` and then all of `first`'s ancestors, by hash. /// Iterator that lists `first` and then all of `first`'s ancestors, by hash.
@ -1000,8 +1015,8 @@ impl BlockChain {
/// Ticks our cache system and throws out any old data. /// Ticks our cache system and throws out any old data.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
let mut cache_man = self.cache_man.write(); let current_size = self.cache_size().total();
cache_man.collect_garbage(|| self.cache_size().total(), | ids | {
let mut block_headers = self.block_headers.write(); let mut block_headers = self.block_headers.write();
let mut block_bodies = self.block_bodies.write(); let mut block_bodies = self.block_bodies.write();
let mut block_details = self.block_details.write(); let mut block_details = self.block_details.write();
@ -1010,6 +1025,8 @@ impl BlockChain {
let mut blocks_blooms = self.blocks_blooms.write(); let mut blocks_blooms = self.blocks_blooms.write();
let mut block_receipts = self.block_receipts.write(); let mut block_receipts = self.block_receipts.write();
let mut cache_man = self.cache_man.write();
cache_man.collect_garbage(current_size, | ids | {
for id in &ids { for id in &ids {
match *id { match *id {
CacheID::BlockHeader(ref h) => { block_headers.remove(h); }, CacheID::BlockHeader(ref h) => { block_headers.remove(h); },
@ -1021,6 +1038,7 @@ impl BlockChain {
CacheID::BlockReceipts(ref h) => { block_receipts.remove(h); } CacheID::BlockReceipts(ref h) => { block_receipts.remove(h); }
} }
} }
block_headers.shrink_to_fit(); block_headers.shrink_to_fit();
block_bodies.shrink_to_fit(); block_bodies.shrink_to_fit();
block_details.shrink_to_fit(); block_details.shrink_to_fit();
@ -1028,6 +1046,14 @@ impl BlockChain {
transaction_addresses.shrink_to_fit(); transaction_addresses.shrink_to_fit();
blocks_blooms.shrink_to_fit(); blocks_blooms.shrink_to_fit();
block_receipts.shrink_to_fit(); block_receipts.shrink_to_fit();
block_headers.heap_size_of_children() +
block_bodies.heap_size_of_children() +
block_details.heap_size_of_children() +
block_hashes.heap_size_of_children() +
transaction_addresses.heap_size_of_children() +
blocks_blooms.heap_size_of_children() +
block_receipts.heap_size_of_children()
}); });
} }

View File

@ -45,16 +45,19 @@ impl<T> CacheManager<T> where T: Eq + Hash {
} }
} }
pub fn collect_garbage<C, F>(&mut self, current_size: C, mut notify_unused: F) where C: Fn() -> usize, F: FnMut(HashSet<T>) { /// Collects unused objects from cache.
if current_size() < self.pref_cache_size { /// First params is the current size of the cache.
/// Second one is an with objects to remove. It should also return new size of the cache.
pub fn collect_garbage<F>(&mut self, current_size: usize, mut notify_unused: F) where F: FnMut(HashSet<T>) -> usize {
if current_size < self.pref_cache_size {
self.rotate_cache_if_needed(); self.rotate_cache_if_needed();
return; return;
} }
for _ in 0..COLLECTION_QUEUE_SIZE { for _ in 0..COLLECTION_QUEUE_SIZE {
notify_unused(self.cache_usage.pop_back().unwrap()); let current_size = notify_unused(self.cache_usage.pop_back().unwrap());
self.cache_usage.push_front(Default::default()); self.cache_usage.push_front(Default::default());
if current_size() < self.max_cache_size { if current_size < self.max_cache_size {
break; break;
} }
} }

View File

@ -119,8 +119,9 @@ pub struct TraceDB<T> where T: DatabaseExtras {
impl<T> BloomGroupDatabase for TraceDB<T> where T: DatabaseExtras { impl<T> BloomGroupDatabase for TraceDB<T> where T: DatabaseExtras {
fn blooms_at(&self, position: &GroupPosition) -> Option<BloomGroup> { fn blooms_at(&self, position: &GroupPosition) -> Option<BloomGroup> {
let position = TraceGroupPosition::from(position.clone()); let position = TraceGroupPosition::from(position.clone());
self.note_used(CacheID::Bloom(position.clone())); let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into);
self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into) self.note_used(CacheID::Bloom(position));
result
} }
} }
@ -174,11 +175,13 @@ impl<T> TraceDB<T> where T: DatabaseExtras {
/// Ticks our cache system and throws out any old data. /// Ticks our cache system and throws out any old data.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
let mut cache_manager = self.cache_manager.write(); let current_size = self.cache_size();
cache_manager.collect_garbage(|| self.cache_size(), | ids | {
let mut traces = self.traces.write(); let mut traces = self.traces.write();
let mut blooms = self.blooms.write(); let mut blooms = self.blooms.write();
let mut cache_manager = self.cache_manager.write();
cache_manager.collect_garbage(current_size, | ids | {
for id in &ids { for id in &ids {
match *id { match *id {
CacheID::Trace(ref h) => { traces.remove(h); }, CacheID::Trace(ref h) => { traces.remove(h); },
@ -187,13 +190,16 @@ impl<T> TraceDB<T> where T: DatabaseExtras {
} }
traces.shrink_to_fit(); traces.shrink_to_fit();
blooms.shrink_to_fit(); blooms.shrink_to_fit();
traces.heap_size_of_children() + blooms.heap_size_of_children()
}); });
} }
/// Returns traces for block with hash. /// Returns traces for block with hash.
fn traces(&self, block_hash: &H256) -> Option<FlatBlockTraces> { fn traces(&self, block_hash: &H256) -> Option<FlatBlockTraces> {
let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash);
self.note_used(CacheID::Trace(block_hash.clone())); self.note_used(CacheID::Trace(block_hash.clone()));
self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash) result
} }
/// Returns vector of transaction traces for given block. /// Returns vector of transaction traces for given block.
@ -264,12 +270,13 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
// at first, let's insert new block traces // at first, let's insert new block traces
{ {
// note_used must be called before locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
let mut traces = self.traces.write(); let mut traces = self.traces.write();
// it's important to use overwrite here, // it's important to use overwrite here,
// cause this value might be queried by hash later // cause this value might be queried by hash later
batch.write_with_cache(DB_COL_TRACE, traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); batch.write_with_cache(DB_COL_TRACE, traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
} }
// now let's rebuild the blooms // now let's rebuild the blooms
@ -294,12 +301,14 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
.map(|p| (From::from(p.0), From::from(p.1))) .map(|p| (From::from(p.0), From::from(p.1)))
.collect::<HashMap<TraceGroupPosition, blooms::BloomGroup>>(); .collect::<HashMap<TraceGroupPosition, blooms::BloomGroup>>();
// note_used must be called before locking blooms to avoid cache/traces deadlock on garbage collection let blooms_keys: Vec<_> = blooms_to_insert.keys().cloned().collect();
for key in blooms_to_insert.keys() {
self.note_used(CacheID::Bloom(key.clone()));
}
let mut blooms = self.blooms.write(); let mut blooms = self.blooms.write();
batch.extend_with_cache(DB_COL_TRACE, blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove); batch.extend_with_cache(DB_COL_TRACE, blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove);
// note_used must be called after locking blooms to avoid cache/traces deadlock on garbage collection
for key in blooms_keys.into_iter() {
self.note_used(CacheID::Bloom(key));
}
} }
} }