diff --git a/Cargo.toml b/Cargo.toml index 22d0f9288..b572749c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,3 +44,7 @@ travis-nightly = ["ethcore/json-tests", "dev-clippy", "dev"] [[bin]] path = "parity/main.rs" name = "parity" + +[profile.release] +debug = false +lto = false diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 3dfb98e8a..4e335f705 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -95,7 +95,7 @@ pub struct BlockQueue { panic_handler: Arc, engine: Arc>, more_to_verify: Arc, - verification: Arc>, + verification: Arc, verifiers: Vec>, deleting: Arc, ready_signal: Arc, @@ -132,18 +132,23 @@ impl QueueSignal { } } -#[derive(Default)] struct Verification { - unverified: VecDeque, - verified: VecDeque, - verifying: VecDeque, - bad: HashSet, + // All locks must be captured in the order declared here. + unverified: Mutex>, + verified: Mutex>, + verifying: Mutex>, + bad: Mutex>, } impl BlockQueue { /// Creates a new queue instance. pub fn new(config: BlockQueueConfig, engine: Arc>, message_channel: IoChannel) -> BlockQueue { - let verification = Arc::new(Mutex::new(Verification::default())); + let verification = Arc::new(Verification { + unverified: Mutex::new(VecDeque::new()), + verified: Mutex::new(VecDeque::new()), + verifying: Mutex::new(VecDeque::new()), + bad: Mutex::new(HashSet::new()), + }); let more_to_verify = Arc::new(Condvar::new()); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); @@ -186,17 +191,17 @@ impl BlockQueue { } } - fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut lock = verification.lock().unwrap(); + let mut unverified = verification.unverified.lock().unwrap(); - if lock.unverified.is_empty() && lock.verifying.is_empty() { + if unverified.is_empty() && verification.verifying.lock().unwrap().is_empty() { empty.notify_all(); } - while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { - lock = wait.wait(lock).unwrap(); + while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { + unverified = wait.wait(unverified).unwrap(); } if deleting.load(AtomicOrdering::Acquire) { @@ -205,39 +210,42 @@ impl BlockQueue { } let block = { - let mut v = verification.lock().unwrap(); - if v.unverified.is_empty() { + let mut unverified = verification.unverified.lock().unwrap(); + if unverified.is_empty() { continue; } - let block = v.unverified.pop_front().unwrap(); - v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); + let mut verifying = verification.verifying.lock().unwrap(); + let block = unverified.pop_front().unwrap(); + verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); block }; let block_hash = block.header.hash(); match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { Ok(verified) => { - let mut v = verification.lock().unwrap(); - for e in &mut v.verifying { + let mut verifying = verification.verifying.lock().unwrap(); + for e in verifying.iter_mut() { if e.hash == block_hash { e.block = Some(verified); break; } } - if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash { + if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { // we're next! - let mut vref = v.deref_mut(); - BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + let mut verified = verification.verified.lock().unwrap(); + let mut bad = verification.bad.lock().unwrap(); + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } }, Err(err) => { - let mut v = verification.lock().unwrap(); + let mut verifying = verification.verifying.lock().unwrap(); + let mut verified = verification.verified.lock().unwrap(); + let mut bad = verification.bad.lock().unwrap(); warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); - v.bad.insert(block_hash.clone()); - v.verifying.retain(|e| e.hash != block_hash); - let mut vref = v.deref_mut(); - BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + bad.insert(block_hash.clone()); + verifying.retain(|e| e.hash != block_hash); + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } } @@ -257,19 +265,21 @@ impl BlockQueue { } /// Clear the queue and stop verification activity. - pub fn clear(&mut self) { - let mut verification = self.verification.lock().unwrap(); - verification.unverified.clear(); - verification.verifying.clear(); - verification.verified.clear(); + pub fn clear(&self) { + let mut unverified = self.verification.unverified.lock().unwrap(); + let mut verifying = self.verification.verifying.lock().unwrap(); + let mut verified = self.verification.verified.lock().unwrap(); + unverified.clear(); + verifying.clear(); + verified.clear(); self.processing.write().unwrap().clear(); } - /// Wait for queue to be empty - pub fn flush(&mut self) { - let mut verification = self.verification.lock().unwrap(); - while !verification.unverified.is_empty() || !verification.verifying.is_empty() { - verification = self.empty.wait(verification).unwrap(); + /// Wait for unverified queue to be empty + pub fn flush(&self) { + let mut unverified = self.verification.unverified.lock().unwrap(); + while !unverified.is_empty() || !self.verification.verifying.lock().unwrap().is_empty() { + unverified = self.empty.wait(unverified).unwrap(); } } @@ -278,27 +288,28 @@ impl BlockQueue { if self.processing.read().unwrap().contains(&hash) { return BlockStatus::Queued; } - if self.verification.lock().unwrap().bad.contains(&hash) { + if self.verification.bad.lock().unwrap().contains(&hash) { return BlockStatus::Bad; } BlockStatus::Unknown } /// Add a block to the queue. - pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { + pub fn import_block(&self, bytes: Bytes) -> ImportResult { let header = BlockView::new(&bytes).header(); let h = header.hash(); - if self.processing.read().unwrap().contains(&h) { - return Err(x!(ImportError::AlreadyQueued)); - } { - let mut verification = self.verification.lock().unwrap(); - if verification.bad.contains(&h) { + if self.processing.read().unwrap().contains(&h) { + return Err(x!(ImportError::AlreadyQueued)); + } + + let mut bad = self.verification.bad.lock().unwrap(); + if bad.contains(&h) { return Err(x!(ImportError::KnownBad)); } - if verification.bad.contains(&header.parent_hash) { - verification.bad.insert(h.clone()); + if bad.contains(&header.parent_hash) { + bad.insert(h.clone()); return Err(x!(ImportError::KnownBad)); } } @@ -306,48 +317,47 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { Ok(()) => { self.processing.write().unwrap().insert(h.clone()); - self.verification.lock().unwrap().unverified.push_back(UnverifiedBlock { header: header, bytes: bytes }); + self.verification.unverified.lock().unwrap().push_back(UnverifiedBlock { header: header, bytes: bytes }); self.more_to_verify.notify_all(); Ok(h) }, Err(err) => { warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); - self.verification.lock().unwrap().bad.insert(h.clone()); + self.verification.bad.lock().unwrap().insert(h.clone()); Err(err) } } } /// Mark given block and all its children as bad. Stops verification. - pub fn mark_as_bad(&mut self, block_hashes: &[H256]) { + pub fn mark_as_bad(&self, block_hashes: &[H256]) { if block_hashes.is_empty() { return; } - let mut verification_lock = self.verification.lock().unwrap(); + let mut verified_lock = self.verification.verified.lock().unwrap(); + let mut verified = verified_lock.deref_mut(); + let mut bad = self.verification.bad.lock().unwrap(); let mut processing = self.processing.write().unwrap(); - - let mut verification = verification_lock.deref_mut(); - - verification.bad.reserve(block_hashes.len()); + bad.reserve(block_hashes.len()); for hash in block_hashes { - verification.bad.insert(hash.clone()); + bad.insert(hash.clone()); processing.remove(&hash); } let mut new_verified = VecDeque::new(); - for block in verification.verified.drain(..) { - if verification.bad.contains(&block.header.parent_hash) { - verification.bad.insert(block.header.hash()); + for block in verified.drain(..) { + if bad.contains(&block.header.parent_hash) { + bad.insert(block.header.hash()); processing.remove(&block.header.hash()); } else { new_verified.push_back(block); } } - verification.verified = new_verified; + *verified = new_verified; } /// Mark given block as processed - pub fn mark_as_good(&mut self, block_hashes: &[H256]) { + pub fn mark_as_good(&self, block_hashes: &[H256]) { if block_hashes.is_empty() { return; } @@ -358,16 +368,16 @@ impl BlockQueue { } /// Removes up to `max` verified blocks from the queue - pub fn drain(&mut self, max: usize) -> Vec { - let mut verification = self.verification.lock().unwrap(); - let count = min(max, verification.verified.len()); + pub fn drain(&self, max: usize) -> Vec { + let mut verified = self.verification.verified.lock().unwrap(); + let count = min(max, verified.len()); let mut result = Vec::with_capacity(count); for _ in 0..count { - let block = verification.verified.pop_front().unwrap(); + let block = verified.pop_front().unwrap(); result.push(block); } self.ready_signal.reset(); - if !verification.verified.is_empty() { + if !verified.is_empty() { self.ready_signal.set(); } result @@ -375,17 +385,28 @@ impl BlockQueue { /// Get queue status. pub fn queue_info(&self) -> BlockQueueInfo { - let verification = self.verification.lock().unwrap(); + let (unverified_len, unverified_bytes) = { + let v = self.verification.unverified.lock().unwrap(); + (v.len(), v.heap_size_of_children()) + }; + let (verifying_len, verifying_bytes) = { + let v = self.verification.verifying.lock().unwrap(); + (v.len(), v.heap_size_of_children()) + }; + let (verified_len, verified_bytes) = { + let v = self.verification.verified.lock().unwrap(); + (v.len(), v.heap_size_of_children()) + }; BlockQueueInfo { - verified_queue_size: verification.verified.len(), - unverified_queue_size: verification.unverified.len(), - verifying_queue_size: verification.verifying.len(), + unverified_queue_size: unverified_len, + verifying_queue_size: verifying_len, + verified_queue_size: verified_len, max_queue_size: self.max_queue_size, max_mem_use: self.max_mem_use, mem_used: - verification.unverified.heap_size_of_children() - + verification.verifying.heap_size_of_children() - + verification.verified.heap_size_of_children(), + unverified_bytes + + verifying_bytes + + verified_bytes // TODO: https://github.com/servo/heapsize/pull/50 //+ self.processing.read().unwrap().heap_size_of_children(), } @@ -393,10 +414,9 @@ impl BlockQueue { pub fn collect_garbage(&self) { { - let mut verification = self.verification.lock().unwrap(); - verification.unverified.shrink_to_fit(); - verification.verifying.shrink_to_fit(); - verification.verified.shrink_to_fit(); + self.verification.unverified.lock().unwrap().shrink_to_fit(); + self.verification.verifying.lock().unwrap().shrink_to_fit(); + self.verification.verified.lock().unwrap().shrink_to_fit(); } self.processing.write().unwrap().shrink_to_fit(); } @@ -444,7 +464,7 @@ mod tests { #[test] fn can_import_blocks() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); if let Err(e) = queue.import_block(get_good_dummy_block()) { panic!("error importing block that is valid by definition({:?})", e); } @@ -452,7 +472,7 @@ mod tests { #[test] fn returns_error_for_duplicates() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); if let Err(e) = queue.import_block(get_good_dummy_block()) { panic!("error importing block that is valid by definition({:?})", e); } @@ -471,7 +491,7 @@ mod tests { #[test] fn returns_ok_for_drained_duplicates() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); let block = get_good_dummy_block(); let hash = BlockView::new(&block).header().hash().clone(); if let Err(e) = queue.import_block(block) { @@ -488,7 +508,7 @@ mod tests { #[test] fn returns_empty_once_finished() { - let mut queue = get_test_queue(); + let queue = get_test_queue(); queue.import_block(get_good_dummy_block()).expect("error importing block that is valid by definition"); queue.flush(); queue.drain(1); diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 8c21532c8..4ebd111d9 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -16,6 +16,7 @@ //! Blockchain database. +use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder}; use util::*; use header::*; use extras::*; @@ -134,8 +135,9 @@ struct CacheManager { /// /// **Does not do input data verification.** pub struct BlockChain { - pref_cache_size: usize, - max_cache_size: usize, + // All locks must be captured in the order declared here. + pref_cache_size: AtomicUsize, + max_cache_size: AtomicUsize, best_block: RwLock, @@ -157,6 +159,8 @@ pub struct BlockChain { // blooms indexing bloom_indexer: BloomIndexer, + + insert_lock: Mutex<()> } impl FilterDataSource for BlockChain { @@ -262,8 +266,8 @@ impl BlockChain { (0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new())); let bc = BlockChain { - pref_cache_size: config.pref_cache_size, - max_cache_size: config.max_cache_size, + pref_cache_size: AtomicUsize::new(config.pref_cache_size), + max_cache_size: AtomicUsize::new(config.max_cache_size), best_block: RwLock::new(BestBlock::default()), blocks: RwLock::new(HashMap::new()), block_details: RwLock::new(HashMap::new()), @@ -275,7 +279,8 @@ impl BlockChain { extras_db: extras_db, blocks_db: blocks_db, cache_man: RwLock::new(cache_man), - bloom_indexer: BloomIndexer::new(BLOOM_INDEX_SIZE, BLOOM_LEVELS) + bloom_indexer: BloomIndexer::new(BLOOM_INDEX_SIZE, BLOOM_LEVELS), + insert_lock: Mutex::new(()), }; // load best block @@ -318,9 +323,9 @@ impl BlockChain { } /// Set the cache configuration. - pub fn configure_cache(&mut self, pref_cache_size: usize, max_cache_size: usize) { - self.pref_cache_size = pref_cache_size; - self.max_cache_size = max_cache_size; + pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) { + self.pref_cache_size.store(pref_cache_size, AtomicOrder::Relaxed); + self.max_cache_size.store(max_cache_size, AtomicOrder::Relaxed); } /// Returns a tree route between `from` and `to`, which is a tuple of: @@ -424,6 +429,7 @@ impl BlockChain { return ImportRoute::none(); } + let _lock = self.insert_lock.lock(); // store block in db self.blocks_db.put(&hash, &bytes).unwrap(); @@ -446,48 +452,58 @@ impl BlockChain { let batch = DBTransaction::new(); batch.put(b"best", &update.info.hash).unwrap(); - // update best block - let mut best_block = self.best_block.write().unwrap(); - match update.info.location { - BlockLocation::Branch => (), - _ => { - *best_block = BestBlock { - hash: update.info.hash, - number: update.info.number, - total_difficulty: update.info.total_difficulty - }; + { + let mut write_details = self.block_details.write().unwrap(); + for (hash, details) in update.block_details.into_iter() { + batch.put_extras(&hash, &details); + self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash.clone())); + write_details.insert(hash, details); } } - let mut write_hashes = self.block_hashes.write().unwrap(); - for (number, hash) in &update.block_hashes { - batch.put_extras(number, hash); - write_hashes.remove(number); + { + let mut write_receipts = self.block_receipts.write().unwrap(); + for (hash, receipt) in &update.block_receipts { + batch.put_extras(hash, receipt); + write_receipts.remove(hash); + } } - let mut write_details = self.block_details.write().unwrap(); - for (hash, details) in update.block_details.into_iter() { - batch.put_extras(&hash, &details); - write_details.insert(hash.clone(), details); - self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash)); + { + let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); + for (bloom_hash, blocks_bloom) in &update.blocks_blooms { + batch.put_extras(bloom_hash, blocks_bloom); + write_blocks_blooms.remove(bloom_hash); + } } - let mut write_receipts = self.block_receipts.write().unwrap(); - for (hash, receipt) in &update.block_receipts { - batch.put_extras(hash, receipt); - write_receipts.remove(hash); - } + // These cached values must be updated last and togeterh + { + let mut best_block = self.best_block.write().unwrap(); + let mut write_hashes = self.block_hashes.write().unwrap(); + let mut write_txs = self.transaction_addresses.write().unwrap(); - let mut write_txs = self.transaction_addresses.write().unwrap(); - for (hash, tx_address) in &update.transactions_addresses { - batch.put_extras(hash, tx_address); - write_txs.remove(hash); - } + // update best block + match update.info.location { + BlockLocation::Branch => (), + _ => { + *best_block = BestBlock { + hash: update.info.hash, + number: update.info.number, + total_difficulty: update.info.total_difficulty + }; + } + } - let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); - for (bloom_hash, blocks_bloom) in &update.blocks_blooms { - batch.put_extras(bloom_hash, blocks_bloom); - write_blocks_blooms.remove(bloom_hash); + for (number, hash) in &update.block_hashes { + batch.put_extras(number, hash); + write_hashes.remove(number); + } + + for (hash, tx_address) in &update.transactions_addresses { + batch.put_extras(hash, tx_address); + write_txs.remove(hash); + } } // update extras database @@ -781,11 +797,10 @@ impl BlockChain { /// Ticks our cache system and throws out any old data. pub fn collect_garbage(&self) { - if self.cache_size().total() < self.pref_cache_size { return; } + if self.cache_size().total() < self.pref_cache_size.load(AtomicOrder::Relaxed) { return; } for _ in 0..COLLECTION_QUEUE_SIZE { { - let mut cache_man = self.cache_man.write().unwrap(); let mut blocks = self.blocks.write().unwrap(); let mut block_details = self.block_details.write().unwrap(); let mut block_hashes = self.block_hashes.write().unwrap(); @@ -793,6 +808,7 @@ impl BlockChain { let mut block_logs = self.block_logs.write().unwrap(); let mut blocks_blooms = self.blocks_blooms.write().unwrap(); let mut block_receipts = self.block_receipts.write().unwrap(); + let mut cache_man = self.cache_man.write().unwrap(); for id in cache_man.cache_usage.pop_back().unwrap().into_iter() { cache_man.in_use.remove(&id); @@ -819,7 +835,7 @@ impl BlockChain { blocks_blooms.shrink_to_fit(); block_receipts.shrink_to_fit(); } - if self.cache_size().total() < self.max_cache_size { break; } + if self.cache_size().total() < self.max_cache_size.load(AtomicOrder::Relaxed) { break; } } // TODO: m_lastCollection = chrono::system_clock::now(); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 40f86f7a2..a4336111d 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -100,10 +100,10 @@ impl ClientReport { /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. /// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue. pub struct Client where V: Verifier { - chain: Arc>, + chain: Arc, engine: Arc>, state_db: Mutex, - block_queue: RwLock, + block_queue: BlockQueue, report: RwLock, import_lock: Mutex<()>, panic_handler: Arc, @@ -136,7 +136,7 @@ impl Client where V: Verifier { dir.push(format!("v{}-sec-{}", CLIENT_DB_VER_STR, if config.prefer_journal { "pruned" } else { "archive" })); let path = dir.as_path(); let gb = spec.genesis_block(); - let chain = Arc::new(RwLock::new(BlockChain::new(config.blockchain, &gb, path))); + let chain = Arc::new(BlockChain::new(config.blockchain, &gb, path)); let mut state_path = path.to_path_buf(); state_path.push("state"); @@ -157,7 +157,7 @@ impl Client where V: Verifier { chain: chain, engine: engine, state_db: Mutex::new(state_db), - block_queue: RwLock::new(block_queue), + block_queue: block_queue, report: RwLock::new(Default::default()), import_lock: Mutex::new(()), panic_handler: panic_handler, @@ -172,16 +172,15 @@ impl Client where V: Verifier { /// Flush the block import queue. pub fn flush_queue(&self) { - self.block_queue.write().unwrap().flush(); + self.block_queue.flush(); } fn build_last_hashes(&self, parent_hash: H256) -> LastHashes { let mut last_hashes = LastHashes::new(); last_hashes.resize(256, H256::new()); last_hashes[0] = parent_hash; - let chain = self.chain.read().unwrap(); for i in 0..255 { - match chain.block_details(&last_hashes[i]) { + match self.chain.block_details(&last_hashes[i]) { Some(details) => { last_hashes[i + 1] = details.parent.clone(); }, @@ -201,21 +200,21 @@ impl Client where V: Verifier { let header = &block.header; // Check the block isn't so old we won't be able to enact it. - let best_block_number = self.chain.read().unwrap().best_block_number(); + let best_block_number = self.chain.best_block_number(); if best_block_number >= HISTORY && header.number() <= best_block_number - HISTORY { warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number); return Err(()); } // Verify Block Family - let verify_family_result = V::verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); + let verify_family_result = V::verify_block_family(&header, &block.bytes, engine, self.chain.deref()); if let Err(e) = verify_family_result { warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); return Err(()); }; // Check if Parent is in chain - let chain_has_parent = self.chain.read().unwrap().block_header(&header.parent_hash); + let chain_has_parent = self.chain.block_header(&header.parent_hash); if let None = chain_has_parent { warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); return Err(()); @@ -250,7 +249,7 @@ impl Client where V: Verifier { let mut bad_blocks = HashSet::new(); let _import_lock = self.import_lock.lock(); - let blocks = self.block_queue.write().unwrap().drain(max_blocks_to_import); + let blocks = self.block_queue.drain(max_blocks_to_import); let original_best = self.chain_info().best_block_hash; @@ -271,8 +270,7 @@ impl Client where V: Verifier { // Are we committing an era? let ancient = if header.number() >= HISTORY { let n = header.number() - HISTORY; - let chain = self.chain.read().unwrap(); - Some((n, chain.block_hash(n).unwrap())) + Some((n, self.chain.block_hash(n).unwrap())) } else { None }; @@ -286,8 +284,7 @@ impl Client where V: Verifier { // And update the chain after commit to prevent race conditions // (when something is in chain but you are not able to fetch details) - self.chain.write().unwrap() - .insert_block(&block.bytes, receipts); + self.chain.insert_block(&block.bytes, receipts); self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); @@ -297,18 +294,16 @@ impl Client where V: Verifier { let bad_blocks = bad_blocks.into_iter().collect::>(); { - let mut block_queue = self.block_queue.write().unwrap(); if !bad_blocks.is_empty() { - block_queue.mark_as_bad(&bad_blocks); + self.block_queue.mark_as_bad(&bad_blocks); } if !good_blocks.is_empty() { - block_queue.mark_as_good(&good_blocks); + self.block_queue.mark_as_good(&good_blocks); } } { - let block_queue = self.block_queue.read().unwrap(); - if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { + if !good_blocks.is_empty() && self.block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, bad: bad_blocks, @@ -332,7 +327,7 @@ impl Client where V: Verifier { /// Get info on the cache. pub fn blockchain_cache_info(&self) -> BlockChainCacheSize { - self.chain.read().unwrap().cache_size() + self.chain.cache_size() } /// Get the report. @@ -344,13 +339,13 @@ impl Client where V: Verifier { /// Tick the client. pub fn tick(&self) { - self.chain.read().unwrap().collect_garbage(); - self.block_queue.read().unwrap().collect_garbage(); + self.chain.collect_garbage(); + self.block_queue.collect_garbage(); } /// Set up the cache behaviour. pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) { - self.chain.write().unwrap().configure_cache(pref_cache_size, max_cache_size); + self.chain.configure_cache(pref_cache_size, max_cache_size); } fn block_hash(chain: &BlockChain, id: BlockId) -> Option { @@ -365,9 +360,9 @@ impl Client where V: Verifier { fn block_number(&self, id: BlockId) -> Option { match id { BlockId::Number(number) => Some(number), - BlockId::Hash(ref hash) => self.chain.read().unwrap().block_number(hash), + BlockId::Hash(ref hash) => self.chain.block_number(hash), BlockId::Earliest => Some(0), - BlockId::Latest => Some(self.chain.read().unwrap().best_block_number()) + BlockId::Latest => Some(self.chain.best_block_number()) } } @@ -393,17 +388,17 @@ impl Client where V: Verifier { /// New chain head event. Restart mining operation. pub fn prepare_sealing(&self) { - let h = self.chain.read().unwrap().best_block_hash(); + let h = self.chain.best_block_hash(); let mut b = OpenBlock::new( self.engine.deref().deref(), self.state_db.lock().unwrap().clone(), - match self.chain.read().unwrap().block_header(&h) { Some(ref x) => x, None => {return;} }, + match self.chain.block_header(&h) { Some(ref x) => x, None => {return;} }, self.build_last_hashes(h.clone()), self.author(), self.extra_data() ); - self.chain.read().unwrap().find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); }); + self.chain.find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); }); // TODO: push transactions. @@ -417,14 +412,12 @@ impl Client where V: Verifier { impl BlockChainClient for Client where V: Verifier { fn block_header(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) + Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) } fn block_body(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| { - chain.block(&hash).map(|bytes| { + Self::block_hash(&self.chain, id).and_then(|hash| { + self.chain.block(&hash).map(|bytes| { let rlp = Rlp::new(&bytes); let mut body = RlpStream::new_list(2); body.append_raw(rlp.at(1).as_raw(), 1); @@ -435,24 +428,21 @@ impl BlockChainClient for Client where V: Verifier { } fn block(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| { - chain.block(&hash) + Self::block_hash(&self.chain, id).and_then(|hash| { + self.chain.block(&hash) }) } fn block_status(&self, id: BlockId) -> BlockStatus { - let chain = self.chain.read().unwrap(); - match Self::block_hash(&chain, id) { - Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain, - Some(hash) => self.block_queue.read().unwrap().block_status(&hash), + match Self::block_hash(&self.chain, id) { + Some(ref hash) if self.chain.is_known(hash) => BlockStatus::InChain, + Some(hash) => self.block_queue.block_status(&hash), None => BlockStatus::Unknown } } fn block_total_difficulty(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) + Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block_details(&hash)).map(|d| d.total_difficulty) } fn nonce(&self, address: &Address) -> U256 { @@ -460,8 +450,7 @@ impl BlockChainClient for Client where V: Verifier { } fn block_hash(&self, id: BlockId) -> Option { - let chain = self.chain.read().unwrap(); - Self::block_hash(&chain, id) + Self::block_hash(&self.chain, id) } fn code(&self, address: &Address) -> Option { @@ -469,20 +458,18 @@ impl BlockChainClient for Client where V: Verifier { } fn transaction(&self, id: TransactionId) -> Option { - let chain = self.chain.read().unwrap(); match id { - TransactionId::Hash(ref hash) => chain.transaction_address(hash), - TransactionId::Location(id, index) => Self::block_hash(&chain, id).map(|hash| TransactionAddress { + TransactionId::Hash(ref hash) => self.chain.transaction_address(hash), + TransactionId::Location(id, index) => Self::block_hash(&self.chain, id).map(|hash| TransactionAddress { block_hash: hash, index: index }) - }.and_then(|address| chain.transaction(&address)) + }.and_then(|address| self.chain.transaction(&address)) } fn tree_route(&self, from: &H256, to: &H256) -> Option { - let chain = self.chain.read().unwrap(); - match chain.is_known(from) && chain.is_known(to) { - true => Some(chain.tree_route(from.clone(), to.clone())), + match self.chain.is_known(from) && self.chain.is_known(to) { + true => Some(self.chain.tree_route(from.clone(), to.clone())), false => None } } @@ -498,38 +485,37 @@ impl BlockChainClient for Client where V: Verifier { fn import_block(&self, bytes: Bytes) -> ImportResult { { let header = BlockView::new(&bytes).header_view(); - if self.chain.read().unwrap().is_known(&header.sha3()) { + if self.chain.is_known(&header.sha3()) { return Err(x!(ImportError::AlreadyInChain)); } if self.block_status(BlockId::Hash(header.parent_hash())) == BlockStatus::Unknown { return Err(x!(BlockError::UnknownParent(header.parent_hash()))); } } - self.block_queue.write().unwrap().import_block(bytes) + self.block_queue.import_block(bytes) } fn queue_info(&self) -> BlockQueueInfo { - self.block_queue.read().unwrap().queue_info() + self.block_queue.queue_info() } fn clear_queue(&self) { - self.block_queue.write().unwrap().clear(); + self.block_queue.clear(); } fn chain_info(&self) -> BlockChainInfo { - let chain = self.chain.read().unwrap(); BlockChainInfo { - total_difficulty: chain.best_block_total_difficulty(), - pending_total_difficulty: chain.best_block_total_difficulty(), - genesis_hash: chain.genesis_hash(), - best_block_hash: chain.best_block_hash(), - best_block_number: From::from(chain.best_block_number()) + total_difficulty: self.chain.best_block_total_difficulty(), + pending_total_difficulty: self.chain.best_block_total_difficulty(), + genesis_hash: self.chain.genesis_hash(), + best_block_hash: self.chain.best_block_hash(), + best_block_number: From::from(self.chain.best_block_number()) } } fn blocks_with_bloom(&self, bloom: &H2048, from_block: BlockId, to_block: BlockId) -> Option> { match (self.block_number(from_block), self.block_number(to_block)) { - (Some(from), Some(to)) => Some(self.chain.read().unwrap().blocks_with_bloom(bloom, from, to)), + (Some(from), Some(to)) => Some(self.chain.blocks_with_bloom(bloom, from, to)), _ => None } } @@ -548,9 +534,9 @@ impl BlockChainClient for Client where V: Verifier { blocks.sort(); blocks.into_iter() - .filter_map(|number| self.chain.read().unwrap().block_hash(number).map(|hash| (number, hash))) - .filter_map(|(number, hash)| self.chain.read().unwrap().block_receipts(&hash).map(|r| (number, hash, r.receipts))) - .filter_map(|(number, hash, receipts)| self.chain.read().unwrap().block(&hash).map(|ref b| (number, hash, receipts, BlockView::new(b).transaction_hashes()))) + .filter_map(|number| self.chain.block_hash(number).map(|hash| (number, hash))) + .filter_map(|(number, hash)| self.chain.block_receipts(&hash).map(|r| (number, hash, r.receipts))) + .filter_map(|(number, hash, receipts)| self.chain.block(&hash).map(|ref b| (number, hash, receipts, BlockView::new(b).transaction_hashes()))) .flat_map(|(number, hash, receipts, hashes)| { let mut log_index = 0; receipts.into_iter()