From 456ad9e21b8e57fa4e39e982d0fde570679fe25f Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 6 Jul 2016 19:52:34 +0200 Subject: [PATCH] Remove .lock().unwrap() idiom into locked(). --- dapps/src/lib.rs | 4 +- ethcore/src/block_queue.rs | 56 +++++++-------- ethcore/src/client/client.rs | 22 +++--- ethcore/src/miner/miner.rs | 82 +++++++++++----------- ethcore/src/miner/mod.rs | 2 +- ethcore/src/miner/work_notify.rs | 4 +- parity/main.rs | 4 +- rpc/rpctest/src/main.rs | 3 +- rpc/src/v1/helpers/signing_queue.rs | 16 ++--- rpc/src/v1/impls/eth.rs | 3 +- rpc/src/v1/impls/eth_filter.rs | 13 ++-- rpc/src/v1/tests/helpers/miner_service.rs | 20 +++--- rpc/src/v1/tests/mocked/eth.rs | 3 +- rpc/src/v1/tests/mocked/personal_signer.rs | 5 +- util/src/io/worker.rs | 3 +- util/src/misc.rs | 10 +++ util/src/network/host.rs | 66 ++++++++--------- util/src/network/tests.rs | 4 +- util/src/panics.rs | 5 +- 19 files changed, 172 insertions(+), 153 deletions(-) diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 1c550fb07..6952d509c 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -53,6 +53,7 @@ extern crate jsonrpc_core; extern crate jsonrpc_http_server; extern crate parity_dapps; extern crate ethcore_rpc; +extern crate ethcore_util; extern crate mime_guess; mod endpoint; @@ -67,6 +68,7 @@ mod proxypac; use std::sync::{Arc, Mutex}; use std::net::SocketAddr; use std::collections::HashMap; +use ethcore_util::misc::Lockable; use jsonrpc_core::{IoHandler, IoDelegate}; use router::auth::{Authorization, NoAuth, HttpBasicAuth}; use ethcore_rpc::Extendable; @@ -149,7 +151,7 @@ impl Server { /// Set callback for panics. pub fn set_panic_handler(&self, handler: F) where F : Fn() -> () + Send + 'static { - *self.panic_handler.lock().unwrap() = Some(Box::new(handler)); + *self.panic_handler.locked() = Some(Box::new(handler)); } } diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index ce99dcccd..3ad329af5 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -207,9 +207,9 @@ impl BlockQueue { fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut unverified = verification.unverified.lock().unwrap(); + let mut unverified = verification.unverified.locked(); - if unverified.is_empty() && verification.verifying.lock().unwrap().is_empty() { + if unverified.is_empty() && verification.verifying.locked().is_empty() { empty.notify_all(); } @@ -223,11 +223,11 @@ impl BlockQueue { } let block = { - let mut unverified = verification.unverified.lock().unwrap(); + let mut unverified = verification.unverified.locked(); if unverified.is_empty() { continue; } - let mut verifying = verification.verifying.lock().unwrap(); + let mut verifying = verification.verifying.locked(); let block = unverified.pop_front().unwrap(); verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); block @@ -236,7 +236,7 @@ impl BlockQueue { let block_hash = block.header.hash(); match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { Ok(verified) => { - let mut verifying = verification.verifying.lock().unwrap(); + let mut verifying = verification.verifying.locked(); for e in verifying.iter_mut() { if e.hash == block_hash { e.block = Some(verified); @@ -245,16 +245,16 @@ impl BlockQueue { } if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { // we're next! - let mut verified = verification.verified.lock().unwrap(); - let mut bad = verification.bad.lock().unwrap(); + let mut verified = verification.verified.locked(); + let mut bad = verification.bad.locked(); BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } }, Err(err) => { - let mut verifying = verification.verifying.lock().unwrap(); - let mut verified = verification.verified.lock().unwrap(); - let mut bad = verification.bad.lock().unwrap(); + let mut verifying = verification.verifying.locked(); + let mut verified = verification.verified.locked(); + let mut bad = verification.bad.locked(); warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); bad.insert(block_hash.clone()); verifying.retain(|e| e.hash != block_hash); @@ -279,9 +279,9 @@ impl BlockQueue { /// Clear the queue and stop verification activity. pub fn clear(&self) { - let mut unverified = self.verification.unverified.lock().unwrap(); - let mut verifying = self.verification.verifying.lock().unwrap(); - let mut verified = self.verification.verified.lock().unwrap(); + let mut unverified = self.verification.unverified.locked(); + let mut verifying = self.verification.verifying.locked(); + let mut verified = self.verification.verified.locked(); unverified.clear(); verifying.clear(); verified.clear(); @@ -290,8 +290,8 @@ impl BlockQueue { /// Wait for unverified queue to be empty pub fn flush(&self) { - let mut unverified = self.verification.unverified.lock().unwrap(); - while !unverified.is_empty() || !self.verification.verifying.lock().unwrap().is_empty() { + let mut unverified = self.verification.unverified.locked(); + while !unverified.is_empty() || !self.verification.verifying.locked().is_empty() { unverified = self.empty.wait(unverified).unwrap(); } } @@ -301,7 +301,7 @@ impl BlockQueue { if self.processing.read().unwrap().contains(&hash) { return BlockStatus::Queued; } - if self.verification.bad.lock().unwrap().contains(&hash) { + if self.verification.bad.locked().contains(&hash) { return BlockStatus::Bad; } BlockStatus::Unknown @@ -316,7 +316,7 @@ impl BlockQueue { return Err(ImportError::AlreadyQueued.into()); } - let mut bad = self.verification.bad.lock().unwrap(); + let mut bad = self.verification.bad.locked(); if bad.contains(&h) { return Err(ImportError::KnownBad.into()); } @@ -330,13 +330,13 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { Ok(()) => { self.processing.write().unwrap().insert(h.clone()); - self.verification.unverified.lock().unwrap().push_back(UnverifiedBlock { header: header, bytes: bytes }); + self.verification.unverified.locked().push_back(UnverifiedBlock { header: header, bytes: bytes }); self.more_to_verify.notify_all(); Ok(h) }, Err(err) => { warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); - self.verification.bad.lock().unwrap().insert(h.clone()); + self.verification.bad.locked().insert(h.clone()); Err(err) } } @@ -347,9 +347,9 @@ impl BlockQueue { if block_hashes.is_empty() { return; } - let mut verified_lock = self.verification.verified.lock().unwrap(); + let mut verified_lock = self.verification.verified.locked(); let mut verified = verified_lock.deref_mut(); - let mut bad = self.verification.bad.lock().unwrap(); + let mut bad = self.verification.bad.locked(); let mut processing = self.processing.write().unwrap(); bad.reserve(block_hashes.len()); for hash in block_hashes { @@ -382,7 +382,7 @@ impl BlockQueue { /// Removes up to `max` verified blocks from the queue pub fn drain(&self, max: usize) -> Vec { - let mut verified = self.verification.verified.lock().unwrap(); + let mut verified = self.verification.verified.locked(); let count = min(max, verified.len()); let mut result = Vec::with_capacity(count); for _ in 0..count { @@ -399,15 +399,15 @@ impl BlockQueue { /// Get queue status. pub fn queue_info(&self) -> BlockQueueInfo { let (unverified_len, unverified_bytes) = { - let v = self.verification.unverified.lock().unwrap(); + let v = self.verification.unverified.locked(); (v.len(), v.heap_size_of_children()) }; let (verifying_len, verifying_bytes) = { - let v = self.verification.verifying.lock().unwrap(); + let v = self.verification.verifying.locked(); (v.len(), v.heap_size_of_children()) }; let (verified_len, verified_bytes) = { - let v = self.verification.verified.lock().unwrap(); + let v = self.verification.verified.locked(); (v.len(), v.heap_size_of_children()) }; BlockQueueInfo { @@ -428,9 +428,9 @@ impl BlockQueue { /// Optimise memory footprint of the heap fields. pub fn collect_garbage(&self) { { - self.verification.unverified.lock().unwrap().shrink_to_fit(); - self.verification.verifying.lock().unwrap().shrink_to_fit(); - self.verification.verified.lock().unwrap().shrink_to_fit(); + self.verification.unverified.locked().shrink_to_fit(); + self.verification.verifying.locked().shrink_to_fit(); + self.verification.verified.locked().shrink_to_fit(); } self.processing.write().unwrap().shrink_to_fit(); } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index fffef2bb2..a2796a09e 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -258,7 +258,7 @@ impl Client { // Enact Verified Block let parent = chain_has_parent.unwrap(); let last_hashes = self.build_last_hashes(header.parent_hash.clone()); - let db = self.state_db.lock().unwrap().boxed_clone(); + let db = self.state_db.locked().boxed_clone(); let enact_result = enact_verified(&block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone()); if let Err(e) = enact_result { @@ -432,7 +432,7 @@ impl Client { }; self.block_header(id).and_then(|header| { - let db = self.state_db.lock().unwrap().boxed_clone(); + let db = self.state_db.locked().boxed_clone(); // early exit for pruned blocks if db.is_pruned() && self.chain.best_block_number() >= block_number + HISTORY { @@ -448,7 +448,7 @@ impl Client { /// Get a copy of the best block's state. pub fn state(&self) -> State { State::from_existing( - self.state_db.lock().unwrap().boxed_clone(), + self.state_db.locked().boxed_clone(), HeaderView::new(&self.best_block_header()).state_root(), self.engine.account_start_nonce(), self.trie_factory.clone()) @@ -463,7 +463,7 @@ impl Client { /// Get the report. pub fn report(&self) -> ClientReport { let mut report = self.report.read().unwrap().clone(); - report.state_db_mem = self.state_db.lock().unwrap().mem_used(); + report.state_db_mem = self.state_db.locked().mem_used(); report } @@ -475,7 +475,7 @@ impl Client { match self.mode { Mode::Dark(timeout) => { - let mut ss = self.sleep_state.lock().unwrap(); + let mut ss = self.sleep_state.locked(); if let Some(t) = ss.last_activity { if Instant::now() > t + timeout { self.sleep(); @@ -484,7 +484,7 @@ impl Client { } } Mode::Passive(timeout, wakeup_after) => { - let mut ss = self.sleep_state.lock().unwrap(); + let mut ss = self.sleep_state.locked(); let now = Instant::now(); if let Some(t) = ss.last_activity { if now > t + timeout { @@ -557,14 +557,14 @@ impl Client { } else { trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing."); // TODO: Consider uncommenting. - //*self.last_activity.lock().unwrap() = Some(Instant::now()); + //*self.last_activity.locked() = Some(Instant::now()); } } } /// Notify us that the network has been started. pub fn network_started(&self, url: &String) { - let mut previous_enode = self.previous_enode.lock().unwrap(); + let mut previous_enode = self.previous_enode.locked(); if let Some(ref u) = *previous_enode { if u == url { return; @@ -616,7 +616,7 @@ impl BlockChainClient for Client { fn keep_alive(&self) { if self.mode != Mode::Active { self.wake_up(); - (*self.sleep_state.lock().unwrap()).last_activity = Some(Instant::now()); + (*self.sleep_state.locked()).last_activity = Some(Instant::now()); } } @@ -740,7 +740,7 @@ impl BlockChainClient for Client { } fn state_data(&self, hash: &H256) -> Option { - self.state_db.lock().unwrap().state(hash) + self.state_db.locked().state(hash) } fn block_receipts(&self, hash: &H256) -> Option { @@ -902,7 +902,7 @@ impl MiningBlockChainClient for Client { &self.vm_factory, self.trie_factory.clone(), false, // TODO: this will need to be parameterised once we want to do immediate mining insertion. - self.state_db.lock().unwrap().boxed_clone(), + self.state_db.locked().boxed_clone(), &self.chain.block_header(&h).expect("h is best block hash: so it's header must exist: qed"), self.build_last_hashes(h.clone()), author, diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 0ee56dccb..c9a20e7f1 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -161,8 +161,8 @@ impl Miner { trace!(target: "miner", "prepare_sealing: entering"); let (transactions, mut open_block, original_work_hash) = { - let transactions = {self.transaction_queue.lock().unwrap().top_transactions()}; - let mut sealing_work = self.sealing_work.lock().unwrap(); + let transactions = {self.transaction_queue.locked().top_transactions()}; + let mut sealing_work = self.sealing_work.locked(); let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); let best_hash = chain.best_block_header().sha3(); /* @@ -232,7 +232,7 @@ impl Miner { }; { - let mut queue = self.transaction_queue.lock().unwrap(); + let mut queue = self.transaction_queue.locked(); for hash in invalid_transactions.into_iter() { queue.remove_invalid(&hash, &fetch_account); } @@ -263,7 +263,7 @@ impl Miner { } let (work, is_new) = { - let mut sealing_work = self.sealing_work.lock().unwrap(); + let mut sealing_work = self.sealing_work.locked(); let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash()); let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) { @@ -291,20 +291,20 @@ impl Miner { fn update_gas_limit(&self, chain: &MiningBlockChainClient) { let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit(); - let mut queue = self.transaction_queue.lock().unwrap(); + let mut queue = self.transaction_queue.locked(); queue.set_gas_limit(gas_limit); } /// Returns true if we had to prepare new pending block fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool { trace!(target: "miner", "enable_and_prepare_sealing: entering"); - let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some(); + let have_work = self.sealing_work.locked().peek_last_ref().is_some(); trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); if !have_work { self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.prepare_sealing(chain); } - let mut sealing_block_last_request = self.sealing_block_last_request.lock().unwrap(); + let mut sealing_block_last_request = self.sealing_block_last_request.locked(); let best_number = chain.chain_info().best_block_number; if *sealing_block_last_request != best_number { trace!(target: "miner", "enable_and_prepare_sealing: Miner received request (was {}, now {}) - waking up.", *sealing_block_last_request, best_number); @@ -329,7 +329,7 @@ impl Miner { } /// Are we allowed to do a non-mandatory reseal? - fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() } + fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.locked() } } const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5; @@ -337,13 +337,13 @@ const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5; impl MinerService for Miner { fn clear_and_reset(&self, chain: &MiningBlockChainClient) { - self.transaction_queue.lock().unwrap().clear(); + self.transaction_queue.locked().clear(); self.update_sealing(chain); } fn status(&self) -> MinerStatus { - let status = self.transaction_queue.lock().unwrap().status(); - let sealing_work = self.sealing_work.lock().unwrap(); + let status = self.transaction_queue.locked().status(); + let sealing_work = self.sealing_work.locked(); MinerStatus { transactions_in_pending_queue: status.pending, transactions_in_future_queue: status.future, @@ -352,7 +352,7 @@ impl MinerService for Miner { } fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction, analytics: CallAnalytics) -> Result { - let sealing_work = self.sealing_work.lock().unwrap(); + let sealing_work = self.sealing_work.locked(); match sealing_work.peek_last_ref() { Some(work) => { let block = work.block(); @@ -399,7 +399,7 @@ impl MinerService for Miner { } fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { - let sealing_work = self.sealing_work.lock().unwrap(); + let sealing_work = self.sealing_work.locked(); sealing_work.peek_last_ref().map_or_else( || chain.latest_balance(address), |b| b.block().fields().state.balance(address) @@ -407,7 +407,7 @@ impl MinerService for Miner { } fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 { - let sealing_work = self.sealing_work.lock().unwrap(); + let sealing_work = self.sealing_work.locked(); sealing_work.peek_last_ref().map_or_else( || chain.latest_storage_at(address, position), |b| b.block().fields().state.storage_at(address, position) @@ -415,12 +415,12 @@ impl MinerService for Miner { } fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { - let sealing_work = self.sealing_work.lock().unwrap(); + let sealing_work = self.sealing_work.locked(); sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address)) } fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option { - let sealing_work = self.sealing_work.lock().unwrap(); + let sealing_work = self.sealing_work.locked(); sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address)) } @@ -442,16 +442,16 @@ impl MinerService for Miner { } fn set_minimal_gas_price(&self, min_gas_price: U256) { - self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price); + self.transaction_queue.locked().set_minimal_gas_price(min_gas_price); } fn minimal_gas_price(&self) -> U256 { - *self.transaction_queue.lock().unwrap().minimal_gas_price() + *self.transaction_queue.locked().minimal_gas_price() } fn sensible_gas_price(&self) -> U256 { // 10% above our minimum. - *self.transaction_queue.lock().unwrap().minimal_gas_price() * 110.into() / 100.into() + *self.transaction_queue.locked().minimal_gas_price() * 110.into() / 100.into() } fn sensible_gas_limit(&self) -> U256 { @@ -459,15 +459,15 @@ impl MinerService for Miner { } fn transactions_limit(&self) -> usize { - self.transaction_queue.lock().unwrap().limit() + self.transaction_queue.locked().limit() } fn set_transactions_limit(&self, limit: usize) { - self.transaction_queue.lock().unwrap().set_limit(limit) + self.transaction_queue.locked().set_limit(limit) } fn set_tx_gas_limit(&self, limit: U256) { - self.transaction_queue.lock().unwrap().set_tx_gas_limit(limit) + self.transaction_queue.locked().set_tx_gas_limit(limit) } /// Get the author that we will seal blocks as. @@ -493,7 +493,7 @@ impl MinerService for Miner { fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec) -> Vec> { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transaction_queue = self.transaction_queue.locked(); let results = self.add_transactions_to_queue(chain, transactions, TransactionOrigin::External, &mut transaction_queue); @@ -514,7 +514,7 @@ impl MinerService for Miner { let imported = { // Be sure to release the lock before we call enable_and_prepare_sealing - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transaction_queue = self.transaction_queue.locked(); let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap(); match import { @@ -546,13 +546,13 @@ impl MinerService for Miner { } fn all_transactions(&self) -> Vec { - let queue = self.transaction_queue.lock().unwrap(); + let queue = self.transaction_queue.locked(); queue.top_transactions() } fn pending_transactions(&self) -> Vec { - let queue = self.transaction_queue.lock().unwrap(); - let sw = self.sealing_work.lock().unwrap(); + let queue = self.transaction_queue.locked(); + let sw = self.sealing_work.locked(); // TODO: should only use the sealing_work when it's current (it could be an old block) let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), @@ -565,8 +565,8 @@ impl MinerService for Miner { } fn pending_transactions_hashes(&self) -> Vec { - let queue = self.transaction_queue.lock().unwrap(); - let sw = self.sealing_work.lock().unwrap(); + let queue = self.transaction_queue.locked(); + let sw = self.sealing_work.locked(); let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), false => None, @@ -578,8 +578,8 @@ impl MinerService for Miner { } fn transaction(&self, hash: &H256) -> Option { - let queue = self.transaction_queue.lock().unwrap(); - let sw = self.sealing_work.lock().unwrap(); + let queue = self.transaction_queue.locked(); + let sw = self.sealing_work.locked(); let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), false => None, @@ -591,7 +591,7 @@ impl MinerService for Miner { } fn pending_receipts(&self) -> BTreeMap { - match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) { + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.locked().peek_last_ref()) { (true, Some(pending)) => { let hashes = pending.transactions() .iter() @@ -606,14 +606,14 @@ impl MinerService for Miner { } fn last_nonce(&self, address: &Address) -> Option { - self.transaction_queue.lock().unwrap().last_nonce(address) + self.transaction_queue.locked().last_nonce(address) } fn update_sealing(&self, chain: &MiningBlockChainClient) { if self.sealing_enabled.load(atomic::Ordering::Relaxed) { let current_no = chain.chain_info().best_block_number; - let has_local_transactions = self.transaction_queue.lock().unwrap().has_local_pending_transactions(); - let last_request = *self.sealing_block_last_request.lock().unwrap(); + let has_local_transactions = self.transaction_queue.locked().has_local_pending_transactions(); + let last_request = *self.sealing_block_last_request.locked(); let should_disable_sealing = !self.forced_sealing() && !has_local_transactions && current_no > last_request @@ -622,9 +622,9 @@ impl MinerService for Miner { if should_disable_sealing { trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request); self.sealing_enabled.store(false, atomic::Ordering::Relaxed); - self.sealing_work.lock().unwrap().reset(); + self.sealing_work.locked().reset(); } else { - *self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period; + *self.next_allowed_reseal.locked() = Instant::now() + self.options.reseal_min_period; self.prepare_sealing(chain); } } @@ -634,14 +634,14 @@ impl MinerService for Miner { trace!(target: "miner", "map_sealing_work: entering"); self.enable_and_prepare_sealing(chain); trace!(target: "miner", "map_sealing_work: sealing prepared"); - let mut sealing_work = self.sealing_work.lock().unwrap(); + let mut sealing_work = self.sealing_work.locked(); let ret = sealing_work.use_last_ref(); trace!(target: "miner", "map_sealing_work: leaving use_last_ref={:?}", ret.as_ref().map(|b| b.block().fields().header.hash())); ret.map(f) } fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let result = if let Some(b) = self.sealing_work.lock().unwrap().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { + let result = if let Some(b) = self.sealing_work.locked().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { b.lock().try_seal(self.engine(), seal).or_else(|_| { warn!(target: "miner", "Mined solution rejected: Invalid."); Err(Error::PowInvalid) @@ -688,7 +688,7 @@ impl MinerService for Miner { .par_iter() .map(|h| fetch_transactions(chain, h)); out_of_chain.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transaction_queue = self.transaction_queue.locked(); let _ = self.add_transactions_to_queue( chain, txs, TransactionOrigin::External, &mut transaction_queue ); @@ -702,7 +702,7 @@ impl MinerService for Miner { .map(|h: &H256| fetch_transactions(chain, h)); in_chain.for_each(|mut txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transaction_queue = self.transaction_queue.locked(); let to_remove = txs.drain(..) .map(|tx| { diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 59acaebd9..06a2ccf2e 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -38,7 +38,7 @@ //! assert_eq!(miner.status().transactions_in_pending_queue, 0); //! //! // Check block for sealing -//! //assert!(miner.sealing_block(client.deref()).lock().unwrap().is_some()); +//! //assert!(miner.sealing_block(client.deref()).locked().is_some()); //! } //! ``` diff --git a/ethcore/src/miner/work_notify.rs b/ethcore/src/miner/work_notify.rs index ea2f6140e..a1b6f0a8b 100644 --- a/ethcore/src/miner/work_notify.rs +++ b/ethcore/src/miner/work_notify.rs @@ -61,13 +61,13 @@ impl WorkPoster { pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { // TODO: move this to engine let target = Ethash::difficulty_to_boundary(&difficulty); - let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(number); + let seed_hash = &self.seed_compute.locked().get_seedhash(number); let seed_hash = H256::from_slice(&seed_hash[..]); let body = format!( r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#, pow_hash.hex(), seed_hash.hex(), target.hex(), number ); - let mut client = self.client.lock().unwrap(); + let mut client = self.client.locked(); for u in &self.urls { if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) { warn!("Error sending HTTP notification to {} : {}, retrying", u, e); diff --git a/parity/main.rs b/parity/main.rs index f54c39ab5..495d2a3ad 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -80,7 +80,7 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError, Colour, Applyable, version, journaldb}; +use util::{Lockable, H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError, Colour, Applyable, version, journaldb}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::client::{Mode, BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError}; use ethcore::error::{ImportError}; @@ -614,7 +614,7 @@ fn wait_for_exit( // Wait for signal let mutex = Mutex::new(()); - let _ = exit.wait(mutex.lock().unwrap()).unwrap(); + let _ = exit.wait(mutex.locked()).unwrap(); info!("Finishing work, please wait..."); } diff --git a/rpc/rpctest/src/main.rs b/rpc/rpctest/src/main.rs index 139514cb1..ed812cd79 100644 --- a/rpc/rpctest/src/main.rs +++ b/rpc/rpctest/src/main.rs @@ -41,6 +41,7 @@ use rpc::v1::tests::helpers::{TestSyncProvider, Config as SyncConfig, TestMinerS use rpc::v1::{Eth, EthClient, EthFilter, EthFilterClient}; use util::panics::MayPanic; use util::hash::Address; +use util::Lockable; const USAGE: &'static str = r#" Parity rpctest client. @@ -137,7 +138,7 @@ impl Configuration { panic_handler.on_panic(move |_reason| { e.notify_all(); }); let mutex = Mutex::new(()); - let _ = exit.wait(mutex.lock().unwrap()).unwrap(); + let _ = exit.wait(mutex.locked()).unwrap(); } } diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index 756718000..c21a056d8 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -19,7 +19,7 @@ use std::time::{Instant, Duration}; use std::sync::{mpsc, Mutex, RwLock, Arc}; use std::collections::HashMap; use jsonrpc_core; -use util::U256; +use util::{U256, Lockable}; use v1::helpers::{TransactionRequest, TransactionConfirmation}; /// Result that can be returned from JSON RPC. @@ -110,7 +110,7 @@ pub struct ConfirmationPromise { impl ConfirmationToken { /// Submit solution to all listeners fn resolve(&self, result: Option) { - let mut res = self.result.lock().unwrap(); + let mut res = self.result.locked(); *res = result.map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h)); // Notify listener self.handle.unpark(); @@ -142,7 +142,7 @@ impl ConfirmationPromise { // Park thread (may wake up spuriously) thread::park_timeout(deadline - now); // Take confirmation result - let res = self.result.lock().unwrap(); + let res = self.result.locked(); // Check the result match *res { ConfirmationResult::Rejected => return None, @@ -183,7 +183,7 @@ impl ConfirmationsQueue { /// This method can be used only once (only single consumer of events can exist). pub fn start_listening(&self, listener: F) -> Result<(), QueueError> where F: Fn(QueueEvent) -> () { - let recv = self.receiver.lock().unwrap().take(); + let recv = self.receiver.locked().take(); if let None = recv { return Err(QueueError::AlreadyUsed); } @@ -208,7 +208,7 @@ impl ConfirmationsQueue { /// Notifies receiver about the event happening in this queue. fn notify(&self, message: QueueEvent) { // We don't really care about the result - let _ = self.sender.lock().unwrap().send(message); + let _ = self.sender.locked().send(message); } /// Removes transaction from this queue and notifies `ConfirmationPromise` holders about the result. @@ -241,7 +241,7 @@ impl SigningQueue for ConfirmationsQueue { fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise { // Increment id let id = { - let mut last_id = self.id.lock().unwrap(); + let mut last_id = self.id.locked(); *last_id = *last_id + U256::from(1); *last_id }; @@ -354,7 +354,7 @@ mod test { let r = received.clone(); let handle = thread::spawn(move || { q.start_listening(move |notification| { - let mut v = r.lock().unwrap(); + let mut v = r.locked(); *v = Some(notification); }).expect("Should be closed nicely.") }); @@ -363,7 +363,7 @@ mod test { // then handle.join().expect("Thread should finish nicely"); - let r = received.lock().unwrap().take(); + let r = received.locked().take(); assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1)))); } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 13f54feea..180d93769 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -28,6 +28,7 @@ use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; use util::rlp::{encode, decode, UntrustedRlp, View}; +use util::Lockable; use ethcore::account_provider::AccountProvider; use ethcore::client::{MiningBlockChainClient, BlockID, TransactionID, UncleID}; use ethcore::block::IsBlock; @@ -561,7 +562,7 @@ impl Eth for EthClient where miner.map_sealing_work(client.deref(), |b| { let pow_hash = b.hash(); let target = Ethash::difficulty_to_boundary(b.block().header().difficulty()); - let seed_hash = self.seed_compute.lock().unwrap().get_seedhash(b.block().header().number()); + let seed_hash = self.seed_compute.locked().get_seedhash(b.block().header().number()); let block_number = RpcU256::from(b.block().header().number()); to_value(&(RpcH256::from(pow_hash), RpcH256::from(seed_hash), RpcH256::from(target), block_number)) }).unwrap_or(Err(Error::internal_error())) // no work found. diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index cf2006788..017e0ad32 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -20,6 +20,7 @@ use std::ops::Deref; use std::sync::{Arc, Weak, Mutex}; use std::collections::HashSet; use jsonrpc_core::*; +use util::Lockable; use util::numbers::*; use ethcore::miner::MinerService; use ethcore::filter::Filter as EthcoreFilter; @@ -68,7 +69,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Filter,)>(params) .and_then(|(filter,)| { - let mut polls = self.polls.lock().unwrap(); + let mut polls = self.polls.locked(); let block_number = take_weak!(self.client).chain_info().best_block_number; let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); to_value(&RpcU256::from(id)) @@ -79,7 +80,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); match params { Params::None => { - let mut polls = self.polls.lock().unwrap(); + let mut polls = self.polls.locked(); let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); to_value(&RpcU256::from(id)) }, @@ -91,7 +92,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); match params { Params::None => { - let mut polls = self.polls.lock().unwrap(); + let mut polls = self.polls.locked(); let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); @@ -106,7 +107,7 @@ impl EthFilter for EthFilterClient where let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let mut polls = self.polls.lock().unwrap(); + let mut polls = self.polls.locked(); match polls.poll_mut(&index.value()) { None => Ok(Value::Array(vec![] as Vec)), Some(filter) => match *filter { @@ -196,7 +197,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { - let mut polls = self.polls.lock().unwrap(); + let mut polls = self.polls.locked(); match polls.poll(&index.value()) { Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { let include_pending = filter.to_block == Some(BlockNumber::Pending); @@ -222,7 +223,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { - self.polls.lock().unwrap().remove_poll(&index.value()); + self.polls.locked().remove_poll(&index.value()); to_value(&true) }) } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index deb12d14d..8c050b118 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -16,7 +16,7 @@ //! Test implementation of miner service. -use util::{Address, H256, Bytes, U256, FixedHash, Uint}; +use util::{Address, H256, Bytes, U256, FixedHash, Uint, Lockable}; use util::standard::*; use ethcore::error::{Error, ExecutionError}; use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics}; @@ -133,7 +133,7 @@ impl MinerService for TestMinerService { fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec) -> Vec> { // lets assume that all txs are valid - self.imported_transactions.lock().unwrap().extend_from_slice(&transactions); + self.imported_transactions.locked().extend_from_slice(&transactions); for sender in transactions.iter().filter_map(|t| t.sender().ok()) { let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests"); @@ -156,7 +156,7 @@ impl MinerService for TestMinerService { } // lets assume that all txs are valid - self.imported_transactions.lock().unwrap().push(transaction); + self.imported_transactions.locked().push(transaction); Ok(TransactionImportResult::Current) } @@ -186,19 +186,19 @@ impl MinerService for TestMinerService { } fn transaction(&self, hash: &H256) -> Option { - self.pending_transactions.lock().unwrap().get(hash).cloned() + self.pending_transactions.locked().get(hash).cloned() } fn all_transactions(&self) -> Vec { - self.pending_transactions.lock().unwrap().values().cloned().collect() + self.pending_transactions.locked().values().cloned().collect() } fn pending_transactions(&self) -> Vec { - self.pending_transactions.lock().unwrap().values().cloned().collect() + self.pending_transactions.locked().values().cloned().collect() } fn pending_receipts(&self) -> BTreeMap { - self.pending_receipts.lock().unwrap().clone() + self.pending_receipts.locked().clone() } fn last_nonce(&self, address: &Address) -> Option { @@ -212,7 +212,7 @@ impl MinerService for TestMinerService { } fn balance(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 { - self.latest_closed_block.lock().unwrap().as_ref().map_or_else(U256::zero, |b| b.block().fields().state.balance(address).clone()) + self.latest_closed_block.locked().as_ref().map_or_else(U256::zero, |b| b.block().fields().state.balance(address).clone()) } fn call(&self, _chain: &MiningBlockChainClient, _t: &SignedTransaction, _analytics: CallAnalytics) -> Result { @@ -220,7 +220,7 @@ impl MinerService for TestMinerService { } fn storage_at(&self, _chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 { - self.latest_closed_block.lock().unwrap().as_ref().map_or_else(H256::default, |b| b.block().fields().state.storage_at(address, position).clone()) + self.latest_closed_block.locked().as_ref().map_or_else(H256::default, |b| b.block().fields().state.storage_at(address, position).clone()) } fn nonce(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 { @@ -230,7 +230,7 @@ impl MinerService for TestMinerService { } fn code(&self, _chain: &MiningBlockChainClient, address: &Address) -> Option { - self.latest_closed_block.lock().unwrap().as_ref().map_or(None, |b| b.block().fields().state.code(address).clone()) + self.latest_closed_block.locked().as_ref().map_or(None, |b| b.block().fields().state.code(address).clone()) } } diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index 29c10a92b..d68e4b8b4 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use jsonrpc_core::IoHandler; +use util::Lockable; use util::hash::{Address, H256, FixedHash}; use util::numbers::{Uint, U256}; use ethcore::account_provider::AccountProvider; @@ -363,7 +364,7 @@ fn rpc_eth_pending_transaction_by_hash() { let tester = EthTester::default(); { let tx: SignedTransaction = decode(&FromHex::from_hex("f85f800182520894095e7baea6a6c7c4c2dfeb977efac326af552d870a801ba048b55bfa915ac795c431978d8a6a992b628d557da5ff759b307d495a36649353a0efffd310ac743f371de3b9f7f9cb56c0b28ad43601b4ab949f53faa07bd2c804").unwrap()); - tester.miner.pending_transactions.lock().unwrap().insert(H256::zero(), tx); + tester.miner.pending_transactions.locked().insert(H256::zero(), tx); } let response = r#"{"jsonrpc":"2.0","result":{"blockHash":null,"blockNumber":null,"creates":null,"from":"0x0f65fe9276bc9a24ae7083ae28e2660ef72df99e","gas":"0x5208","gasPrice":"0x01","hash":"0x41df922fd0d4766fcc02e161f8295ec28522f329ae487f14d811e4b64c8d6e31","input":"0x","nonce":"0x00","to":"0x095e7baea6a6c7c4c2dfeb977efac326af552d87","transactionIndex":null,"value":"0x0a"},"id":1}"#; diff --git a/rpc/src/v1/tests/mocked/personal_signer.rs b/rpc/src/v1/tests/mocked/personal_signer.rs index b2ef694a1..374c5dfa0 100644 --- a/rpc/src/v1/tests/mocked/personal_signer.rs +++ b/rpc/src/v1/tests/mocked/personal_signer.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::str::FromStr; use jsonrpc_core::IoHandler; use util::numbers::*; +use util::Lockable; use ethcore::account_provider::AccountProvider; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Transaction, Action}; @@ -112,7 +113,7 @@ fn should_reject_transaction_from_queue_without_dispatching() { // then assert_eq!(tester.io.handle_request(&request), Some(response.to_owned())); assert_eq!(tester.queue.requests().len(), 0); - assert_eq!(tester.miner.imported_transactions.lock().unwrap().len(), 0); + assert_eq!(tester.miner.imported_transactions.locked().len(), 0); } #[test] @@ -181,6 +182,6 @@ fn should_confirm_transaction_and_dispatch() { // then assert_eq!(tester.io.handle_request(&request), Some(response.to_owned())); assert_eq!(tester.queue.requests().len(), 0); - assert_eq!(tester.miner.imported_transactions.lock().unwrap().len(), 1); + assert_eq!(tester.miner.imported_transactions.locked().len(), 1); } diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 917b1ad79..333acb6af 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -22,6 +22,7 @@ use crossbeam::sync::chase_lev; use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; use panics::*; +use misc::Lockable; pub enum WorkType { Readable, @@ -81,7 +82,7 @@ impl Worker { where Message: Send + Sync + Clone + 'static { loop { { - let lock = wait_mutex.lock().unwrap(); + let lock = wait_mutex.locked(); if deleting.load(AtomicOrdering::Acquire) { return; } diff --git a/util/src/misc.rs b/util/src/misc.rs index 8d22e04d7..3d4df8b94 100644 --- a/util/src/misc.rs +++ b/util/src/misc.rs @@ -65,3 +65,13 @@ pub fn version_data() -> Bytes { s.append(&&Target::os()[0..2]); s.out() } + +/// Object can be locked directly into a `MutexGuard`. +pub trait Lockable { + /// Lock object directly into a `MutexGuard`. + fn locked(&self) -> MutexGuard; +} + +impl Lockable for Mutex { + fn locked(&self) -> MutexGuard { self.lock().unwrap() } +} diff --git a/util/src/network/host.rs b/util/src/network/host.rs index a09295083..c85b4cc57 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -28,7 +28,7 @@ use std::fs; use mio::*; use mio::tcp::*; use hash::*; -use misc::version; +use misc::*; use crypto::*; use sha3::Hashable; use rlp::*; @@ -202,7 +202,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone protocol: ProtocolId, session: Option, sessions: Arc>>, reserved_peers: &'s HashSet) -> NetworkContext<'s, Message> { - let id = session.as_ref().map(|s| s.lock().unwrap().token()); + let id = session.as_ref().map(|s| s.locked().token()); NetworkContext { io: io, protocol: protocol, @@ -224,7 +224,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { let session = self.resolve_session(peer); if let Some(session) = session { - try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data)); + try!(session.locked().send_packet(self.io, self.protocol, packet_id as u8, &data)); } else { trace!(target: "network", "Send: Peer no longer exist") } @@ -262,7 +262,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone /// Check if the session is still active. pub fn is_expired(&self) -> bool { - self.session.as_ref().map_or(false, |s| s.lock().unwrap().expired()) + self.session.as_ref().map_or(false, |s| s.locked().expired()) } /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. @@ -279,7 +279,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone pub fn peer_info(&self, peer: PeerId) -> String { let session = self.resolve_session(peer); if let Some(session) = session { - return session.lock().unwrap().info.client_version.clone() + return session.locked().info.client_version.clone() } "unknown".to_owned() } @@ -423,7 +423,7 @@ impl Host where Message: Send + Sync + Clone { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; self.nodes.write().unwrap().add_node(n); - if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { + if let Some(ref mut discovery) = *self.discovery.locked() { discovery.add_node(entry); } } @@ -436,7 +436,7 @@ impl Host where Message: Send + Sync + Clone { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; self.reserved_nodes.write().unwrap().insert(n.id.clone()); - if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { + if let Some(ref mut discovery) = *self.discovery.locked() { discovery.add_node(entry); } @@ -454,7 +454,7 @@ impl Host where Message: Send + Sync + Clone { let reserved: HashSet = self.reserved_nodes.read().unwrap().clone(); let mut to_kill = Vec::new(); for e in self.sessions.write().unwrap().iter_mut() { - let mut s = e.lock().unwrap(); + let mut s = e.locked(); { let id = s.id(); if id.is_some() && reserved.contains(id.unwrap()) { @@ -498,7 +498,7 @@ impl Host where Message: Send + Sync + Clone { self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); for e in self.sessions.write().unwrap().iter_mut() { - let mut s = e.lock().unwrap(); + let mut s = e.locked(); s.disconnect(io, DisconnectReason::ClientQuit); to_kill.push(s.token()); } @@ -555,7 +555,7 @@ impl Host where Message: Send + Sync + Clone { for n in self.nodes.read().unwrap().unordered_entries() { discovery.add_node(n.clone()); } - *self.discovery.lock().unwrap() = Some(discovery); + *self.discovery.locked() = Some(discovery); io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); @@ -571,7 +571,7 @@ impl Host where Message: Send + Sync + Clone { } fn have_session(&self, id: &NodeId) -> bool { - self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id == Some(id.clone())) + self.sessions.read().unwrap().iter().any(|e| e.locked().info.id == Some(id.clone())) } fn session_count(&self) -> usize { @@ -579,7 +579,7 @@ impl Host where Message: Send + Sync + Clone { } fn connecting_to(&self, id: &NodeId) -> bool { - self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().id() == Some(id)) + self.sessions.read().unwrap().iter().any(|e| e.locked().id() == Some(id)) } fn handshake_count(&self) -> usize { @@ -589,7 +589,7 @@ impl Host where Message: Send + Sync + Clone { fn keep_alive(&self, io: &IoContext>) { let mut to_kill = Vec::new(); for e in self.sessions.write().unwrap().iter_mut() { - let mut s = e.lock().unwrap(); + let mut s = e.locked(); if !s.keep_alive(io) { s.disconnect(io, DisconnectReason::PingTimeout); to_kill.push(s.token()); @@ -711,7 +711,7 @@ impl Host where Message: Send + Sync + Clone { fn accept(&self, io: &IoContext>) { trace!(target: "network", "Accepting incoming connection"); loop { - let socket = match self.tcp_listener.lock().unwrap().accept() { + let socket = match self.tcp_listener.locked().accept() { Ok(None) => break, Ok(Some((sock, _addr))) => sock, Err(e) => { @@ -728,7 +728,7 @@ impl Host where Message: Send + Sync + Clone { fn session_writable(&self, token: StreamToken, io: &IoContext>) { let session = { self.sessions.read().unwrap().get(token).cloned() }; if let Some(session) = session { - let mut s = session.lock().unwrap(); + let mut s = session.locked(); if let Err(e) = s.writable(io, &self.info.read().unwrap()) { trace!(target: "network", "Session write error: {}: {:?}", token, e); } @@ -750,7 +750,7 @@ impl Host where Message: Send + Sync + Clone { let mut kill = false; let session = { self.sessions.read().unwrap().get(token).cloned() }; if let Some(session) = session.clone() { - let mut s = session.lock().unwrap(); + let mut s = session.locked(); loop { match s.readable(io, &self.info.read().unwrap()) { Err(e) => { @@ -785,7 +785,7 @@ impl Host where Message: Send + Sync + Clone { if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); - let mut discovery = self.discovery.lock().unwrap(); + let mut discovery = self.discovery.locked(); if let Some(ref mut discovery) = *discovery.deref_mut() { discovery.add_node(entry); } @@ -843,7 +843,7 @@ impl Host where Message: Send + Sync + Clone { let sessions = self.sessions.write().unwrap(); if let Some(session) = sessions.get(token).cloned() { expired_session = Some(session.clone()); - let mut s = session.lock().unwrap(); + let mut s = session.locked(); if !s.expired() { if s.is_ready() { self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); @@ -879,7 +879,7 @@ impl Host where Message: Send + Sync + Clone { { let sessions = self.sessions.write().unwrap(); for c in sessions.iter() { - let s = c.lock().unwrap(); + let s = c.locked(); if let Some(id) = s.id() { if node_changes.removed.contains(id) { to_remove.push(s.token()); @@ -918,7 +918,7 @@ impl IoHandler> for Host where Messa match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { - let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) }; + let node_changes = { self.discovery.locked().as_mut().unwrap().readable(io) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -935,7 +935,7 @@ impl IoHandler> for Host where Messa match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { - self.discovery.lock().unwrap().as_mut().unwrap().writable(io); + self.discovery.locked().as_mut().unwrap().writable(io); } _ => panic!("Received unknown writable token"), } @@ -951,11 +951,11 @@ impl IoHandler> for Host where Messa warn!("Error initializing public interface: {:?}", e)), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { - self.discovery.lock().unwrap().as_mut().unwrap().refresh(); + self.discovery.locked().as_mut().unwrap().refresh(); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, DISCOVERY_ROUND => { - let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() }; + let node_changes = { self.discovery.locked().as_mut().unwrap().round() }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -1015,7 +1015,7 @@ impl IoHandler> for Host where Messa NetworkIoMessage::Disconnect(ref peer) => { let session = { self.sessions.read().unwrap().get(*peer).cloned() }; if let Some(session) = session { - session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested); + session.locked().disconnect(io, DisconnectReason::DisconnectRequested); } trace!(target: "network", "Disconnect requested {}", peer); self.kill_connection(*peer, io, false); @@ -1023,8 +1023,8 @@ impl IoHandler> for Host where Messa NetworkIoMessage::DisablePeer(ref peer) => { let session = { self.sessions.read().unwrap().get(*peer).cloned() }; if let Some(session) = session { - session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested); - if let Some(id) = session.lock().unwrap().id() { + session.locked().disconnect(io, DisconnectReason::DisconnectRequested); + if let Some(id) = session.locked().id() { self.nodes.write().unwrap().mark_as_useless(id) } } @@ -1046,11 +1046,11 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => { let session = { self.sessions.read().unwrap().get(stream).cloned() }; if let Some(session) = session { - session.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket"); + session.locked().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + DISCOVERY => self.discovery.locked().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), + TCP_ACCEPT => event_loop.register(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -1060,7 +1060,7 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => { let mut connections = self.sessions.write().unwrap(); if let Some(connection) = connections.get(stream).cloned() { - connection.lock().unwrap().deregister_socket(event_loop).expect("Error deregistering socket"); + connection.locked().deregister_socket(event_loop).expect("Error deregistering socket"); connections.remove(stream); } } @@ -1074,11 +1074,11 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => { let connection = { self.sessions.read().unwrap().get(stream).cloned() }; if let Some(connection) = connection { - connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket"); + connection.locked().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + DISCOVERY => self.discovery.locked().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), + TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index cd3f48d9a..5b6c863cb 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -51,7 +51,7 @@ impl TestProtocol { } pub fn got_packet(&self) -> bool { - self.packet.lock().unwrap().deref()[..] == b"hello"[..] + self.packet.locked().deref()[..] == b"hello"[..] } pub fn got_timeout(&self) -> bool { @@ -70,7 +70,7 @@ impl NetworkProtocolHandler for TestProtocol { fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { assert_eq!(packet_id, 33); - self.packet.lock().unwrap().extend(data); + self.packet.locked().extend(data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { diff --git a/util/src/panics.rs b/util/src/panics.rs index 854e495a3..368e6a9b4 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -20,6 +20,7 @@ use std::thread; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use std::default::Default; +use misc::Lockable; /// Thread-safe closure for handling possible panics pub trait OnPanicListener: Send + Sync + 'static { @@ -88,7 +89,7 @@ impl PanicHandler { /// Notifies all listeners in case there is a panic. /// You should use `catch_panic` instead of calling this method explicitly. pub fn notify_all(&self, r: String) { - let mut listeners = self.listeners.lock().unwrap(); + let mut listeners = self.listeners.locked(); for listener in listeners.deref_mut() { listener.call(&r); } @@ -97,7 +98,7 @@ impl PanicHandler { impl MayPanic for PanicHandler { fn on_panic(&self, closure: F) where F: OnPanicListener { - self.listeners.lock().unwrap().push(Box::new(closure)); + self.listeners.locked().push(Box::new(closure)); } }