From 3b662c285fc4338e5dfcaee5aba57dea64b8767e Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Thu, 7 Jul 2016 09:37:31 +0200 Subject: [PATCH] Switch out .X().unwrap() for .unwrapped_X --- db/src/database.rs | 38 +++---- ethcore/src/account_provider.rs | 12 +- ethcore/src/block_queue.rs | 16 +-- ethcore/src/blockchain/blockchain.rs | 50 ++++---- ethcore/src/client/client.rs | 4 +- ethcore/src/client/test_client.rs | 90 +++++++-------- ethcore/src/db.rs | 8 +- ethcore/src/miner/external.rs | 9 +- ethcore/src/miner/miner.rs | 18 +-- ethcore/src/spec/spec.rs | 10 +- ethcore/src/trace/db.rs | 9 +- parity/hypervisor/mod.rs | 11 +- parity/informant.rs | 16 +-- rpc/src/v1/helpers/signing_queue.rs | 16 +-- rpc/src/v1/tests/helpers/miner_service.rs | 34 +++--- rpc/src/v1/tests/helpers/sync_provider.rs | 4 +- rpc/src/v1/tests/mocked/eth.rs | 22 ++-- rpc/src/v1/tests/mocked/personal.rs | 3 +- sync/src/chain.rs | 2 +- sync/src/lib.rs | 24 ++-- sync/src/tests/chain.rs | 22 ++-- sync/src/tests/helpers.rs | 10 +- util/src/io/service.rs | 11 +- util/src/journaldb/earlymergedb.rs | 7 +- util/src/journaldb/overlayrecentdb.rs | 11 +- util/src/log.rs | 5 +- util/src/misc.rs | 15 +++ util/src/network/host.rs | 132 +++++++++++----------- util/src/network/service.rs | 15 +-- util/src/network/tests.rs | 1 + util/src/panics.rs | 21 ++-- 31 files changed, 335 insertions(+), 311 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index d535f1f56..0ad02b783 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -17,8 +17,8 @@ //! Ethcore rocksdb ipc service use traits::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, -IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; +use misc::RwLockable; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; use std::sync::{RwLock, Arc}; use std::convert::From; use ipc::IpcConfig; @@ -137,8 +137,8 @@ impl Database { } pub fn flush(&self) -> Result<(), Error> { - let mut cache_lock = self.write_cache.write().unwrap(); - let db_lock = self.db.read().unwrap(); + let mut cache_lock = self.write_cache.unwrapped_write(); + let db_lock = self.db.unwrapped_read(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); @@ -147,8 +147,8 @@ impl Database { } pub fn flush_all(&self) -> Result<(), Error> { - let mut cache_lock = self.write_cache.write().unwrap(); - let db_lock = self.db.read().unwrap(); + let mut cache_lock = self.write_cache.unwrapped_write(); + let db_lock = self.db.unwrapped_read(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step"); @@ -167,7 +167,7 @@ impl Drop for Database { #[derive(Ipc)] impl DatabaseService for Database { fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error> { - let mut db = self.db.write().unwrap(); + let mut db = self.db.unwrapped_write(); if db.is_some() { return Err(Error::AlreadyOpen); } let mut opts = Options::new(); @@ -194,7 +194,7 @@ impl DatabaseService for Database { fn close(&self) -> Result<(), Error> { try!(self.flush_all()); - let mut db = self.db.write().unwrap(); + let mut db = self.db.unwrapped_write(); if db.is_none() { return Err(Error::IsClosed); } *db = None; @@ -202,19 +202,19 @@ impl DatabaseService for Database { } fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let mut cache_lock = self.write_cache.write().unwrap(); + let mut cache_lock = self.write_cache.unwrapped_write(); cache_lock.write(key.to_vec(), value.to_vec()); Ok(()) } fn delete(&self, key: &[u8]) -> Result<(), Error> { - let mut cache_lock = self.write_cache.write().unwrap(); + let mut cache_lock = self.write_cache.unwrapped_write(); cache_lock.remove(key.to_vec()); Ok(()) } fn write(&self, transaction: DBTransaction) -> Result<(), Error> { - let mut cache_lock = self.write_cache.write().unwrap(); + let mut cache_lock = self.write_cache.unwrapped_write(); let mut writes = transaction.writes.borrow_mut(); for kv in writes.drain(..) { @@ -231,13 +231,13 @@ impl DatabaseService for Database { fn get(&self, key: &[u8]) -> Result>, Error> { { let key_vec = key.to_vec(); - let cache_hit = self.write_cache.read().unwrap().get(&key_vec); + let cache_hit = self.write_cache.unwrapped_read().get(&key_vec); if cache_hit.is_some() { return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here"))) } } - let db_lock = self.db.read().unwrap(); + let db_lock = self.db.unwrapped_read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); match try!(db.get(key)) { @@ -249,7 +249,7 @@ impl DatabaseService for Database { } fn get_by_prefix(&self, prefix: &[u8]) -> Result>, Error> { - let db_lock = self.db.read().unwrap(); + let db_lock = self.db.unwrapped_read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); let mut iter = db.iterator(IteratorMode::From(prefix, Direction::Forward)); @@ -261,17 +261,17 @@ impl DatabaseService for Database { } fn is_empty(&self) -> Result { - let db_lock = self.db.read().unwrap(); + let db_lock = self.db.unwrapped_read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); Ok(db.iterator(IteratorMode::Start).next().is_none()) } fn iter(&self) -> Result { - let db_lock = self.db.read().unwrap(); + let db_lock = self.db.unwrapped_read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - let mut iterators = self.iterators.write().unwrap(); + let mut iterators = self.iterators.unwrapped_write(); let next_iterator = iterators.keys().last().unwrap_or(&0) + 1; iterators.insert(next_iterator, db.iterator(IteratorMode::Start)); Ok(next_iterator) @@ -279,7 +279,7 @@ impl DatabaseService for Database { fn iter_next(&self, handle: IteratorHandle) -> Option { - let mut iterators = self.iterators.write().unwrap(); + let mut iterators = self.iterators.unwrapped_write(); let mut iterator = match iterators.get_mut(&handle) { Some(some_iterator) => some_iterator, None => { return None; }, @@ -294,7 +294,7 @@ impl DatabaseService for Database { } fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> { - let mut iterators = self.iterators.write().unwrap(); + let mut iterators = self.iterators.unwrapped_write(); iterators.remove(&handle); Ok(()) } diff --git a/ethcore/src/account_provider.rs b/ethcore/src/account_provider.rs index a71251727..90d44bfba 100644 --- a/ethcore/src/account_provider.rs +++ b/ethcore/src/account_provider.rs @@ -19,7 +19,7 @@ use std::fmt; use std::sync::RwLock; use std::collections::HashMap; -use util::{Address as H160, H256, H520}; +use util::{Address as H160, H256, H520, RwLockable}; use ethstore::{SecretStore, Error as SSError, SafeAccount, EthStore}; use ethstore::dir::{KeyDirectory}; use ethstore::ethkey::{Address as SSAddress, Message as SSMessage, Secret as SSSecret, Random, Generator}; @@ -177,7 +177,7 @@ impl AccountProvider { // check if account is already unlocked pernamently, if it is, do nothing { - let unlocked = self.unlocked.read().unwrap(); + let unlocked = self.unlocked.unwrapped_read(); if let Some(data) = unlocked.get(&account) { if let Unlock::Perm = data.unlock { return Ok(()) @@ -190,7 +190,7 @@ impl AccountProvider { password: password, }; - let mut unlocked = self.unlocked.write().unwrap(); + let mut unlocked = self.unlocked.unwrapped_write(); unlocked.insert(account, data); Ok(()) } @@ -208,7 +208,7 @@ impl AccountProvider { /// Checks if given account is unlocked pub fn is_unlocked(&self, account: A) -> bool where Address: From { let account = Address::from(account).into(); - let unlocked = self.unlocked.read().unwrap(); + let unlocked = self.unlocked.unwrapped_read(); unlocked.get(&account).is_some() } @@ -218,12 +218,12 @@ impl AccountProvider { let message = Message::from(message).into(); let data = { - let unlocked = self.unlocked.read().unwrap(); + let unlocked = self.unlocked.unwrapped_read(); try!(unlocked.get(&account).ok_or(Error::NotUnlocked)).clone() }; if let Unlock::Temp = data.unlock { - let mut unlocked = self.unlocked.write().unwrap(); + let mut unlocked = self.unlocked.unwrapped_write(); unlocked.remove(&account).expect("data exists: so key must exist: qed"); } diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 3ad329af5..c0b712d62 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -285,7 +285,7 @@ impl BlockQueue { unverified.clear(); verifying.clear(); verified.clear(); - self.processing.write().unwrap().clear(); + self.processing.unwrapped_write().clear(); } /// Wait for unverified queue to be empty @@ -298,7 +298,7 @@ impl BlockQueue { /// Check if the block is currently in the queue pub fn block_status(&self, hash: &H256) -> BlockStatus { - if self.processing.read().unwrap().contains(&hash) { + if self.processing.unwrapped_read().contains(&hash) { return BlockStatus::Queued; } if self.verification.bad.locked().contains(&hash) { @@ -312,7 +312,7 @@ impl BlockQueue { let header = BlockView::new(&bytes).header(); let h = header.hash(); { - if self.processing.read().unwrap().contains(&h) { + if self.processing.unwrapped_read().contains(&h) { return Err(ImportError::AlreadyQueued.into()); } @@ -329,7 +329,7 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { Ok(()) => { - self.processing.write().unwrap().insert(h.clone()); + self.processing.unwrapped_write().insert(h.clone()); self.verification.unverified.locked().push_back(UnverifiedBlock { header: header, bytes: bytes }); self.more_to_verify.notify_all(); Ok(h) @@ -350,7 +350,7 @@ impl BlockQueue { let mut verified_lock = self.verification.verified.locked(); let mut verified = verified_lock.deref_mut(); let mut bad = self.verification.bad.locked(); - let mut processing = self.processing.write().unwrap(); + let mut processing = self.processing.unwrapped_write(); bad.reserve(block_hashes.len()); for hash in block_hashes { bad.insert(hash.clone()); @@ -374,7 +374,7 @@ impl BlockQueue { if block_hashes.is_empty() { return; } - let mut processing = self.processing.write().unwrap(); + let mut processing = self.processing.unwrapped_write(); for hash in block_hashes { processing.remove(&hash); } @@ -421,7 +421,7 @@ impl BlockQueue { + verifying_bytes + verified_bytes // TODO: https://github.com/servo/heapsize/pull/50 - //+ self.processing.read().unwrap().heap_size_of_children(), + //+ self.processing.unwrapped_read().heap_size_of_children(), } } @@ -432,7 +432,7 @@ impl BlockQueue { self.verification.verifying.locked().shrink_to_fit(); self.verification.verified.locked().shrink_to_fit(); } - self.processing.write().unwrap().shrink_to_fit(); + self.processing.unwrapped_write().shrink_to_fit(); } } diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 252c17b34..3ff030ca0 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -170,7 +170,7 @@ impl BlockProvider for BlockChain { /// Get raw block data fn block(&self, hash: &H256) -> Option { { - let read = self.blocks.read().unwrap(); + let read = self.blocks.unwrapped_read(); if let Some(v) = read.get(hash) { return Some(v.clone()); } @@ -184,7 +184,7 @@ impl BlockProvider for BlockChain { match opt { Some(b) => { let bytes: Bytes = b.to_vec(); - let mut write = self.blocks.write().unwrap(); + let mut write = self.blocks.unwrapped_write(); write.insert(hash.clone(), bytes.clone()); Some(bytes) }, @@ -338,7 +338,7 @@ impl BlockChain { }; { - let mut best_block = bc.best_block.write().unwrap(); + let mut best_block = bc.best_block.unwrapped_write(); best_block.number = bc.block_number(&best_block_hash).unwrap(); best_block.total_difficulty = bc.block_details(&best_block_hash).unwrap().total_difficulty; best_block.hash = best_block_hash; @@ -483,25 +483,25 @@ impl BlockChain { self.note_used(CacheID::BlockDetails(hash)); } - let mut write_details = self.block_details.write().unwrap(); + let mut write_details = self.block_details.unwrapped_write(); batch.extend_with_cache(write_details.deref_mut(), update.block_details, CacheUpdatePolicy::Overwrite); } { - let mut write_receipts = self.block_receipts.write().unwrap(); + let mut write_receipts = self.block_receipts.unwrapped_write(); batch.extend_with_cache(write_receipts.deref_mut(), update.block_receipts, CacheUpdatePolicy::Remove); } { - let mut write_blocks_blooms = self.blocks_blooms.write().unwrap(); + let mut write_blocks_blooms = self.blocks_blooms.unwrapped_write(); batch.extend_with_cache(write_blocks_blooms.deref_mut(), update.blocks_blooms, CacheUpdatePolicy::Remove); } // These cached values must be updated last and togeterh { - let mut best_block = self.best_block.write().unwrap(); - let mut write_hashes = self.block_hashes.write().unwrap(); - let mut write_txs = self.transaction_addresses.write().unwrap(); + let mut best_block = self.best_block.unwrapped_write(); + let mut write_hashes = self.block_hashes.unwrapped_write(); + let mut write_txs = self.transaction_addresses.unwrapped_write(); // update best block match update.info.location { @@ -728,33 +728,33 @@ impl BlockChain { /// Get best block hash. pub fn best_block_hash(&self) -> H256 { - self.best_block.read().unwrap().hash.clone() + self.best_block.unwrapped_read().hash.clone() } /// Get best block number. pub fn best_block_number(&self) -> BlockNumber { - self.best_block.read().unwrap().number + self.best_block.unwrapped_read().number } /// Get best block total difficulty. pub fn best_block_total_difficulty(&self) -> U256 { - self.best_block.read().unwrap().total_difficulty + self.best_block.unwrapped_read().total_difficulty } /// Get current cache size. pub fn cache_size(&self) -> CacheSize { CacheSize { - blocks: self.blocks.read().unwrap().heap_size_of_children(), - block_details: self.block_details.read().unwrap().heap_size_of_children(), - transaction_addresses: self.transaction_addresses.read().unwrap().heap_size_of_children(), - blocks_blooms: self.blocks_blooms.read().unwrap().heap_size_of_children(), - block_receipts: self.block_receipts.read().unwrap().heap_size_of_children(), + blocks: self.blocks.unwrapped_read().heap_size_of_children(), + block_details: self.block_details.unwrapped_read().heap_size_of_children(), + transaction_addresses: self.transaction_addresses.unwrapped_read().heap_size_of_children(), + blocks_blooms: self.blocks_blooms.unwrapped_read().heap_size_of_children(), + block_receipts: self.block_receipts.unwrapped_read().heap_size_of_children(), } } /// Let the cache system know that a cacheable item has been used. fn note_used(&self, id: CacheID) { - let mut cache_man = self.cache_man.write().unwrap(); + let mut cache_man = self.cache_man.unwrapped_write(); if !cache_man.cache_usage[0].contains(&id) { cache_man.cache_usage[0].insert(id.clone()); if cache_man.in_use.contains(&id) { @@ -773,13 +773,13 @@ impl BlockChain { for _ in 0..COLLECTION_QUEUE_SIZE { { - let mut blocks = self.blocks.write().unwrap(); - let mut block_details = self.block_details.write().unwrap(); - let mut block_hashes = self.block_hashes.write().unwrap(); - let mut transaction_addresses = self.transaction_addresses.write().unwrap(); - let mut blocks_blooms = self.blocks_blooms.write().unwrap(); - let mut block_receipts = self.block_receipts.write().unwrap(); - let mut cache_man = self.cache_man.write().unwrap(); + let mut blocks = self.blocks.unwrapped_write(); + let mut block_details = self.block_details.unwrapped_write(); + let mut block_hashes = self.block_hashes.unwrapped_write(); + let mut transaction_addresses = self.transaction_addresses.unwrapped_write(); + let mut blocks_blooms = self.blocks_blooms.unwrapped_write(); + let mut block_receipts = self.block_receipts.unwrapped_write(); + let mut cache_man = self.cache_man.unwrapped_write(); for id in cache_man.cache_usage.pop_back().unwrap().into_iter() { cache_man.in_use.remove(&id); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index a2796a09e..34f820f3b 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -334,7 +334,7 @@ impl Client { let route = self.commit_block(closed_block, &header.hash(), &block.bytes); import_results.push(route); - self.report.write().unwrap().accrue_block(&block); + self.report.unwrapped_write().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } @@ -462,7 +462,7 @@ impl Client { /// Get the report. pub fn report(&self) -> ClientReport { - let mut report = self.report.read().unwrap().clone(); + let mut report = self.report.unwrapped_read().clone(); report.state_db_mem = self.state_db.locked().mem_used(); report } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 16adfe7c3..5ae94a1cd 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -108,38 +108,38 @@ impl TestBlockChainClient { miner: Arc::new(Miner::with_spec(Spec::new_test())), }; client.add_blocks(1, EachBlockWith::Nothing); // add genesis block - client.genesis_hash = client.last_hash.read().unwrap().clone(); + client.genesis_hash = client.last_hash.unwrapped_read().clone(); client } /// Set the transaction receipt result pub fn set_transaction_receipt(&self, id: TransactionID, receipt: LocalizedReceipt) { - self.receipts.write().unwrap().insert(id, receipt); + self.receipts.unwrapped_write().insert(id, receipt); } /// Set the execution result. pub fn set_execution_result(&self, result: Executed) { - *self.execution_result.write().unwrap() = Some(result); + *self.execution_result.unwrapped_write() = Some(result); } /// Set the balance of account `address` to `balance`. pub fn set_balance(&self, address: Address, balance: U256) { - self.balances.write().unwrap().insert(address, balance); + self.balances.unwrapped_write().insert(address, balance); } /// Set nonce of account `address` to `nonce`. pub fn set_nonce(&self, address: Address, nonce: U256) { - self.nonces.write().unwrap().insert(address, nonce); + self.nonces.unwrapped_write().insert(address, nonce); } /// Set `code` at `address`. pub fn set_code(&self, address: Address, code: Bytes) { - self.code.write().unwrap().insert(address, code); + self.code.unwrapped_write().insert(address, code); } /// Set storage `position` to `value` for account `address`. pub fn set_storage(&self, address: Address, position: H256, value: H256) { - self.storage.write().unwrap().insert((address, position), value); + self.storage.unwrapped_write().insert((address, position), value); } /// Set block queue size for testing @@ -149,11 +149,11 @@ impl TestBlockChainClient { /// Add blocks to test client. pub fn add_blocks(&self, count: usize, with: EachBlockWith) { - let len = self.numbers.read().unwrap().len(); + let len = self.numbers.unwrapped_read().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); - header.parent_hash = self.last_hash.read().unwrap().clone(); + header.parent_hash = self.last_hash.unwrapped_read().clone(); header.number = n as BlockNumber; header.gas_limit = U256::from(1_000_000); let uncles = match with { @@ -161,7 +161,7 @@ impl TestBlockChainClient { let mut uncles = RlpStream::new_list(1); let mut uncle_header = BlockHeader::new(); uncle_header.difficulty = From::from(n); - uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.parent_hash = self.last_hash.unwrapped_read().clone(); uncle_header.number = n as BlockNumber; uncles.append(&uncle_header); header.uncles_hash = uncles.as_raw().sha3(); @@ -174,7 +174,7 @@ impl TestBlockChainClient { let mut txs = RlpStream::new_list(1); let keypair = KeyPair::create().unwrap(); // Update nonces value - self.nonces.write().unwrap().insert(keypair.address(), U256::one()); + self.nonces.unwrapped_write().insert(keypair.address(), U256::one()); let tx = Transaction { action: Action::Create, value: U256::from(100), @@ -207,7 +207,7 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(&rlp::NULL_RLP, 1); - self.blocks.write().unwrap().insert(hash, rlp.out()); + self.blocks.unwrapped_write().insert(hash, rlp.out()); } /// Make a bad block by setting invalid parent hash. @@ -219,12 +219,12 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(&rlp::NULL_RLP, 1); - self.blocks.write().unwrap().insert(hash, rlp.out()); + self.blocks.unwrapped_write().insert(hash, rlp.out()); } /// TODO: pub fn block_hash_delta_minus(&mut self, delta: usize) -> H256 { - let blocks_read = self.numbers.read().unwrap(); + let blocks_read = self.numbers.unwrapped_read(); let index = blocks_read.len() - delta; blocks_read[&index].clone() } @@ -232,9 +232,9 @@ impl TestBlockChainClient { fn block_hash(&self, id: BlockID) -> Option { match id { BlockID::Hash(hash) => Some(hash), - BlockID::Number(n) => self.numbers.read().unwrap().get(&(n as usize)).cloned(), - BlockID::Earliest => self.numbers.read().unwrap().get(&0).cloned(), - BlockID::Latest => self.numbers.read().unwrap().get(&(self.numbers.read().unwrap().len() - 1)).cloned() + BlockID::Number(n) => self.numbers.unwrapped_read().get(&(n as usize)).cloned(), + BlockID::Earliest => self.numbers.unwrapped_read().get(&0).cloned(), + BlockID::Latest => self.numbers.unwrapped_read().get(&(self.numbers.unwrapped_read().len() - 1)).cloned() } } } @@ -255,7 +255,7 @@ impl MiningBlockChainClient for TestBlockChainClient { impl BlockChainClient for TestBlockChainClient { fn call(&self, _t: &SignedTransaction, _analytics: CallAnalytics) -> Result { - Ok(self.execution_result.read().unwrap().clone().unwrap()) + Ok(self.execution_result.unwrapped_read().clone().unwrap()) } fn block_total_difficulty(&self, _id: BlockID) -> Option { @@ -268,7 +268,7 @@ impl BlockChainClient for TestBlockChainClient { fn nonce(&self, address: &Address, id: BlockID) -> Option { match id { - BlockID::Latest => Some(self.nonces.read().unwrap().get(address).cloned().unwrap_or_else(U256::zero)), + BlockID::Latest => Some(self.nonces.unwrapped_read().get(address).cloned().unwrap_or_else(U256::zero)), _ => None, } } @@ -278,12 +278,12 @@ impl BlockChainClient for TestBlockChainClient { } fn code(&self, address: &Address) -> Option { - self.code.read().unwrap().get(address).cloned() + self.code.unwrapped_read().get(address).cloned() } fn balance(&self, address: &Address, id: BlockID) -> Option { if let BlockID::Latest = id { - Some(self.balances.read().unwrap().get(address).cloned().unwrap_or_else(U256::zero)) + Some(self.balances.unwrapped_read().get(address).cloned().unwrap_or_else(U256::zero)) } else { None } @@ -295,7 +295,7 @@ impl BlockChainClient for TestBlockChainClient { fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option { if let BlockID::Latest = id { - Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) + Some(self.storage.unwrapped_read().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) } else { None } @@ -310,7 +310,7 @@ impl BlockChainClient for TestBlockChainClient { } fn transaction_receipt(&self, id: TransactionID) -> Option { - self.receipts.read().unwrap().get(&id).cloned() + self.receipts.unwrapped_read().get(&id).cloned() } fn blocks_with_bloom(&self, _bloom: &H2048, _from_block: BlockID, _to_block: BlockID) -> Option> { @@ -326,11 +326,11 @@ impl BlockChainClient for TestBlockChainClient { } fn block_header(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.read().unwrap().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec())) + self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec())) } fn block_body(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.read().unwrap().get(&hash).map(|r| { + self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).map(|r| { let mut stream = RlpStream::new_list(2); stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); @@ -339,13 +339,13 @@ impl BlockChainClient for TestBlockChainClient { } fn block(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.read().unwrap().get(&hash).cloned()) + self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).cloned()) } fn block_status(&self, id: BlockID) -> BlockStatus { match id { - BlockID::Number(number) if (number as usize) < self.blocks.read().unwrap().len() => BlockStatus::InChain, - BlockID::Hash(ref hash) if self.blocks.read().unwrap().get(hash).is_some() => BlockStatus::InChain, + BlockID::Number(number) if (number as usize) < self.blocks.unwrapped_read().len() => BlockStatus::InChain, + BlockID::Hash(ref hash) if self.blocks.unwrapped_read().get(hash).is_some() => BlockStatus::InChain, _ => BlockStatus::Unknown } } @@ -356,7 +356,7 @@ impl BlockChainClient for TestBlockChainClient { ancestor: H256::new(), index: 0, blocks: { - let numbers_read = self.numbers.read().unwrap(); + let numbers_read = self.numbers.unwrapped_read(); let mut adding = false; let mut blocks = Vec::new(); @@ -413,11 +413,11 @@ impl BlockChainClient for TestBlockChainClient { let header = Rlp::new(&b).val_at::(0); let h = header.hash(); let number: usize = header.number as usize; - if number > self.blocks.read().unwrap().len() { - panic!("Unexpected block number. Expected {}, got {}", self.blocks.read().unwrap().len(), number); + if number > self.blocks.unwrapped_read().len() { + panic!("Unexpected block number. Expected {}, got {}", self.blocks.unwrapped_read().len(), number); } if number > 0 { - match self.blocks.read().unwrap().get(&header.parent_hash) { + match self.blocks.unwrapped_read().get(&header.parent_hash) { Some(parent) => { let parent = Rlp::new(parent).val_at::(0); if parent.number != (header.number - 1) { @@ -429,27 +429,27 @@ impl BlockChainClient for TestBlockChainClient { } } } - let len = self.numbers.read().unwrap().len(); + let len = self.numbers.unwrapped_read().len(); if number == len { { - let mut difficulty = self.difficulty.write().unwrap(); + let mut difficulty = self.difficulty.unwrapped_write(); *difficulty.deref_mut() = *difficulty.deref() + header.difficulty; } - mem::replace(self.last_hash.write().unwrap().deref_mut(), h.clone()); - self.blocks.write().unwrap().insert(h.clone(), b); - self.numbers.write().unwrap().insert(number, h.clone()); + mem::replace(self.last_hash.unwrapped_write().deref_mut(), h.clone()); + self.blocks.unwrapped_write().insert(h.clone(), b); + self.numbers.unwrapped_write().insert(number, h.clone()); let mut parent_hash = header.parent_hash; if number > 0 { let mut n = number - 1; - while n > 0 && self.numbers.read().unwrap()[&n] != parent_hash { - *self.numbers.write().unwrap().get_mut(&n).unwrap() = parent_hash.clone(); + while n > 0 && self.numbers.unwrapped_read()[&n] != parent_hash { + *self.numbers.unwrapped_write().get_mut(&n).unwrap() = parent_hash.clone(); n -= 1; - parent_hash = Rlp::new(&self.blocks.read().unwrap()[&parent_hash]).val_at::(0).parent_hash; + parent_hash = Rlp::new(&self.blocks.unwrapped_read()[&parent_hash]).val_at::(0).parent_hash; } } } else { - self.blocks.write().unwrap().insert(h.clone(), b.to_vec()); + self.blocks.unwrapped_write().insert(h.clone(), b.to_vec()); } Ok(h) } @@ -470,11 +470,11 @@ impl BlockChainClient for TestBlockChainClient { fn chain_info(&self) -> BlockChainInfo { BlockChainInfo { - total_difficulty: *self.difficulty.read().unwrap(), - pending_total_difficulty: *self.difficulty.read().unwrap(), + total_difficulty: *self.difficulty.unwrapped_read(), + pending_total_difficulty: *self.difficulty.unwrapped_read(), genesis_hash: self.genesis_hash.clone(), - best_block_hash: self.last_hash.read().unwrap().clone(), - best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1, + best_block_hash: self.last_hash.unwrapped_read().clone(), + best_block_number: self.blocks.unwrapped_read().len() as BlockNumber - 1, } } diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index d38ce3041..576aaef5b 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -20,7 +20,7 @@ use std::ops::Deref; use std::hash::Hash; use std::sync::RwLock; use std::collections::HashMap; -use util::{DBTransaction, Database}; +use util::{DBTransaction, Database, RwLockable}; use util::rlp::{encode, Encodable, decode, Decodable}; #[derive(Clone, Copy)] @@ -115,14 +115,14 @@ pub trait Readable { T: Clone + Decodable, C: Cache { { - let read = cache.read().unwrap(); + let read = cache.unwrapped_read(); if let Some(v) = read.get(key) { return Some(v.clone()); } } self.read(key).map(|value: T|{ - let mut write = cache.write().unwrap(); + let mut write = cache.unwrapped_write(); write.insert(key.clone(), value.clone()); value }) @@ -137,7 +137,7 @@ pub trait Readable { R: Deref, C: Cache { { - let read = cache.read().unwrap(); + let read = cache.unwrapped_read(); if read.get(key).is_some() { return true; } diff --git a/ethcore/src/miner/external.rs b/ethcore/src/miner/external.rs index ece91877d..650df228e 100644 --- a/ethcore/src/miner/external.rs +++ b/ethcore/src/miner/external.rs @@ -16,8 +16,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use util::numbers::U256; -use util::hash::H256; +use util::{RwLockable, U256, H256}; /// External miner interface. pub trait ExternalMinerService: Send + Sync { @@ -55,15 +54,15 @@ impl ExternalMiner { impl ExternalMinerService for ExternalMiner { fn submit_hashrate(&self, hashrate: U256, id: H256) { - self.hashrates.write().unwrap().insert(id, hashrate); + self.hashrates.unwrapped_write().insert(id, hashrate); } fn hashrate(&self) -> U256 { - self.hashrates.read().unwrap().iter().fold(U256::from(0), |sum, (_, v)| sum + *v) + self.hashrates.unwrapped_read().iter().fold(U256::from(0), |sum, (_, v)| sum + *v) } fn is_mining(&self) -> bool { - !self.hashrates.read().unwrap().is_empty() + !self.hashrates.unwrapped_read().is_empty() } } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index c9a20e7f1..becd872e5 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -425,20 +425,20 @@ impl MinerService for Miner { } fn set_author(&self, author: Address) { - *self.author.write().unwrap() = author; + *self.author.unwrapped_write() = author; } fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.write().unwrap() = extra_data; + *self.extra_data.unwrapped_write() = extra_data; } /// Set the gas limit we wish to target when sealing a new block. fn set_gas_floor_target(&self, target: U256) { - self.gas_range_target.write().unwrap().0 = target; + self.gas_range_target.unwrapped_write().0 = target; } fn set_gas_ceil_target(&self, target: U256) { - self.gas_range_target.write().unwrap().1 = target; + self.gas_range_target.unwrapped_write().1 = target; } fn set_minimal_gas_price(&self, min_gas_price: U256) { @@ -455,7 +455,7 @@ impl MinerService for Miner { } fn sensible_gas_limit(&self) -> U256 { - self.gas_range_target.read().unwrap().0 / 5.into() + self.gas_range_target.unwrapped_read().0 / 5.into() } fn transactions_limit(&self) -> usize { @@ -472,22 +472,22 @@ impl MinerService for Miner { /// Get the author that we will seal blocks as. fn author(&self) -> Address { - *self.author.read().unwrap() + *self.author.unwrapped_read() } /// Get the extra_data that we will seal blocks with. fn extra_data(&self) -> Bytes { - self.extra_data.read().unwrap().clone() + self.extra_data.unwrapped_read().clone() } /// Get the gas limit we wish to target when sealing a new block. fn gas_floor_target(&self) -> U256 { - self.gas_range_target.read().unwrap().0 + self.gas_range_target.unwrapped_read().0 } /// Get the gas limit we wish to target when sealing a new block. fn gas_ceil_target(&self) -> U256 { - self.gas_range_target.read().unwrap().1 + self.gas_range_target.unwrapped_read().1 } fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec) -> diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index 02b311b24..5c2a7d8d7 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -136,10 +136,10 @@ impl Spec { /// Return the state root for the genesis state, memoising accordingly. pub fn state_root(&self) -> H256 { - if self.state_root_memo.read().unwrap().is_none() { - *self.state_root_memo.write().unwrap() = Some(self.genesis_state.root()); + if self.state_root_memo.unwrapped_read().is_none() { + *self.state_root_memo.unwrapped_write() = Some(self.genesis_state.root()); } - self.state_root_memo.read().unwrap().as_ref().unwrap().clone() + self.state_root_memo.unwrapped_read().as_ref().unwrap().clone() } /// Get the known knodes of the network in enode format. @@ -209,12 +209,12 @@ impl Spec { /// Alter the value of the genesis state. pub fn set_genesis_state(&mut self, s: PodState) { self.genesis_state = s; - *self.state_root_memo.write().unwrap() = None; + *self.state_root_memo.unwrapped_write() = None; } /// Returns `false` if the memoized state root is invalid. `true` otherwise. pub fn is_state_root_valid(&self) -> bool { - self.state_root_memo.read().unwrap().clone().map_or(true, |sr| sr == self.genesis_state.root()) + self.state_root_memo.unwrapped_read().clone().map_or(true, |sr| sr == self.genesis_state.root()) } /// Ensure that the given state DB has the trie nodes in for the genesis state. diff --git a/ethcore/src/trace/db.rs b/ethcore/src/trace/db.rs index 07f5f9c27..a49d28924 100644 --- a/ethcore/src/trace/db.rs +++ b/ethcore/src/trace/db.rs @@ -22,10 +22,9 @@ use std::sync::{RwLock, Arc}; use std::path::Path; use bloomchain::{Number, Config as BloomConfig}; use bloomchain::group::{BloomGroupDatabase, BloomGroupChain, GroupPosition, BloomGroup}; -use util::{H256, H264, Database, DatabaseConfig, DBTransaction}; +use util::{H256, H264, Database, DatabaseConfig, DBTransaction, RwLockable}; use header::BlockNumber; -use trace::{BlockTraces, LocalizedTrace, Config, Switch, Filter, Database as TraceDatabase, ImportRequest, -DatabaseExtras, Error}; +use trace::{BlockTraces, LocalizedTrace, Config, Switch, Filter, Database as TraceDatabase, ImportRequest, DatabaseExtras, Error}; use db::{Key, Writable, Readable, CacheUpdatePolicy}; use blooms; use super::flat::{FlatTrace, FlatBlockTraces, FlatTransactionTraces}; @@ -232,7 +231,7 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { // at first, let's insert new block traces { - let mut traces = self.traces.write().unwrap(); + let mut traces = self.traces.unwrapped_write(); // it's important to use overwrite here, // cause this value might be queried by hash later batch.write_with_cache(traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); @@ -260,7 +259,7 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { .map(|p| (From::from(p.0), From::from(p.1))) .collect::>(); - let mut blooms = self.blooms.write().unwrap(); + let mut blooms = self.blooms.unwrapped_write(); batch.extend_with_cache(blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove); } diff --git a/parity/hypervisor/mod.rs b/parity/hypervisor/mod.rs index fbd807d94..501594d1f 100644 --- a/parity/hypervisor/mod.rs +++ b/parity/hypervisor/mod.rs @@ -20,19 +20,18 @@ #![allow(dead_code)] #![cfg_attr(feature="dev", allow(used_underscore_binding))] -pub mod service; - -/// Default value for hypervisor ipc listener -pub const HYPERVISOR_IPC_URL: &'static str = "ipc:///tmp/parity-internal-hyper-status.ipc"; - use nanoipc; use std::sync::{Arc,RwLock}; use hypervisor::service::*; use std::process::{Command,Child}; use std::collections::HashMap; -type BinaryId = &'static str; +pub mod service; +/// Default value for hypervisor ipc listener +pub const HYPERVISOR_IPC_URL: &'static str = "ipc:///tmp/parity-internal-hyper-status.ipc"; + +type BinaryId = &'static str; const BLOCKCHAIN_DB_BINARY: BinaryId = "blockchain"; pub struct Hypervisor { diff --git a/parity/informant.rs b/parity/informant.rs index 0eaebe10f..f0709458f 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -22,7 +22,7 @@ use std::time::{Instant, Duration}; use std::sync::RwLock; use std::ops::{Deref, DerefMut}; use ethsync::{EthSync, SyncProvider}; -use util::Uint; +use util::{Uint, RwLockable}; use ethcore::client::*; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -77,18 +77,18 @@ impl Informant { #[cfg_attr(feature="dev", allow(match_bool))] pub fn tick(&self, client: &Client, maybe_sync: Option<&EthSync>) { - let elapsed = self.last_tick.read().unwrap().elapsed(); + let elapsed = self.last_tick.unwrapped_read().elapsed(); if elapsed < Duration::from_secs(5) { return; } - *self.last_tick.write().unwrap() = Instant::now(); + *self.last_tick.unwrapped_write() = Instant::now(); let chain_info = client.chain_info(); let queue_info = client.queue_info(); let cache_info = client.blockchain_cache_info(); - let mut write_report = self.report.write().unwrap(); + let mut write_report = self.report.unwrapped_write(); let report = client.report(); let paint = |c: Style, t: String| match self.with_color { @@ -97,8 +97,8 @@ impl Informant { }; if let (_, _, &Some(ref last_report)) = ( - self.chain_info.read().unwrap().deref(), - self.cache_info.read().unwrap().deref(), + self.chain_info.unwrapped_read().deref(), + self.cache_info.unwrapped_read().deref(), write_report.deref() ) { println!("{} {} {} blk/s {} tx/s {} Mgas/s {}{}+{} Qed {} db {} chain {} queue{}", @@ -137,8 +137,8 @@ impl Informant { ); } - *self.chain_info.write().unwrap().deref_mut() = Some(chain_info); - *self.cache_info.write().unwrap().deref_mut() = Some(cache_info); + *self.chain_info.unwrapped_write().deref_mut() = Some(chain_info); + *self.cache_info.unwrapped_write().deref_mut() = Some(cache_info); *write_report.deref_mut() = Some(report); } } diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index c21a056d8..d41114f96 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -19,7 +19,7 @@ use std::time::{Instant, Duration}; use std::sync::{mpsc, Mutex, RwLock, Arc}; use std::collections::HashMap; use jsonrpc_core; -use util::{U256, Lockable}; +use util::{U256, Lockable, RwLockable}; use v1::helpers::{TransactionRequest, TransactionConfirmation}; /// Result that can be returned from JSON RPC. @@ -214,7 +214,7 @@ impl ConfirmationsQueue { /// Removes transaction from this queue and notifies `ConfirmationPromise` holders about the result. /// Notifies also a receiver about that event. fn remove(&self, id: U256, result: Option) -> Option { - let token = self.queue.write().unwrap().remove(&id); + let token = self.queue.unwrapped_write().remove(&id); if let Some(token) = token { // notify receiver about the event @@ -247,7 +247,7 @@ impl SigningQueue for ConfirmationsQueue { }; // Add request to queue let res = { - let mut queue = self.queue.write().unwrap(); + let mut queue = self.queue.unwrapped_write(); queue.insert(id, ConfirmationToken { result: Arc::new(Mutex::new(ConfirmationResult::Waiting)), handle: thread::current(), @@ -266,7 +266,7 @@ impl SigningQueue for ConfirmationsQueue { } fn peek(&self, id: &U256) -> Option { - self.queue.read().unwrap().get(id).map(|token| token.request.clone()) + self.queue.unwrapped_read().get(id).map(|token| token.request.clone()) } fn request_rejected(&self, id: U256) -> Option { @@ -280,17 +280,17 @@ impl SigningQueue for ConfirmationsQueue { } fn requests(&self) -> Vec { - let queue = self.queue.read().unwrap(); + let queue = self.queue.unwrapped_read(); queue.values().map(|token| token.request.clone()).collect() } fn len(&self) -> usize { - let queue = self.queue.read().unwrap(); + let queue = self.queue.unwrapped_read(); queue.len() } fn is_empty(&self) -> bool { - let queue = self.queue.read().unwrap(); + let queue = self.queue.unwrapped_read(); queue.is_empty() } } @@ -301,7 +301,7 @@ mod test { use std::time::Duration; use std::thread; use std::sync::{Arc, Mutex}; - use util::{Address, U256, H256}; + use util::{Address, U256, H256, Lockable}; use v1::helpers::{SigningQueue, ConfirmationsQueue, QueueEvent, TransactionRequest}; use v1::types::H256 as NH256; use jsonrpc_core::to_value; diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 8c050b118..b0a4f1115 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -16,7 +16,7 @@ //! Test implementation of miner service. -use util::{Address, H256, Bytes, U256, FixedHash, Uint, Lockable}; +use util::{Address, H256, Bytes, U256, FixedHash, Uint, Lockable, RwLockable}; use util::standard::*; use ethcore::error::{Error, ExecutionError}; use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics}; @@ -76,57 +76,57 @@ impl MinerService for TestMinerService { } fn set_author(&self, author: Address) { - *self.author.write().unwrap() = author; + *self.author.unwrapped_write() = author; } fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.write().unwrap() = extra_data; + *self.extra_data.unwrapped_write() = extra_data; } /// Set the lower gas limit we wish to target when sealing a new block. fn set_gas_floor_target(&self, target: U256) { - self.gas_range_target.write().unwrap().0 = target; + self.gas_range_target.unwrapped_write().0 = target; } /// Set the upper gas limit we wish to target when sealing a new block. fn set_gas_ceil_target(&self, target: U256) { - self.gas_range_target.write().unwrap().1 = target; + self.gas_range_target.unwrapped_write().1 = target; } fn set_minimal_gas_price(&self, min_gas_price: U256) { - *self.min_gas_price.write().unwrap() = min_gas_price; + *self.min_gas_price.unwrapped_write() = min_gas_price; } fn set_transactions_limit(&self, limit: usize) { - *self.limit.write().unwrap() = limit; + *self.limit.unwrapped_write() = limit; } fn set_tx_gas_limit(&self, limit: U256) { - *self.tx_gas_limit.write().unwrap() = limit; + *self.tx_gas_limit.unwrapped_write() = limit; } fn transactions_limit(&self) -> usize { - *self.limit.read().unwrap() + *self.limit.unwrapped_read() } fn author(&self) -> Address { - *self.author.read().unwrap() + *self.author.unwrapped_read() } fn minimal_gas_price(&self) -> U256 { - *self.min_gas_price.read().unwrap() + *self.min_gas_price.unwrapped_read() } fn extra_data(&self) -> Bytes { - self.extra_data.read().unwrap().clone() + self.extra_data.unwrapped_read().clone() } fn gas_floor_target(&self) -> U256 { - self.gas_range_target.read().unwrap().0 + self.gas_range_target.unwrapped_read().0 } fn gas_ceil_target(&self) -> U256 { - self.gas_range_target.read().unwrap().1 + self.gas_range_target.unwrapped_read().1 } /// Imports transactions to transaction queue. @@ -137,7 +137,7 @@ impl MinerService for TestMinerService { for sender in transactions.iter().filter_map(|t| t.sender().ok()) { let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests"); - self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1)); + self.last_nonces.unwrapped_write().insert(sender, nonce + U256::from(1)); } transactions .iter() @@ -152,7 +152,7 @@ impl MinerService for TestMinerService { // keep the pending nonces up to date if let Ok(ref sender) = transaction.sender() { let nonce = self.last_nonce(sender).unwrap_or(chain.latest_nonce(sender)); - self.last_nonces.write().unwrap().insert(sender.clone(), nonce + U256::from(1)); + self.last_nonces.unwrapped_write().insert(sender.clone(), nonce + U256::from(1)); } // lets assume that all txs are valid @@ -202,7 +202,7 @@ impl MinerService for TestMinerService { } fn last_nonce(&self, address: &Address) -> Option { - self.last_nonces.read().unwrap().get(address).cloned() + self.last_nonces.unwrapped_read().get(address).cloned() } /// Submit `seal` as a valid solution for the header of `pow_hash`. diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 11fa1cb9e..d91c2a98c 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,7 +16,7 @@ //! Test implementation of SyncProvider. -use util::U256; +use util::{U256, RwLockable}; use ethsync::{SyncProvider, SyncStatus, SyncState}; use std::sync::RwLock; @@ -57,7 +57,7 @@ impl TestSyncProvider { impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { - self.status.read().unwrap().clone() + self.status.unwrapped_read().clone() } fn start_network(&self) { diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index d68e4b8b4..9e0067c8c 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -18,7 +18,7 @@ use std::str::FromStr; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use jsonrpc_core::IoHandler; -use util::Lockable; +use util::{Lockable, RwLockable}; use util::hash::{Address, H256, FixedHash}; use util::numbers::{Uint, U256}; use ethcore::account_provider::AccountProvider; @@ -104,13 +104,13 @@ fn rpc_eth_syncing() { assert_eq!(tester.io.handle_request(request), Some(false_res.to_owned())); { - let mut status = tester.sync.status.write().unwrap(); + let mut status = tester.sync.status.unwrapped_write(); status.state = SyncState::Blocks; status.highest_block_number = Some(2500); // "sync" to 1000 blocks. // causes TestBlockChainClient to return 1000 for its best block number. - let mut blocks = tester.client.blocks.write().unwrap(); + let mut blocks = tester.client.blocks.unwrapped_write(); for i in 0..1000 { blocks.insert(H256::from(i), Vec::new()); } @@ -121,7 +121,7 @@ fn rpc_eth_syncing() { { // finish "syncing" - let mut blocks = tester.client.blocks.write().unwrap(); + let mut blocks = tester.client.blocks.unwrapped_write(); for i in 0..1500 { blocks.insert(H256::from(i + 1000), Vec::new()); } @@ -133,9 +133,9 @@ fn rpc_eth_syncing() { #[test] fn rpc_eth_hashrate() { let tester = EthTester::default(); - tester.hashrates.write().unwrap().insert(H256::from(0), U256::from(0xfffa)); - tester.hashrates.write().unwrap().insert(H256::from(0), U256::from(0xfffb)); - tester.hashrates.write().unwrap().insert(H256::from(1), U256::from(0x1)); + tester.hashrates.unwrapped_write().insert(H256::from(0), U256::from(0xfffa)); + tester.hashrates.unwrapped_write().insert(H256::from(0), U256::from(0xfffb)); + tester.hashrates.unwrapped_write().insert(H256::from(1), U256::from(0x1)); let request = r#"{"jsonrpc": "2.0", "method": "eth_hashrate", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0xfffc","id":1}"#; @@ -158,7 +158,7 @@ fn rpc_eth_submit_hashrate() { let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; assert_eq!(tester.io.handle_request(request), Some(response.to_owned())); - assert_eq!(tester.hashrates.read().unwrap().get(&H256::from("0x59daa26581d0acd1fce254fb7e85952f4c09d0915afd33d3886cd914bc7d283c")).cloned(), + assert_eq!(tester.hashrates.unwrapped_read().get(&H256::from("0x59daa26581d0acd1fce254fb7e85952f4c09d0915afd33d3886cd914bc7d283c")).cloned(), Some(U256::from(0x500_000))); } @@ -215,7 +215,7 @@ fn rpc_eth_mining() { let response = r#"{"jsonrpc":"2.0","result":false,"id":1}"#; assert_eq!(tester.io.handle_request(request), Some(response.to_owned())); - tester.hashrates.write().unwrap().insert(H256::from(1), U256::from(0x1)); + tester.hashrates.unwrapped_write().insert(H256::from(1), U256::from(0x1)); let request = r#"{"jsonrpc": "2.0", "method": "eth_mining", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -591,7 +591,7 @@ fn rpc_eth_send_transaction() { assert_eq!(tester.io.handle_request(&request), Some(response)); - tester.miner.last_nonces.write().unwrap().insert(address.clone(), U256::zero()); + tester.miner.last_nonces.unwrapped_write().insert(address.clone(), U256::zero()); let t = Transaction { nonce: U256::one(), @@ -749,7 +749,7 @@ fn returns_error_if_can_mine_and_no_closed_block() { use ethsync::{SyncState}; let eth_tester = EthTester::default(); - eth_tester.sync.status.write().unwrap().state = SyncState::Idle; + eth_tester.sync.status.unwrapped_write().state = SyncState::Idle; let request = r#"{"jsonrpc": "2.0", "method": "eth_getWork", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","error":{"code":-32603,"message":"Internal error","data":null},"id":1}"#; diff --git a/rpc/src/v1/tests/mocked/personal.rs b/rpc/src/v1/tests/mocked/personal.rs index cca58c1d7..6cd3ae583 100644 --- a/rpc/src/v1/tests/mocked/personal.rs +++ b/rpc/src/v1/tests/mocked/personal.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::str::FromStr; use jsonrpc_core::IoHandler; use util::numbers::*; +use util::RwLockable; use ethcore::account_provider::AccountProvider; use v1::{PersonalClient, Personal}; use v1::tests::helpers::TestMinerService; @@ -174,7 +175,7 @@ fn sign_and_send_transaction() { assert_eq!(tester.io.handle_request(request.as_ref()), Some(response)); - tester.miner.last_nonces.write().unwrap().insert(address.clone(), U256::zero()); + tester.miner.last_nonces.unwrapped_write().insert(address.clone(), U256::zero()); let t = Transaction { nonce: U256::one(), diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6845c32e3..c96ba8b34 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1142,7 +1142,7 @@ impl ChainSync { |e| format!("Error sending nodes: {:?}", e)), _ => { - sync.write().unwrap().on_packet(io, peer, packet_id, data); + sync.unwrapped_write().on_packet(io, peer, packet_id, data); Ok(()) } }; diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 93e4f4382..dfb11ee0d 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -71,7 +71,7 @@ extern crate heapsize; use std::ops::*; use std::sync::*; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; -use util::{TimerToken, U256}; +use util::{TimerToken, U256, RwLockable}; use ethcore::client::Client; use ethcore::service::{SyncMessage, NetSyncMessage}; use io::NetSyncIo; @@ -143,28 +143,28 @@ impl EthSync { /// Stop sync pub fn stop(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.unwrapped_write().abort(&mut NetSyncIo::new(io, self.chain.deref())); } /// Restart sync pub fn restart(&mut self, io: &mut NetworkContext) { - self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.unwrapped_write().restart(&mut NetSyncIo::new(io, self.chain.deref())); } } impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> SyncStatus { - self.sync.read().unwrap().status() + self.sync.unwrapped_read().status() } fn start_network(&self) { - self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)) + self.io_channel.unwrapped_read().send(NetworkIoMessage::User(SyncMessage::StartNetwork)) .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } fn stop_network(&self) { - self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)) + self.io_channel.unwrapped_read().send(NetworkIoMessage::User(SyncMessage::StopNetwork)) .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } } @@ -172,7 +172,7 @@ impl SyncProvider for EthSync { impl NetworkProtocolHandler for EthSync { fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 1000).expect("Error registering sync timer"); - *self.io_channel.write().unwrap() = io.io_channel(); + *self.io_channel.unwrapped_write() = io.io_channel(); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -180,16 +180,16 @@ impl NetworkProtocolHandler for EthSync { } fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + self.sync.unwrapped_write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + self.sync.unwrapped_write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.sync.write().unwrap().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); - self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.unwrapped_write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.unwrapped_write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); } #[cfg_attr(feature="dev", allow(single_match))] @@ -197,7 +197,7 @@ impl NetworkProtocolHandler for EthSync { match *message { SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, ref sealed } => { let mut sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed); + self.sync.unwrapped_write().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed); }, _ => {/* Ignore other messages */}, } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 2f2bde171..e5a8b3fb9 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -27,7 +27,7 @@ fn two_peers() { net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(1000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); + assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); } #[test] @@ -37,7 +37,7 @@ fn long_chain() { net.peer_mut(1).chain.add_blocks(50000, EachBlockWith::Nothing); net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(50000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); + assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); } #[test] @@ -47,7 +47,7 @@ fn status_after_sync() { net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); - let status = net.peer(0).sync.read().unwrap().status(); + let status = net.peer(0).sync.unwrapped_read().status(); assert_eq!(status.state, SyncState::Idle); } @@ -71,7 +71,7 @@ fn empty_blocks() { } net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(1000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); + assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); } #[test] @@ -87,12 +87,12 @@ fn forked() { net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); // peer 1 has the best chain of 601 blocks - let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); + let peer1_chain = net.peer(1).chain.numbers.unwrapped_read().clone(); net.sync(); - assert_eq!(net.peer(0).chain.difficulty.read().unwrap().deref(), net.peer(1).chain.difficulty.read().unwrap().deref()); - assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); - assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); + assert_eq!(net.peer(0).chain.difficulty.unwrapped_read().deref(), net.peer(1).chain.difficulty.unwrapped_read().deref()); + assert_eq!(net.peer(0).chain.numbers.unwrapped_read().deref(), &peer1_chain); + assert_eq!(net.peer(1).chain.numbers.unwrapped_read().deref(), &peer1_chain); + assert_eq!(net.peer(2).chain.numbers.unwrapped_read().deref(), &peer1_chain); } #[test] @@ -107,14 +107,14 @@ fn restart() { assert!(net.peer(0).chain.chain_info().best_block_number > 100); net.restart_peer(0); - let status = net.peer(0).sync.read().unwrap().status(); + let status = net.peer(0).sync.unwrapped_read().status(); assert_eq!(status.state, SyncState::ChainHead); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle); + assert_eq!(net.peer(0).sync.unwrapped_read().status().state, SyncState::Idle); } #[test] diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 9a9afca49..f3a5c6c4c 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -118,7 +118,7 @@ impl TestNet { for client in 0..self.peers.len() { if peer != client { let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + p.sync.unwrapped_write().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); } } } @@ -133,18 +133,18 @@ impl TestNet { trace!("----------------"); } let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + p.sync.unwrapped_write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); } } pub fn sync_step_peer(&mut self, peer_num: usize) { let mut peer = self.peer_mut(peer_num); - peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.unwrapped_write().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn restart_peer(&mut self, i: usize) { let peer = self.peer_mut(i); - peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.unwrapped_write().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn sync(&mut self) -> u32 { @@ -173,6 +173,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]); + peer.sync.unwrapped_write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]); } } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 1e2169808..7f80d50d7 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -21,6 +21,7 @@ use mio::*; use crossbeam::sync::chase_lev; use slab::Slab; use error::*; +use misc::*; use io::{IoError, IoHandler}; use io::worker::{Worker, Work, WorkType}; use panics::*; @@ -227,7 +228,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; if let Some(handler) = self.handlers.get(handler_index) { - if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { + if let Some(timer) = self.timers.unwrapped_read().get(&token.as_usize()) { event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); self.work_ready.notify_all(); @@ -249,7 +250,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync // TODO: flush event loop self.handlers.remove(handler_id); // unregister timers - let mut timers = self.timers.write().unwrap(); + let mut timers = self.timers.unwrapped_write(); let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect(); for timer_id in to_remove { let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed"); @@ -259,11 +260,11 @@ impl Handler for IoManager where Message: Send + Clone + Sync IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer"); - self.timers.write().unwrap().insert(timer_id, UserTimer { delay: delay, timeout: timeout }); + self.timers.unwrapped_write().insert(timer_id, UserTimer { delay: delay, timeout: timeout }); }, IoMessage::RemoveTimer { handler_id, token } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { + if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) { event_loop.clear_timeout(timer.timeout); } }, @@ -277,7 +278,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync handler.deregister_stream(token, event_loop); // unregister a timer associated with the token (if any) let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { + if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) { event_loop.clear_timeout(timer.timeout); } } diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs index b860ae598..15f6a28b5 100644 --- a/util/src/journaldb/earlymergedb.rs +++ b/util/src/journaldb/earlymergedb.rs @@ -20,6 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; +use misc::RwLockable; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY}; use super::traits::JournalDB; use kvdb::{Database, DBTransaction, DatabaseConfig}; @@ -225,7 +226,7 @@ impl EarlyMergeDB { #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { let (latest_era, reconstructed) = Self::read_refs(&self.backing); - let refs = self.refs.as_ref().unwrap().write().unwrap(); + let refs = self.refs.as_ref().unwrap().unwrapped_write(); if *refs != reconstructed || latest_era != self.latest_era { let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::>(); let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::>(); @@ -333,7 +334,7 @@ impl JournalDB for EarlyMergeDB { fn mem_used(&self) -> usize { self.overlay.mem_used() + match self.refs { - Some(ref c) => c.read().unwrap().heap_size_of_children(), + Some(ref c) => c.unwrapped_read().heap_size_of_children(), None => 0 } } @@ -385,7 +386,7 @@ impl JournalDB for EarlyMergeDB { // // record new commit's details. - let mut refs = self.refs.as_ref().unwrap().write().unwrap(); + let mut refs = self.refs.as_ref().unwrap().unwrapped_write(); let batch = DBTransaction::new(); let trace = false; { diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 7afe7e5ac..5c60e22b5 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -20,6 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; +use misc::RwLockable; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY}; use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] @@ -136,7 +137,7 @@ impl OverlayRecentDB { #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { let reconstructed = Self::read_overlay(&self.backing); - let journal_overlay = self.journal_overlay.read().unwrap(); + let journal_overlay = self.journal_overlay.unwrapped_read(); *journal_overlay == reconstructed } @@ -199,7 +200,7 @@ impl JournalDB for OverlayRecentDB { fn mem_used(&self) -> usize { let mut mem = self.transaction_overlay.mem_used(); - let overlay = self.journal_overlay.read().unwrap(); + let overlay = self.journal_overlay.unwrapped_read(); mem += overlay.backing_overlay.mem_used(); mem += overlay.journal.heap_size_of_children(); mem @@ -209,12 +210,12 @@ impl JournalDB for OverlayRecentDB { self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() } - fn latest_era(&self) -> Option { self.journal_overlay.read().unwrap().latest_era } + fn latest_era(&self) -> Option { self.journal_overlay.unwrapped_read().latest_era } fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // record new commit's details. trace!("commit: #{} ({}), end era: {:?}", now, id, end); - let mut journal_overlay = self.journal_overlay.write().unwrap(); + let mut journal_overlay = self.journal_overlay.unwrapped_write(); let batch = DBTransaction::new(); { let mut r = RlpStream::new_list(3); @@ -321,7 +322,7 @@ impl HashDB for OverlayRecentDB { match k { Some(&(ref d, rc)) if rc > 0 => Some(d), _ => { - let v = self.journal_overlay.read().unwrap().backing_overlay.get(key).map(|v| v.to_vec()); + let v = self.journal_overlay.unwrapped_read().backing_overlay.get(key).map(|v| v.to_vec()); match v { Some(x) => { Some(&self.transaction_overlay.denote(key, x).0) diff --git a/util/src/log.rs b/util/src/log.rs index ed3a5376d..be441384e 100644 --- a/util/src/log.rs +++ b/util/src/log.rs @@ -23,6 +23,7 @@ use env_logger::LogBuilder; use std::sync::{RwLock, RwLockReadGuard}; use std::sync::atomic::{Ordering, AtomicBool}; use arrayvec::ArrayVec; +use misc::RwLockable; pub use ansi_term::{Colour, Style}; lazy_static! { @@ -90,7 +91,7 @@ impl RotatingLogger { /// Append new log entry pub fn append(&self, log: String) { - self.logs.write().unwrap().insert(0, log); + self.logs.unwrapped_write().insert(0, log); } /// Return levels @@ -100,7 +101,7 @@ impl RotatingLogger { /// Return logs pub fn logs(&self) -> RwLockReadGuard> { - self.logs.read().unwrap() + self.logs.unwrapped_read() } } diff --git a/util/src/misc.rs b/util/src/misc.rs index 3d4df8b94..c56bcb85a 100644 --- a/util/src/misc.rs +++ b/util/src/misc.rs @@ -75,3 +75,18 @@ pub trait Lockable { impl Lockable for Mutex { fn locked(&self) -> MutexGuard { self.lock().unwrap() } } + +/// Object can be read or write locked directly into a guard. +pub trait RwLockable { + /// Read-lock object directly into a `ReadGuard`. + fn unwrapped_read(&self) -> RwLockReadGuard; + + /// Write-lock object directly into a `WriteGuard`. + fn unwrapped_write(&self) -> RwLockWriteGuard; +} + +impl RwLockable for RwLock { + fn unwrapped_read(&self) -> RwLockReadGuard { self.read().unwrap() } + fn unwrapped_write(&self) -> RwLockWriteGuard { self.write().unwrap() } +} + diff --git a/util/src/network/host.rs b/util/src/network/host.rs index c85b4cc57..0caaef74c 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -216,7 +216,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone fn resolve_session(&self, peer: PeerId) -> Option { match self.session_id { Some(id) if id == peer => self.session.clone(), - _ => self.sessions.read().unwrap().get(peer).cloned(), + _ => self.sessions.unwrapped_read().get(peer).cloned(), } } @@ -422,7 +422,7 @@ impl Host where Message: Send + Sync + Clone { Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - self.nodes.write().unwrap().add_node(n); + self.nodes.unwrapped_write().add_node(n); if let Some(ref mut discovery) = *self.discovery.locked() { discovery.add_node(entry); } @@ -434,7 +434,7 @@ impl Host where Message: Send + Sync + Clone { let n = try!(Node::from_str(id)); let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - self.reserved_nodes.write().unwrap().insert(n.id.clone()); + self.reserved_nodes.unwrapped_write().insert(n.id.clone()); if let Some(ref mut discovery) = *self.discovery.locked() { discovery.add_node(entry); @@ -444,16 +444,16 @@ impl Host where Message: Send + Sync + Clone { } pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext>) { - let mut info = self.info.write().unwrap(); + let mut info = self.info.unwrapped_write(); if info.config.non_reserved_mode != mode { info.config.non_reserved_mode = mode.clone(); drop(info); if let NonReservedPeerMode::Deny = mode { // disconnect all non-reserved peers here. - let reserved: HashSet = self.reserved_nodes.read().unwrap().clone(); + let reserved: HashSet = self.reserved_nodes.unwrapped_read().clone(); let mut to_kill = Vec::new(); - for e in self.sessions.write().unwrap().iter_mut() { + for e in self.sessions.unwrapped_write().iter_mut() { let mut s = e.locked(); { let id = s.id(); @@ -475,7 +475,7 @@ impl Host where Message: Send + Sync + Clone { pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> { let n = try!(Node::from_str(id)); - self.reserved_nodes.write().unwrap().remove(&n.id); + self.reserved_nodes.unwrapped_write().remove(&n.id); Ok(()) } @@ -485,11 +485,11 @@ impl Host where Message: Send + Sync + Clone { } pub fn external_url(&self) -> Option { - self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone()))) + self.info.unwrapped_read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.unwrapped_read().id().clone(), e.clone()))) } pub fn local_url(&self) -> String { - let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone())); + let r = format!("{}", Node::new(self.info.unwrapped_read().id().clone(), self.info.unwrapped_read().local_endpoint.clone())); println!("{}", r); r } @@ -497,7 +497,7 @@ impl Host where Message: Send + Sync + Clone { pub fn stop(&self, io: &IoContext>) -> Result<(), UtilError> { self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); - for e in self.sessions.write().unwrap().iter_mut() { + for e in self.sessions.unwrapped_write().iter_mut() { let mut s = e.locked(); s.disconnect(io, DisconnectReason::ClientQuit); to_kill.push(s.token()); @@ -512,16 +512,16 @@ impl Host where Message: Send + Sync + Clone { fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { io.clear_timer(INIT_PUBLIC).unwrap(); - if self.info.read().unwrap().public_endpoint.is_some() { + if self.info.unwrapped_read().public_endpoint.is_some() { return Ok(()); } - let local_endpoint = self.info.read().unwrap().local_endpoint.clone(); - let public_address = self.info.read().unwrap().config.public_address.clone(); + let local_endpoint = self.info.unwrapped_read().local_endpoint.clone(); + let public_address = self.info.unwrapped_read().config.public_address.clone(); let public_endpoint = match public_address { None => { let public_address = select_public_address(local_endpoint.address.port()); let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port }; - if self.info.read().unwrap().config.nat_enabled { + if self.info.unwrapped_read().config.nat_enabled { match map_external_address(&local_endpoint) { Some(endpoint) => { info!("NAT mapped to external address {}", endpoint.address); @@ -536,7 +536,7 @@ impl Host where Message: Send + Sync + Clone { Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port } }; - self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone()); + self.info.unwrapped_write().public_endpoint = Some(public_endpoint.clone()); if let Some(url) = self.external_url() { io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); @@ -544,15 +544,15 @@ impl Host where Message: Send + Sync + Clone { // Initialize discovery. let discovery = { - let info = self.info.read().unwrap(); + let info = self.info.unwrapped_read(); if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) } else { None } }; if let Some(mut discovery) = discovery { - discovery.init_node_list(self.nodes.read().unwrap().unordered_entries()); - for n in self.nodes.read().unwrap().unordered_entries() { + discovery.init_node_list(self.nodes.unwrapped_read().unordered_entries()); + for n in self.nodes.unwrapped_read().unordered_entries() { discovery.add_node(n.clone()); } *self.discovery.locked() = Some(discovery); @@ -571,7 +571,7 @@ impl Host where Message: Send + Sync + Clone { } fn have_session(&self, id: &NodeId) -> bool { - self.sessions.read().unwrap().iter().any(|e| e.locked().info.id == Some(id.clone())) + self.sessions.unwrapped_read().iter().any(|e| e.locked().info.id == Some(id.clone())) } fn session_count(&self) -> usize { @@ -579,16 +579,16 @@ impl Host where Message: Send + Sync + Clone { } fn connecting_to(&self, id: &NodeId) -> bool { - self.sessions.read().unwrap().iter().any(|e| e.locked().id() == Some(id)) + self.sessions.unwrapped_read().iter().any(|e| e.locked().id() == Some(id)) } fn handshake_count(&self) -> usize { - self.sessions.read().unwrap().count() - self.session_count() + self.sessions.unwrapped_read().count() - self.session_count() } fn keep_alive(&self, io: &IoContext>) { let mut to_kill = Vec::new(); - for e in self.sessions.write().unwrap().iter_mut() { + for e in self.sessions.unwrapped_write().iter_mut() { let mut s = e.locked(); if !s.keep_alive(io) { s.disconnect(io, DisconnectReason::PingTimeout); @@ -603,7 +603,7 @@ impl Host where Message: Send + Sync + Clone { fn connect_peers(&self, io: &IoContext>) { let (ideal_peers, mut pin) = { - let info = self.info.read().unwrap(); + let info = self.info.unwrapped_read(); if info.capabilities.is_empty() { return; } @@ -613,7 +613,7 @@ impl Host where Message: Send + Sync + Clone { }; let session_count = self.session_count(); - let reserved_nodes = self.reserved_nodes.read().unwrap(); + let reserved_nodes = self.reserved_nodes.unwrapped_read(); if session_count >= ideal_peers as usize + reserved_nodes.len() { // check if all pinned nodes are connected. if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { @@ -634,7 +634,7 @@ impl Host where Message: Send + Sync + Clone { // iterate over all nodes, reserved ones coming first. // if we are pinned to only reserved nodes, ignore all others. let nodes = reserved_nodes.iter().cloned().chain(if !pin { - self.nodes.read().unwrap().nodes() + self.nodes.unwrapped_read().nodes() } else { Vec::new() }); @@ -662,7 +662,7 @@ impl Host where Message: Send + Sync + Clone { let socket = { let address = { - let mut nodes = self.nodes.write().unwrap(); + let mut nodes = self.nodes.unwrapped_write(); if let Some(node) = nodes.get_mut(id) { node.last_attempted = Some(::time::now()); node.endpoint.address @@ -687,10 +687,10 @@ impl Host where Message: Send + Sync + Clone { #[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))] fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext>) -> Result<(), UtilError> { - let nonce = self.info.write().unwrap().next_nonce(); - let mut sessions = self.sessions.write().unwrap(); + let nonce = self.info.unwrapped_write().next_nonce(); + let mut sessions = self.sessions.unwrapped_write(); let token = sessions.insert_with_opt(|token| { - match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read().unwrap()) { + match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) { Ok(s) => Some(Arc::new(Mutex::new(s))), Err(e) => { debug!(target: "network", "Session create error: {:?}", e); @@ -726,10 +726,10 @@ impl Host where Message: Send + Sync + Clone { } fn session_writable(&self, token: StreamToken, io: &IoContext>) { - let session = { self.sessions.read().unwrap().get(token).cloned() }; + let session = { self.sessions.unwrapped_read().get(token).cloned() }; if let Some(session) = session { let mut s = session.locked(); - if let Err(e) = s.writable(io, &self.info.read().unwrap()) { + if let Err(e) = s.writable(io, &self.info.unwrapped_read()) { trace!(target: "network", "Session write error: {}: {:?}", token, e); } if s.done() { @@ -748,16 +748,16 @@ impl Host where Message: Send + Sync + Clone { let mut ready_data: Vec = Vec::new(); let mut packet_data: Vec<(ProtocolId, PacketId, Vec)> = Vec::new(); let mut kill = false; - let session = { self.sessions.read().unwrap().get(token).cloned() }; + let session = { self.sessions.unwrapped_read().get(token).cloned() }; if let Some(session) = session.clone() { let mut s = session.locked(); loop { - match s.readable(io, &self.info.read().unwrap()) { + match s.readable(io, &self.info.unwrapped_read()) { Err(e) => { trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e { if let Some(id) = s.id() { - self.nodes.write().unwrap().mark_as_useless(id); + self.nodes.unwrapped_write().mark_as_useless(id); } } kill = true; @@ -767,9 +767,9 @@ impl Host where Message: Send + Sync + Clone { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let reserved_nodes = self.reserved_nodes.read().unwrap(); + let reserved_nodes = self.reserved_nodes.unwrapped_read(); let (ideal_peers, reserved_only) = { - let info = self.info.read().unwrap(); + let info = self.info.unwrapped_read(); (info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny) }; @@ -784,14 +784,14 @@ impl Host where Message: Send + Sync + Clone { // Add it no node table if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; - self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); + self.nodes.unwrapped_write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); let mut discovery = self.discovery.locked(); if let Some(ref mut discovery) = *discovery.deref_mut() { discovery.add_node(entry); } } } - for (p, _) in self.handlers.read().unwrap().iter() { + for (p, _) in self.handlers.unwrapped_read().iter() { if s.have_capability(p) { ready_data.push(p); } @@ -802,7 +802,7 @@ impl Host where Message: Send + Sync + Clone { protocol, packet_id, }) => { - match self.handlers.read().unwrap().get(protocol) { + match self.handlers.unwrapped_read().get(protocol) { None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, Some(_) => packet_data.push((protocol, packet_id, data)), } @@ -815,16 +815,16 @@ impl Host where Message: Send + Sync + Clone { if kill { self.kill_connection(token, io, true); } - let handlers = self.handlers.read().unwrap(); + let handlers = self.handlers.unwrapped_read(); for p in ready_data { let h = handlers.get(p).unwrap().clone(); self.stats.inc_sessions(); - let reserved = self.reserved_nodes.read().unwrap(); + let reserved = self.reserved_nodes.unwrapped_read(); h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); } for (p, packet_id, data) in packet_data { let h = handlers.get(p).unwrap().clone(); - let reserved = self.reserved_nodes.read().unwrap(); + let reserved = self.reserved_nodes.unwrapped_read(); h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); } } @@ -840,14 +840,14 @@ impl Host where Message: Send + Sync + Clone { let mut deregister = false; let mut expired_session = None; if let FIRST_SESSION ... LAST_SESSION = token { - let sessions = self.sessions.write().unwrap(); + let sessions = self.sessions.unwrapped_write(); if let Some(session) = sessions.get(token).cloned() { expired_session = Some(session.clone()); let mut s = session.locked(); if !s.expired() { if s.is_ready() { self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); - for (p, _) in self.handlers.read().unwrap().iter() { + for (p, _) in self.handlers.unwrapped_read().iter() { if s.have_capability(p) { to_disconnect.push(p); } @@ -861,12 +861,12 @@ impl Host where Message: Send + Sync + Clone { } if let Some(id) = failure_id { if remote { - self.nodes.write().unwrap().note_failure(&id); + self.nodes.unwrapped_write().note_failure(&id); } } for p in to_disconnect { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - let reserved = self.reserved_nodes.read().unwrap(); + let h = self.handlers.unwrapped_read().get(p).unwrap().clone(); + let reserved = self.reserved_nodes.unwrapped_read(); h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } if deregister { @@ -877,7 +877,7 @@ impl Host where Message: Send + Sync + Clone { fn update_nodes(&self, io: &IoContext>, node_changes: TableUpdates) { let mut to_remove: Vec = Vec::new(); { - let sessions = self.sessions.write().unwrap(); + let sessions = self.sessions.unwrapped_write(); for c in sessions.iter() { let s = c.locked(); if let Some(id) = s.id() { @@ -891,7 +891,7 @@ impl Host where Message: Send + Sync + Clone { trace!(target: "network", "Removed from node table: {}", i); self.kill_connection(i, io, false); } - self.nodes.write().unwrap().update(node_changes); + self.nodes.unwrapped_write().update(node_changes); } } @@ -963,13 +963,13 @@ impl IoHandler> for Host where Messa }, NODE_TABLE => { trace!(target: "network", "Refreshing node table"); - self.nodes.write().unwrap().clear_useless(); + self.nodes.unwrapped_write().clear_useless(); }, - _ => match self.timers.read().unwrap().get(&token).cloned() { - Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { + _ => match self.timers.unwrapped_read().get(&token).cloned() { + Some(timer) => match self.handlers.unwrapped_read().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, Some(h) => { - let reserved = self.reserved_nodes.read().unwrap(); + let reserved = self.reserved_nodes.unwrapped_read(); h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token); } }, @@ -989,10 +989,10 @@ impl IoHandler> for Host where Messa ref versions } => { let h = handler.clone(); - let reserved = self.reserved_nodes.read().unwrap(); + let reserved = self.reserved_nodes.unwrapped_read(); h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved)); - self.handlers.write().unwrap().insert(protocol, h); - let mut info = self.info.write().unwrap(); + self.handlers.unwrapped_write().insert(protocol, h); + let mut info = self.info.unwrapped_write(); for v in versions { info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } @@ -1003,17 +1003,17 @@ impl IoHandler> for Host where Messa ref token, } => { let handler_token = { - let mut timer_counter = self.timer_counter.write().unwrap(); + let mut timer_counter = self.timer_counter.unwrapped_write(); let counter = &mut *timer_counter; let handler_token = *counter; *counter += 1; handler_token }; - self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); + self.timers.unwrapped_write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e)); }, NetworkIoMessage::Disconnect(ref peer) => { - let session = { self.sessions.read().unwrap().get(*peer).cloned() }; + let session = { self.sessions.unwrapped_read().get(*peer).cloned() }; if let Some(session) = session { session.locked().disconnect(io, DisconnectReason::DisconnectRequested); } @@ -1021,19 +1021,19 @@ impl IoHandler> for Host where Messa self.kill_connection(*peer, io, false); }, NetworkIoMessage::DisablePeer(ref peer) => { - let session = { self.sessions.read().unwrap().get(*peer).cloned() }; + let session = { self.sessions.unwrapped_read().get(*peer).cloned() }; if let Some(session) = session { session.locked().disconnect(io, DisconnectReason::DisconnectRequested); if let Some(id) = session.locked().id() { - self.nodes.write().unwrap().mark_as_useless(id) + self.nodes.unwrapped_write().mark_as_useless(id) } } trace!(target: "network", "Disabling peer {}", peer); self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { - let reserved = self.reserved_nodes.read().unwrap(); - for (p, h) in self.handlers.read().unwrap().iter() { + let reserved = self.reserved_nodes.unwrapped_read(); + for (p, h) in self.handlers.unwrapped_read().iter() { h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message); } } @@ -1044,7 +1044,7 @@ impl IoHandler> for Host where Messa fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let session = { self.sessions.read().unwrap().get(stream).cloned() }; + let session = { self.sessions.unwrapped_read().get(stream).cloned() }; if let Some(session) = session { session.locked().register_socket(reg, event_loop).expect("Error registering socket"); } @@ -1058,7 +1058,7 @@ impl IoHandler> for Host where Messa fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let mut connections = self.sessions.write().unwrap(); + let mut connections = self.sessions.unwrapped_write(); if let Some(connection) = connections.get(stream).cloned() { connection.locked().deregister_socket(event_loop).expect("Error deregistering socket"); connections.remove(stream); @@ -1072,7 +1072,7 @@ impl IoHandler> for Host where Messa fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let connection = { self.sessions.read().unwrap().get(stream).cloned() }; + let connection = { self.sessions.unwrapped_read().get(stream).cloned() }; if let Some(connection) = connection { connection.locked().update_socket(reg, event_loop).expect("Error updating socket"); } diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 353a24bbe..711a8d860 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -17,6 +17,7 @@ use std::sync::*; use error::*; use panics::*; +use misc::RwLockable; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::NetworkError; use network::host::{Host, NetworkIoMessage, ProtocolId}; @@ -80,19 +81,19 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Returns external url if available. pub fn external_url(&self) -> Option { - let host = self.host.read().unwrap(); + let host = self.host.unwrapped_read(); host.as_ref().and_then(|h| h.external_url()) } /// Returns external url if available. pub fn local_url(&self) -> Option { - let host = self.host.read().unwrap(); + let host = self.host.unwrapped_read(); host.as_ref().map(|h| h.local_url()) } /// Start network IO pub fn start(&self) -> Result<(), UtilError> { - let mut host = self.host.write().unwrap(); + let mut host = self.host.unwrapped_write(); if host.is_none() { let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone()))); try!(self.io_service.register_handler(h.clone())); @@ -103,7 +104,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Stop network IO pub fn stop(&self) -> Result<(), UtilError> { - let mut host = self.host.write().unwrap(); + let mut host = self.host.unwrapped_write(); if let Some(ref host) = *host { let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host try!(host.stop(&io)); @@ -114,7 +115,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Try to add a reserved peer. pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { - let host = self.host.read().unwrap(); + let host = self.host.unwrapped_read(); if let Some(ref host) = *host { host.add_reserved_node(peer) } else { @@ -124,7 +125,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Try to remove a reserved peer. pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { - let host = self.host.read().unwrap(); + let host = self.host.unwrapped_read(); if let Some(ref host) = *host { host.remove_reserved_node(peer) } else { @@ -134,7 +135,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Set the non-reserved peer mode. pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) { - let host = self.host.read().unwrap(); + let host = self.host.unwrapped_read(); if let Some(ref host) = *host { let io_ctxt = IoContext::new(self.io_service.channel(), 0); host.set_non_reserved_mode(mode, &io_ctxt); diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index 5b6c863cb..85591b462 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::thread; use std::time::*; use common::*; +use misc::*; use network::*; use io::TimerToken; use crypto::KeyPair; diff --git a/util/src/panics.rs b/util/src/panics.rs index 368e6a9b4..7b6c92639 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -120,46 +120,49 @@ impl OnPanicListener for F #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_notify_listeners_about_panic () { use std::sync::RwLock; + use misc::RwLockable; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.write().unwrap().push(t)); + p.on_panic(move |t| i.unwrapped_write().push(t)); // when p.catch_panic(|| panic!("Panic!")).unwrap_err(); // then - assert!(invocations.read().unwrap()[0] == "Panic!"); + assert!(invocations.unwrapped_read()[0] == "Panic!"); } #[test] #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_notify_listeners_about_panic_when_string_is_dynamic () { use std::sync::RwLock; + use misc::RwLockable; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.write().unwrap().push(t)); + p.on_panic(move |t| i.unwrapped_write().push(t)); // when p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); // then - assert!(invocations.read().unwrap()[0] == "Panic: 1"); + assert!(invocations.unwrapped_read()[0] == "Panic: 1"); } #[test] fn should_notify_listeners_about_panic_in_other_thread () { use std::thread; use std::sync::RwLock; + use misc::RwLockable; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.write().unwrap().push(t)); + p.on_panic(move |t| i.unwrapped_write().push(t)); // when let t = thread::spawn(move || @@ -168,18 +171,20 @@ fn should_notify_listeners_about_panic_in_other_thread () { t.join().unwrap_err(); // then - assert!(invocations.read().unwrap()[0] == "Panic!"); + assert!(invocations.unwrapped_read()[0] == "Panic!"); } #[test] #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_forward_panics () { use std::sync::RwLock; + use misc::RwLockable; + // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new_in_arc(); - p.on_panic(move |t| i.write().unwrap().push(t)); + p.on_panic(move |t| i.unwrapped_write().push(t)); let p2 = PanicHandler::new(); p.forward_from(&p2); @@ -188,5 +193,5 @@ use std::sync::RwLock; p2.catch_panic(|| panic!("Panic!")).unwrap_err(); // then - assert!(invocations.read().unwrap()[0] == "Panic!"); + assert!(invocations.unwrapped_read()[0] == "Panic!"); }