From c5a0024eeb894504d85d4a27428ab0eda113db18 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 8 Aug 2016 16:14:37 +0200 Subject: [PATCH 1/2] fixed cache_manager lock order --- ethcore/src/blockchain/blockchain.rs | 94 ++++++++++++++++++---------- ethcore/src/cache_manager.rs | 11 ++-- ethcore/src/trace/db.rs | 35 +++++++---- 3 files changed, 89 insertions(+), 51 deletions(-) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index a374c0bf6..b5cd6076c 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -133,8 +133,9 @@ enum CacheID { impl bc::group::BloomGroupDatabase for BlockChain { fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option { let position = LogGroupPosition::from(position.clone()); - self.note_used(CacheID::BlocksBlooms(position.clone())); - self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into) + let result = self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into); + self.note_used(CacheID::BlocksBlooms(position)); + result } } @@ -211,9 +212,7 @@ impl BlockProvider for BlockChain { let opt = self.db.get(DB_COL_HEADERS, hash) .expect("Low level database error. Some issue with disk?"); - self.note_used(CacheID::BlockHeader(hash.clone())); - - match opt { + let result = match opt { Some(b) => { let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec(); let mut write = self.block_headers.write(); @@ -221,7 +220,10 @@ impl BlockProvider for BlockChain { Some(bytes) }, None => None - } + }; + + self.note_used(CacheID::BlockHeader(hash.clone())); + result } /// Get block body data @@ -246,9 +248,7 @@ impl BlockProvider for BlockChain { let opt = self.db.get(DB_COL_BODIES, hash) .expect("Low level database error. Some issue with disk?"); - self.note_used(CacheID::BlockBody(hash.clone())); - - match opt { + let result = match opt { Some(b) => { let bytes: Bytes = UntrustedRlp::new(&b).decompress(RlpType::Blocks).to_vec(); let mut write = self.block_bodies.write(); @@ -256,31 +256,39 @@ impl BlockProvider for BlockChain { Some(bytes) }, None => None - } + }; + + self.note_used(CacheID::BlockBody(hash.clone())); + + result } /// Get the familial details concerning a block. fn block_details(&self, hash: &H256) -> Option { + let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash); self.note_used(CacheID::BlockDetails(hash.clone())); - self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash) + result } /// Get the hash of given block's number. fn block_hash(&self, index: BlockNumber) -> Option { + let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index); self.note_used(CacheID::BlockHashes(index)); - self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index) + result } /// Get the address of transaction with given hash. fn transaction_address(&self, hash: &H256) -> Option { + let result = self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash); self.note_used(CacheID::TransactionAddresses(hash.clone())); - self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash) + result } /// Get receipts of block with given hash. fn block_receipts(&self, hash: &H256) -> Option { + let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash); self.note_used(CacheID::BlockReceipts(hash.clone())); - self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash) + result } /// Returns numbers of blocks containing given bloom. @@ -635,11 +643,12 @@ impl BlockChain { let mut update = HashMap::new(); update.insert(block_hash, parent_details); - self.note_used(CacheID::BlockDetails(block_hash)); let mut write_details = self.block_details.write(); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); + self.note_used(CacheID::BlockDetails(block_hash)); + self.db.write(batch).unwrap(); } @@ -730,12 +739,14 @@ impl BlockChain { /// Prepares extras update. fn prepare_update(&self, batch: &DBTransaction, update: ExtrasUpdate, is_best: bool) { { - for hash in update.block_details.keys().cloned() { - self.note_used(CacheID::BlockDetails(hash)); - } + let block_hashes: Vec<_> = update.block_details.keys().cloned().collect(); let mut write_details = self.block_details.write(); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update.block_details, CacheUpdatePolicy::Overwrite); + + for hash in block_hashes.into_iter() { + self.note_used(CacheID::BlockDetails(hash)); + } } { @@ -779,13 +790,6 @@ impl BlockChain { let mut pending_write_hashes = self.pending_block_hashes.write(); let mut pending_write_txs = self.pending_transaction_addresses.write(); - for n in pending_write_hashes.keys() { - self.note_used(CacheID::BlockHashes(*n)); - } - for hash in pending_write_txs.keys() { - self.note_used(CacheID::TransactionAddresses(hash.clone())); - } - let mut best_block = self.best_block.write(); let mut write_hashes = self.block_hashes.write(); let mut write_txs = self.transaction_addresses.write(); @@ -794,8 +798,19 @@ impl BlockChain { *best_block = block; } + let pending_hashes_keys: Vec<_> = pending_write_hashes.keys().cloned().collect(); + let pending_txs_keys: Vec<_> = pending_write_txs.keys().cloned().collect(); + write_hashes.extend(mem::replace(&mut *pending_write_hashes, HashMap::new())); write_txs.extend(mem::replace(&mut *pending_write_txs, HashMap::new())); + + for n in pending_hashes_keys.into_iter() { + self.note_used(CacheID::BlockHashes(n)); + } + + for hash in pending_txs_keys.into_iter() { + self.note_used(CacheID::TransactionAddresses(hash)); + } } /// Iterator that lists `first` and then all of `first`'s ancestors, by hash. @@ -1000,16 +1015,18 @@ impl BlockChain { /// Ticks our cache system and throws out any old data. pub fn collect_garbage(&self) { - let mut cache_man = self.cache_man.write(); - cache_man.collect_garbage(|| self.cache_size().total(), | ids | { - let mut block_headers = self.block_headers.write(); - let mut block_bodies = self.block_bodies.write(); - let mut block_details = self.block_details.write(); - let mut block_hashes = self.block_hashes.write(); - let mut transaction_addresses = self.transaction_addresses.write(); - let mut blocks_blooms = self.blocks_blooms.write(); - let mut block_receipts = self.block_receipts.write(); + let current_size = self.cache_size().total(); + let mut block_headers = self.block_headers.write(); + let mut block_bodies = self.block_bodies.write(); + let mut block_details = self.block_details.write(); + let mut block_hashes = self.block_hashes.write(); + let mut transaction_addresses = self.transaction_addresses.write(); + let mut blocks_blooms = self.blocks_blooms.write(); + let mut block_receipts = self.block_receipts.write(); + + let mut cache_man = self.cache_man.write(); + cache_man.collect_garbage(current_size, | ids | { for id in &ids { match *id { CacheID::BlockHeader(ref h) => { block_headers.remove(h); }, @@ -1021,6 +1038,7 @@ impl BlockChain { CacheID::BlockReceipts(ref h) => { block_receipts.remove(h); } } } + block_headers.shrink_to_fit(); block_bodies.shrink_to_fit(); block_details.shrink_to_fit(); @@ -1028,6 +1046,14 @@ impl BlockChain { transaction_addresses.shrink_to_fit(); blocks_blooms.shrink_to_fit(); block_receipts.shrink_to_fit(); + + block_headers.heap_size_of_children() + + block_bodies.heap_size_of_children() + + block_details.heap_size_of_children() + + block_hashes.heap_size_of_children() + + transaction_addresses.heap_size_of_children() + + blocks_blooms.heap_size_of_children() + + block_receipts.heap_size_of_children() }); } diff --git a/ethcore/src/cache_manager.rs b/ethcore/src/cache_manager.rs index f68e3c616..715f68a57 100644 --- a/ethcore/src/cache_manager.rs +++ b/ethcore/src/cache_manager.rs @@ -45,16 +45,19 @@ impl CacheManager where T: Eq + Hash { } } - pub fn collect_garbage(&mut self, current_size: C, mut notify_unused: F) where C: Fn() -> usize, F: FnMut(HashSet) { - if current_size() < self.pref_cache_size { + /// Collects unused objects from cache. + /// First params is the current size of the cache. + /// Second one is an with objects to remove. It should also return new size of the cache. + pub fn collect_garbage(&mut self, current_size: usize, mut notify_unused: F) where F: FnMut(HashSet) -> usize { + if current_size < self.pref_cache_size { self.rotate_cache_if_needed(); return; } for _ in 0..COLLECTION_QUEUE_SIZE { - notify_unused(self.cache_usage.pop_back().unwrap()); + let current_size = notify_unused(self.cache_usage.pop_back().unwrap()); self.cache_usage.push_front(Default::default()); - if current_size() < self.max_cache_size { + if current_size < self.max_cache_size { break; } } diff --git a/ethcore/src/trace/db.rs b/ethcore/src/trace/db.rs index b03380829..3eb3ea125 100644 --- a/ethcore/src/trace/db.rs +++ b/ethcore/src/trace/db.rs @@ -119,8 +119,9 @@ pub struct TraceDB where T: DatabaseExtras { impl BloomGroupDatabase for TraceDB where T: DatabaseExtras { fn blooms_at(&self, position: &GroupPosition) -> Option { let position = TraceGroupPosition::from(position.clone()); - self.note_used(CacheID::Bloom(position.clone())); - self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into) + let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.blooms, &position).map(Into::into); + self.note_used(CacheID::Bloom(position)); + result } } @@ -174,11 +175,13 @@ impl TraceDB where T: DatabaseExtras { /// Ticks our cache system and throws out any old data. pub fn collect_garbage(&self) { - let mut cache_manager = self.cache_manager.write(); - cache_manager.collect_garbage(|| self.cache_size(), | ids | { - let mut traces = self.traces.write(); - let mut blooms = self.blooms.write(); + let current_size = self.cache_size(); + let mut traces = self.traces.write(); + let mut blooms = self.blooms.write(); + let mut cache_manager = self.cache_manager.write(); + + cache_manager.collect_garbage(current_size, | ids | { for id in &ids { match *id { CacheID::Trace(ref h) => { traces.remove(h); }, @@ -187,13 +190,16 @@ impl TraceDB where T: DatabaseExtras { } traces.shrink_to_fit(); blooms.shrink_to_fit(); + + traces.heap_size_of_children() + blooms.heap_size_of_children() }); } /// Returns traces for block with hash. fn traces(&self, block_hash: &H256) -> Option { + let result = self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash); self.note_used(CacheID::Trace(block_hash.clone())); - self.tracesdb.read_with_cache(DB_COL_TRACE, &self.traces, block_hash) + result } /// Returns vector of transaction traces for given block. @@ -264,12 +270,13 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { // at first, let's insert new block traces { - // note_used must be called before locking traces to avoid cache/traces deadlock on garbage collection - self.note_used(CacheID::Trace(request.block_hash.clone())); let mut traces = self.traces.write(); // it's important to use overwrite here, // cause this value might be queried by hash later batch.write_with_cache(DB_COL_TRACE, traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); + + // note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection + self.note_used(CacheID::Trace(request.block_hash.clone())); } // now let's rebuild the blooms @@ -294,12 +301,14 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { .map(|p| (From::from(p.0), From::from(p.1))) .collect::>(); - // note_used must be called before locking blooms to avoid cache/traces deadlock on garbage collection - for key in blooms_to_insert.keys() { - self.note_used(CacheID::Bloom(key.clone())); - } + let blooms_keys: Vec<_> = blooms_to_insert.keys().cloned().collect(); let mut blooms = self.blooms.write(); batch.extend_with_cache(DB_COL_TRACE, blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove); + + // note_used must be called after locking blooms to avoid cache/traces deadlock on garbage collection + for key in blooms_keys.into_iter() { + self.note_used(CacheID::Bloom(key)); + } } } From 9732da5101e0dfc9829d10fc2ab62cdc3ff1b346 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 8 Aug 2016 16:25:48 +0200 Subject: [PATCH 2/2] replace cache manager with mutex; reduce lock hold times --- ethcore/src/blockchain/blockchain.rs | 42 +++++++++++++--------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index b5cd6076c..6fb79632c 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -134,7 +134,7 @@ impl bc::group::BloomGroupDatabase for BlockChain { fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option { let position = LogGroupPosition::from(position.clone()); let result = self.db.read_with_cache(DB_COL_EXTRA, &self.blocks_blooms, &position).map(Into::into); - self.note_used(CacheID::BlocksBlooms(position)); + self.cache_man.lock().note_used(CacheID::BlocksBlooms(position)); result } } @@ -161,7 +161,7 @@ pub struct BlockChain { db: Arc, - cache_man: RwLock>, + cache_man: Mutex>, pending_best_block: RwLock>, pending_block_hashes: RwLock>, @@ -222,7 +222,7 @@ impl BlockProvider for BlockChain { None => None }; - self.note_used(CacheID::BlockHeader(hash.clone())); + self.cache_man.lock().note_used(CacheID::BlockHeader(hash.clone())); result } @@ -258,7 +258,7 @@ impl BlockProvider for BlockChain { None => None }; - self.note_used(CacheID::BlockBody(hash.clone())); + self.cache_man.lock().note_used(CacheID::BlockBody(hash.clone())); result } @@ -266,28 +266,28 @@ impl BlockProvider for BlockChain { /// Get the familial details concerning a block. fn block_details(&self, hash: &H256) -> Option { let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_details, hash); - self.note_used(CacheID::BlockDetails(hash.clone())); + self.cache_man.lock().note_used(CacheID::BlockDetails(hash.clone())); result } /// Get the hash of given block's number. fn block_hash(&self, index: BlockNumber) -> Option { let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_hashes, &index); - self.note_used(CacheID::BlockHashes(index)); + self.cache_man.lock().note_used(CacheID::BlockHashes(index)); result } /// Get the address of transaction with given hash. fn transaction_address(&self, hash: &H256) -> Option { let result = self.db.read_with_cache(DB_COL_EXTRA, &self.transaction_addresses, hash); - self.note_used(CacheID::TransactionAddresses(hash.clone())); + self.cache_man.lock().note_used(CacheID::TransactionAddresses(hash.clone())); result } /// Get receipts of block with given hash. fn block_receipts(&self, hash: &H256) -> Option { let result = self.db.read_with_cache(DB_COL_EXTRA, &self.block_receipts, hash); - self.note_used(CacheID::BlockReceipts(hash.clone())); + self.cache_man.lock().note_used(CacheID::BlockReceipts(hash.clone())); result } @@ -340,7 +340,7 @@ impl BlockChain { blocks_blooms: RwLock::new(HashMap::new()), block_receipts: RwLock::new(HashMap::new()), db: db.clone(), - cache_man: RwLock::new(cache_man), + cache_man: Mutex::new(cache_man), pending_best_block: RwLock::new(None), pending_block_hashes: RwLock::new(HashMap::new()), pending_transaction_addresses: RwLock::new(HashMap::new()), @@ -647,7 +647,7 @@ impl BlockChain { let mut write_details = self.block_details.write(); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update, CacheUpdatePolicy::Overwrite); - self.note_used(CacheID::BlockDetails(block_hash)); + self.cache_man.lock().note_used(CacheID::BlockDetails(block_hash)); self.db.write(batch).unwrap(); } @@ -744,8 +744,9 @@ impl BlockChain { let mut write_details = self.block_details.write(); batch.extend_with_cache(DB_COL_EXTRA, &mut *write_details, update.block_details, CacheUpdatePolicy::Overwrite); - for hash in block_hashes.into_iter() { - self.note_used(CacheID::BlockDetails(hash)); + let mut cache_man = self.cache_man.lock(); + for hash in block_hashes { + cache_man.note_used(CacheID::BlockDetails(hash)); } } @@ -804,12 +805,13 @@ impl BlockChain { write_hashes.extend(mem::replace(&mut *pending_write_hashes, HashMap::new())); write_txs.extend(mem::replace(&mut *pending_write_txs, HashMap::new())); - for n in pending_hashes_keys.into_iter() { - self.note_used(CacheID::BlockHashes(n)); + let mut cache_man = self.cache_man.lock(); + for n in pending_hashes_keys { + cache_man.note_used(CacheID::BlockHashes(n)); } - for hash in pending_txs_keys.into_iter() { - self.note_used(CacheID::TransactionAddresses(hash)); + for hash in pending_txs_keys { + cache_man.note_used(CacheID::TransactionAddresses(hash)); } } @@ -1007,12 +1009,6 @@ impl BlockChain { } } - /// Let the cache system know that a cacheable item has been used. - fn note_used(&self, id: CacheID) { - let mut cache_man = self.cache_man.write(); - cache_man.note_used(id); - } - /// Ticks our cache system and throws out any old data. pub fn collect_garbage(&self) { let current_size = self.cache_size().total(); @@ -1025,7 +1021,7 @@ impl BlockChain { let mut blocks_blooms = self.blocks_blooms.write(); let mut block_receipts = self.block_receipts.write(); - let mut cache_man = self.cache_man.write(); + let mut cache_man = self.cache_man.lock(); cache_man.collect_garbage(current_size, | ids | { for id in &ids { match *id {