Merge pull request #1877 from ethcore/cache_manager_order

fixed cache_manager lock order
Authored by Marek Kotewicz on 2016-08-09 14:12:58 +02:00, committed by GitHub.
commit 505a054d10
3 changed files with 97 additions and 63 deletions
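The substance of the change is lock order. Before, `note_used` (which acquires the `cache_man` lock) was called before the cached database read, and `collect_garbage` acquired the cache-manager lock first and the per-cache data locks afterwards, inside its closures. After this PR every path acquires the data locks first and the cache-manager lock last, and the manager moves from `RwLock` to `Mutex`, since every access mutates it anyway. Below is a minimal sketch of that ordering with hypothetical simplified types (not the Parity structs), just to make the pattern explicit.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Hypothetical simplified types: `data` stands in for a per-item cache such as
// `block_headers`, `usage` for the shared `CacheManager<CacheID>`.
struct Cache {
    data: RwLock<HashMap<u64, Vec<u8>>>,
    usage: Mutex<Vec<u64>>,
}

impl Cache {
    // Reader: finish with the data lock first, then record usage.
    fn get(&self, key: u64) -> Option<Vec<u8>> {
        let result = self.data.read().unwrap().get(&key).cloned();
        self.usage.lock().unwrap().push(key); // note_used comes last
        result
    }

    // Garbage collection: the same order, data lock before the usage lock,
    // so it can never wait on `get` while `get` waits on it.
    fn collect_garbage(&self) {
        let mut data = self.data.write().unwrap();
        let mut usage = self.usage.lock().unwrap();
        for key in usage.drain(..) {
            data.remove(&key);
        }
    }
}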


@ -133,8 +133,9 @@ enum CacheID {
impl bc::group::BloomGroupDatabase for BlockChain {
fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option<bc::group::BloomGroup> {
let position = LogGroupPosition::from(position.clone());
self.note_used(CacheID::BlocksBlooms(position.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into)
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into);
self.cache_man.lock().note_used(CacheID::BlocksBlooms(position));
result
}
}
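Every reader in this file is rewritten to the shape shown above: do the cached read, bind it to `result`, tell the cache manager, then return. The order matters because `read_with_cache` may take the cache's write lock internally to fill in a missing entry; deferring `note_used` until afterwards means no data lock is held, or about to be wanted, when the cache-manager lock is requested. A rough sketch of what such a helper does, with a hypothetical signature rather than the real `util` API:

use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-in for `read_with_cache`: check the cache under a read
// lock, fall back to the database, and fill the cache under a write lock.
fn read_with_cache(
    cache: &RwLock<HashMap<u64, String>>,
    db_read: impl Fn(u64) -> Option<String>,
    key: u64,
) -> Option<String> {
    if let Some(value) = cache.read().unwrap().get(&key) {
        return Some(value.clone());
    }
    let value = db_read(key)?;
    cache.write().unwrap().insert(key, value.clone());
    Some(value)
}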
@ -160,7 +161,7 @@ pub struct BlockChain {
db: Arc<Database>,
cache_man: RwLock<CacheManager<CacheID>>,
cache_man: Mutex<CacheManager<CacheID>>,
pending_best_block: RwLock<Option<BestBlock>>,
pending_block_hashes: RwLock<HashMap<BlockNumber, H256>>,
@ -211,9 +212,7 @@ impl BlockProvider for BlockChain {
let opt = self.db.get(DB_COL_HEADERS, hash)
.expect("Low level database error. Some issue with disk?");
self.note_used(CacheID::BlockHeader(hash.clone()));
match opt {
let result = match opt {
Some(b) => {
let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec();
let mut write = self.block_headers.write();
@ -221,7 +220,10 @@ impl BlockProvider for BlockChain {
Some(bytes)
},
None => None
}
};
self.cache_man.lock().note_used(CacheID::BlockHeader(hash.clone()));
result
}
/// Get block body data
@ -246,9 +248,7 @@ impl BlockProvider for BlockChain {
let opt = self.db.get(DB_COL_BODIES, hash)
.expect("Low level database error. Some issue with disk?");
self.note_used(CacheID::BlockBody(hash.clone()));
match opt {
let result = match opt {
Some(b) => {
let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec();
let mut write = self.block_bodies.write();
@ -256,31 +256,39 @@ impl BlockProvider for BlockChain {
Some(bytes)
},
None => None
}
};
self.cache_man.lock().note_used(CacheID::BlockBody(hash.clone()));
result
}
/// Get the familial details concerning a block.
fn block_details(&self, hash: &H256) -> Option<BlockDetails> {
self.note_used(CacheID::BlockDetails(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash)
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash);
self.cache_man.lock().note_used(CacheID::BlockDetails(hash.clone()));
result
}
/// Get the hash of given block's number.
fn block_hash(&self, index: BlockNumber) -> Option<H256> {
self.note_used(CacheID::BlockHashes(index));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index)
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index);
self.cache_man.lock().note_used(CacheID::BlockHashes(index));
result
}
/// Get the address of transaction with given hash.
fn transaction_address(&self, hash: &H256) -> Option<TransactionAddress> {
self.note_used(CacheID::TransactionAddresses(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash)
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash);
self.cache_man.lock().note_used(CacheID::TransactionAddresses(hash.clone()));
result
}
/// Get receipts of block with given hash.
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
self.note_used(CacheID::BlockReceipts(hash.clone()));
self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash)
let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash);
self.cache_man.lock().note_used(CacheID::BlockReceipts(hash.clone()));
result
}
/// Returns numbers of blocks containing given bloom.
@ -332,7 +340,7 @@ impl BlockChain {
blocks_blooms: RwLock::new(HashMap::new()),
block_receipts: RwLock::new(HashMap::new()),
db: db.clone(),
cache_man: RwLock::new(cache_man),
cache_man: Mutex::new(cache_man),
pending_best_block: RwLock::new(None),
pending_block_hashes: RwLock::new(HashMap::new()),
pending_transaction_addresses: RwLock::new(HashMap::new()),
@ -635,11 +643,12 @@ impl BlockChain {
let mut update = HashMap::new();
update.insert(block_hash, parent_details);
self.note_used(CacheID::BlockDetails(block_hash));
let mut write_details = self.block_details.write();
batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite);
self.cache_man.lock().note_used(CacheID::BlockDetails(block_hash));
self.db.write(batch).unwrap();
}
@ -730,12 +739,15 @@ impl BlockChain {
/// Prepares extras update.
fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) {
{
for hash in update.block_details.keys().cloned() {
self.note_used(CacheID::BlockDetails(hash));
}
let block_hashes: Vec<_> = update.block_details.keys().cloned().collect();
let mut write_details = self.block_details.write();
batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update.block_details, CacheUpdatePolicy::Overwrite);
let mut cache_man = self.cache_man.lock();
for hash in block_hashes {
cache_man.note_used(CacheID::BlockDetails(hash));
}
}
{
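`prepare_update` shows the writer-side version of the same idea: the keys are cloned out of `update.block_details` up front (the map is consumed by `extend_with_cache`), the data write lock is taken, and only then is the cache manager locked and told which entries were used. A minimal sketch of the pattern with hypothetical simplified types and no `DBTransaction`:

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

fn apply_update(
    details: &RwLock<HashMap<u64, String>>,
    usage: &Mutex<Vec<u64>>,
    update: HashMap<u64, String>,
) {
    // The update map is consumed below, so remember its keys first.
    let keys: Vec<u64> = update.keys().cloned().collect();

    // Data lock first.
    let mut write_details = details.write().unwrap();
    write_details.extend(update);

    // Cache-manager lock last: data lock, then manager, the same order
    // garbage collection now uses.
    let mut usage = usage.lock().unwrap();
    for key in keys {
        usage.push(key);
    }
}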
@ -779,13 +791,6 @@ impl BlockChain {
let mut pending_write_hashes = self.pending_block_hashes.write();
let mut pending_write_txs = self.pending_transaction_addresses.write();
for n in pending_write_hashes.keys() {
self.note_used(CacheID::BlockHashes(*n));
}
for hash in pending_write_txs.keys() {
self.note_used(CacheID::TransactionAddresses(hash.clone()));
}
let mut best_block = self.best_block.write();
let mut write_hashes = self.block_hashes.write();
let mut write_txs = self.transaction_addresses.write();
@ -794,8 +799,20 @@ impl BlockChain {
*best_block = block;
}
let pending_hashes_keys: Vec<_> = pending_write_hashes.keys().cloned().collect();
let pending_txs_keys: Vec<_> = pending_write_txs.keys().cloned().collect();
write_hashes.extend(mem::replace(&mut *pending_write_hashes, HashMap::new()));
write_txs.extend(mem::replace(&mut *pending_write_txs, HashMap::new()));
let mut cache_man = self.cache_man.lock();
for n in pending_hashes_keys {
cache_man.note_used(CacheID::BlockHashes(n));
}
for hash in pending_txs_keys {
cache_man.note_used(CacheID::TransactionAddresses(hash));
}
}
/// Iterator that lists `first` and then all of `first`'s ancestors, by hash.
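The same discipline applies where the pending maps are flushed into the live caches (the hunk above): the touched keys are captured first, the pending entries are moved across with `mem::replace`, and the cache manager is locked last. A rough sketch under hypothetical simplified types:

use std::collections::HashMap;
use std::mem;
use std::sync::{Mutex, RwLock};

struct Chain {
    pending_hashes: RwLock<HashMap<u64, String>>,
    hashes: RwLock<HashMap<u64, String>>,
    usage: Mutex<Vec<u64>>,
}

impl Chain {
    fn commit_pending(&self) {
        let mut pending = self.pending_hashes.write().unwrap();
        let mut live = self.hashes.write().unwrap();

        // Capture the touched keys before the pending map is emptied.
        let touched: Vec<u64> = pending.keys().cloned().collect();

        // Drain pending entries into the live cache.
        live.extend(mem::replace(&mut *pending, HashMap::new()));

        // Cache-manager lock comes after all data locks, as everywhere else.
        let mut usage = self.usage.lock().unwrap();
        for key in touched {
            usage.push(key);
        }
    }
}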
@ -992,16 +1009,10 @@ impl BlockChain {
}
}
/// Let the cache system know that a cacheable item has been used.
fn note_used(&self, id: CacheID) {
let mut cache_man = self.cache_man.write();
cache_man.note_used(id);
}
/// Ticks our cache system and throws out any old data.
pub fn collect_garbage(&self) {
let mut cache_man = self.cache_man.write();
cache_man.collect_garbage(|| self.cache_size().total(), | ids | {
let current_size = self.cache_size().total();
let mut block_headers = self.block_headers.write();
let mut block_bodies = self.block_bodies.write();
let mut block_details = self.block_details.write();
@ -1010,6 +1021,8 @@ impl BlockChain {
let mut blocks_blooms = self.blocks_blooms.write();
let mut block_receipts = self.block_receipts.write();
let mut cache_man = self.cache_man.lock();
cache_man.collect_garbage(current_size, | ids | {
for id in &ids {
match *id {
CacheID::BlockHeader(ref h) => { block_headers.remove(h); },
@ -1021,6 +1034,7 @@ impl BlockChain {
CacheID::BlockReceipts(ref h) => { block_receipts.remove(h); }
}
}
block_headers.shrink_to_fit();
block_bodies.shrink_to_fit();
block_details.shrink_to_fit();
@ -1028,6 +1042,14 @@ impl BlockChain {
transaction_addresses.shrink_to_fit();
blocks_blooms.shrink_to_fit();
block_receipts.shrink_to_fit();
block_headers.heap_size_of_children() +
block_bodies.heap_size_of_children() +
block_details.heap_size_of_children() +
block_hashes.heap_size_of_children() +
transaction_addresses.heap_size_of_children() +
blocks_blooms.heap_size_of_children() +
block_receipts.heap_size_of_children()
});
}


@ -45,16 +45,19 @@ impl<T> CacheManager<T> where T: Eq + Hash {
}
}
pub fn collect_garbage<C, F>(&mut self, current_size: C, mut notify_unused: F) where C: Fn() -> usize, F: FnMut(HashSet<T>) {
if current_size() < self.pref_cache_size {
/// Collects unused objects from the cache.
/// The first parameter is the current size of the cache.
/// The second is a callback invoked with the objects to remove; it should return the new size of the cache.
pub fn collect_garbage<F>(&mut self, current_size: usize, mut notify_unused: F) where F: FnMut(HashSet<T>) -> usize {
if current_size < self.pref_cache_size {
self.rotate_cache_if_needed();
return;
}
for _ in 0..COLLECTION_QUEUE_SIZE {
notify_unused(self.cache_usage.pop_back().unwrap());
let current_size = notify_unused(self.cache_usage.pop_back().unwrap());
self.cache_usage.push_front(Default::default());
if current_size() < self.max_cache_size {
if current_size < self.max_cache_size {
break;
}
}
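`collect_garbage` no longer takes a size closure (`C: Fn() -> usize`); the caller passes the current size by value, and the eviction callback returns the size left afterwards. That lets the caller measure and take all of its locks before handing control to the manager. A simplified stand-in mirroring the new contract (hypothetical `u64` cache ids rather than the generic `T`):

use std::collections::{HashSet, VecDeque};

// Simplified stand-in for util's CacheManager<T>: the caller passes the
// current size, and the eviction callback reports the size that remains
// after the given ids have been removed.
struct CacheManager {
    cache_usage: VecDeque<HashSet<u64>>,
    pref_cache_size: usize,
    max_cache_size: usize,
}

impl CacheManager {
    fn collect_garbage<F>(&mut self, current_size: usize, mut notify_unused: F)
    where F: FnMut(HashSet<u64>) -> usize {
        if current_size < self.pref_cache_size {
            return;
        }
        while let Some(oldest) = self.cache_usage.pop_back() {
            let remaining = notify_unused(oldest);
            self.cache_usage.push_front(HashSet::new());
            if remaining < self.max_cache_size {
                break;
            }
        }
    }
}

With this shape, blockchain.rs and tracedb.rs can take their data write locks, then the manager lock, and pass a closure that both evicts and re-measures.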


@ -119,8 +119,9 @@ pub struct TraceDB<T> where T: DatabaseExtras {
impl<T> BloomGroupDatabase for TraceDB<T> where T: DatabaseExtras {
fn blooms_at(&self, position: &GroupPosition) -> Option<BloomGroup> {
let position = TraceGroupPosition::from(position.clone());
self.note_used(CacheID::Bloom(position.clone()));
self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into)
let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into);
self.note_used(CacheID::Bloom(position));
result
}
}
@ -174,11 +175,13 @@ impl<T> TraceDB<T> where T: DatabaseExtras {
/// Ticks our cache system and throws out any old data.
pub fn collect_garbage(&self) {
let mut cache_manager = self.cache_manager.write();
cache_manager.collect_garbage(|| self.cache_size(), | ids | {
let current_size = self.cache_size();
let mut traces = self.traces.write();
let mut blooms = self.blooms.write();
let mut cache_manager = self.cache_manager.write();
cache_manager.collect_garbage(current_size, | ids | {
for id in &ids {
match *id {
CacheID::Trace(ref h) => { traces.remove(h); },
@ -187,13 +190,16 @@ impl<T> TraceDB<T> where T: DatabaseExtras {
}
traces.shrink_to_fit();
blooms.shrink_to_fit();
traces.heap_size_of_children() + blooms.heap_size_of_children()
});
}
/// Returns traces for block with hash.
fn traces(&self, block_hash: &H256) -> Option<FlatBlockTraces> {
let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash);
self.note_used(CacheID::Trace(block_hash.clone()));
self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash)
result
}
/// Returns vector of transaction traces for given block.
@ -264,12 +270,13 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
// at first, let's insert new block traces
{
// note_used must be called before locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
let mut traces = self.traces.write();
// it's important to use overwrite here,
// cause this value might be queried by hash later
batch.write_with_cache(DB_COL_TRACE, traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
}
// now let's rebuild the blooms
@ -294,12 +301,14 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
.map(|p| (From::from(p.0), From::from(p.1)))
.collect::<HashMap<TraceGroupPosition, blooms::BloomGroup>>();
// note_used must be called before locking blooms to avoid cache/traces deadlock on garbage collection
for key in blooms_to_insert.keys() {
self.note_used(CacheID::Bloom(key.clone()));
}
let blooms_keys: Vec<_> = blooms_to_insert.keys().cloned().collect();
let mut blooms = self.blooms.write();
batch.extend_with_cache(DB_COL_TRACE, blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove);
// note_used must be called after locking blooms to avoid cache/traces deadlock on garbage collection
for key in blooms_keys.into_iter() {
self.note_used(CacheID::Bloom(key));
}
}
}
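Finally, the caller side of garbage collection in TraceDB (and, equivalently, in BlockChain) now measures the cache size first, takes the data write locks next, and takes the manager lock last before evicting. A minimal sketch of that ordering with hypothetical simplified types, where the usage queue is reduced to a plain set:

use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, RwLock};

// Hypothetical simplified TraceDB-like holder: two data caches plus the set
// of recently used ids standing in for the cache manager's bookkeeping.
struct Db {
    traces: RwLock<HashMap<u64, Vec<u8>>>,
    blooms: RwLock<HashMap<u64, Vec<u8>>>,
    used: Mutex<HashSet<u64>>,
}

impl Db {
    fn collect_garbage(&self) {
        // Measure before taking any write lock (the real code sums
        // heap_size_of_children() over its maps).
        let current_size =
            self.traces.read().unwrap().len() + self.blooms.read().unwrap().len();
        if current_size == 0 {
            return;
        }

        // Data write locks first, manager lock last: the same order every
        // reader and writer uses, so collection cannot deadlock with them.
        let mut traces = self.traces.write().unwrap();
        let mut blooms = self.blooms.write().unwrap();
        let mut used = self.used.lock().unwrap();

        for id in used.drain() {
            traces.remove(&id);
            blooms.remove(&id);
        }
        traces.shrink_to_fit();
        blooms.shrink_to_fit();
    }
}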