diff --git a/Cargo.lock b/Cargo.lock index d952b7b98..28a08fb68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,6 +232,7 @@ name = "ethash" version = "1.3.0" dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "primal 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "sha3 0.1.0", ] @@ -403,6 +404,7 @@ dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)", "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", @@ -907,6 +909,17 @@ dependencies = [ "parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-ui.git)", ] +[[package]] +name = "parking_lot" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "phf" version = "0.7.14" @@ -1174,6 +1187,11 @@ name = "slab" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "smallvec" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "solicit" version = "0.4.4" diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index a94cf1bde..133b4ebaa 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -70,7 +70,7 @@ mod url; use std::sync::{Arc, Mutex}; use std::net::SocketAddr; use std::collections::HashMap; -use ethcore_util::misc::Lockable; + use jsonrpc_core::{IoHandler, IoDelegate}; use router::auth::{Authorization, NoAuth, HttpBasicAuth}; use ethcore_rpc::Extendable; @@ -153,7 +153,7 @@ impl Server { /// Set callback for panics. pub fn set_panic_handler(&self, handler: F) where F : Fn() -> () + Send + 'static { - *self.panic_handler.locked() = Some(Box::new(handler)); + *self.panic_handler.lock().unwrap() = Some(Box::new(handler)); } } diff --git a/db/src/database.rs b/db/src/database.rs index 2360e69ca..777ec3bbc 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -17,7 +17,6 @@ //! 
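Taken together, the hunks so far show the shape of the whole change: parking_lot 0.2.6 enters Cargo.lock (pulling in smallvec, kernel32-sys, libc and winapi), while a crate that stays on std::sync — dapps here — simply drops the old Lockable::locked() helper and spells out .lock().unwrap() again. The only difference between the two Mutex flavours is what lock() returns; a minimal sketch, independent of any Parity type, to make that concrete:

use std::sync::Mutex as StdMutex;
use parking_lot::Mutex;

struct StdCounter { inner: StdMutex<u64> }
struct PlCounter { inner: Mutex<u64> }

impl StdCounter {
    fn bump(&self) {
        // std::sync::Mutex::lock returns LockResult<MutexGuard<u64>> because of
        // poisoning, so the guard must be unwrapped before use.
        *self.inner.lock().unwrap() += 1;
    }
}

impl PlCounter {
    fn bump(&self) {
        // parking_lot::Mutex has no poisoning: lock() hands back the guard
        // directly, which is why every ".lock().unwrap()" in the converted
        // crates shrinks to ".lock()".
        *self.inner.lock() += 1;
    }
}

The trade-off is that a panic while a parking_lot guard is held no longer poisons the lock, so later lockers see whatever state the panicking thread left behind.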
Ethcore rocksdb ipc service use traits::*; -use misc::RwLockable; use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; use std::sync::{RwLock, Arc}; use std::convert::From; @@ -137,8 +136,8 @@ impl Database { } pub fn flush(&self) -> Result<(), Error> { - let mut cache_lock = self.write_cache.unwrapped_write(); - let db_lock = self.db.unwrapped_read(); + let mut cache_lock = self.write_cache.write(); + let db_lock = self.db.read(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); @@ -147,8 +146,8 @@ impl Database { } pub fn flush_all(&self) -> Result<(), Error> { - let mut cache_lock = self.write_cache.unwrapped_write(); - let db_lock = self.db.unwrapped_read(); + let mut cache_lock = self.write_cache.write(); + let db_lock = self.db.read(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().expect("we should have exited with Ok(()) on the previous step"); @@ -167,7 +166,7 @@ impl Drop for Database { #[derive(Ipc)] impl DatabaseService for Database { fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error> { - let mut db = self.db.unwrapped_write(); + let mut db = self.db.write(); if db.is_some() { return Err(Error::AlreadyOpen); } let mut opts = Options::new(); @@ -194,7 +193,7 @@ impl DatabaseService for Database { fn close(&self) -> Result<(), Error> { try!(self.flush_all()); - let mut db = self.db.unwrapped_write(); + let mut db = self.db.write(); if db.is_none() { return Err(Error::IsClosed); } *db = None; @@ -202,19 +201,19 @@ impl DatabaseService for Database { } fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let mut cache_lock = self.write_cache.unwrapped_write(); + let mut cache_lock = self.write_cache.write(); cache_lock.write(key.to_vec(), value.to_vec()); Ok(()) } fn delete(&self, key: &[u8]) -> Result<(), Error> { - let mut cache_lock = self.write_cache.unwrapped_write(); + let mut cache_lock = self.write_cache.write(); cache_lock.remove(key.to_vec()); Ok(()) } fn write(&self, transaction: DBTransaction) -> Result<(), Error> { - let mut cache_lock = self.write_cache.unwrapped_write(); + let mut cache_lock = self.write_cache.write(); let mut writes = transaction.writes.borrow_mut(); for kv in writes.drain(..) 
{ @@ -231,13 +230,13 @@ impl DatabaseService for Database { fn get(&self, key: &[u8]) -> Result>, Error> { { let key_vec = key.to_vec(); - let cache_hit = self.write_cache.unwrapped_read().get(&key_vec); + let cache_hit = self.write_cache.read().get(&key_vec); if cache_hit.is_some() { return Ok(Some(cache_hit.expect("cache_hit.is_some() = true, still there is none somehow here"))) } } - let db_lock = self.db.unwrapped_read(); + let db_lock = self.db.read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); match try!(db.get(key)) { @@ -249,7 +248,7 @@ impl DatabaseService for Database { } fn get_by_prefix(&self, prefix: &[u8]) -> Result>, Error> { - let db_lock = self.db.unwrapped_read(); + let db_lock = self.db.read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); let mut iter = db.iterator(IteratorMode::From(prefix, Direction::Forward)); @@ -261,17 +260,17 @@ impl DatabaseService for Database { } fn is_empty(&self) -> Result { - let db_lock = self.db.unwrapped_read(); + let db_lock = self.db.read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); Ok(db.iterator(IteratorMode::Start).next().is_none()) } fn iter(&self) -> Result { - let db_lock = self.db.unwrapped_read(); + let db_lock = self.db.read(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - let mut iterators = self.iterators.unwrapped_write(); + let mut iterators = self.iterators.write(); let next_iterator = iterators.keys().last().unwrap_or(&0) + 1; iterators.insert(next_iterator, db.iterator(IteratorMode::Start)); Ok(next_iterator) @@ -279,7 +278,7 @@ impl DatabaseService for Database { fn iter_next(&self, handle: IteratorHandle) -> Option { - let mut iterators = self.iterators.unwrapped_write(); + let mut iterators = self.iterators.write(); let mut iterator = match iterators.get_mut(&handle) { Some(some_iterator) => some_iterator, None => { return None; }, @@ -294,7 +293,7 @@ impl DatabaseService for Database { } fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error> { - let mut iterators = self.iterators.unwrapped_write(); + let mut iterators = self.iterators.write(); iterators.remove(&handle); Ok(()) } diff --git a/ethash/Cargo.toml b/ethash/Cargo.toml index 94a714d55..a0501e8d3 100644 --- a/ethash/Cargo.toml +++ b/ethash/Cargo.toml @@ -9,3 +9,4 @@ authors = ["arkpar io::Result<()> { - let seed_compute = self.seed_compute.lock().unwrap(); + let seed_compute = self.seed_compute.lock(); let path = Light::file_path(seed_compute.get_seedhash(self.block_number)); try!(fs::create_dir_all(path.parent().unwrap())); let mut file = try!(File::create(path)); diff --git a/ethash/src/lib.rs b/ethash/src/lib.rs index 3f67f2748..8fb2c43f6 100644 --- a/ethash/src/lib.rs +++ b/ethash/src/lib.rs @@ -18,6 +18,8 @@ //! 
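The ethash/Cargo.toml hunk above lost its body in transit, but its intent is clear from the surrounding hunks: it adds parking_lot to the crate's [dependencies] (the lock file at the top pins 0.2.6), and the crate root in the lib.rs hunk that follows declares and imports it. Roughly, with the manifest line stated as an assumption:

// ethash/src/lib.rs — declarations added in the hunk below.
extern crate parking_lot;

use parking_lot::Mutex;

// ethash/Cargo.toml presumably gains the matching dependency line,
// e.g. `parking_lot = "0.2.6"`, to back the Cargo.lock entry above.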
See https://github.com/ethereum/wiki/wiki/Ethash extern crate primal; extern crate sha3; +extern crate parking_lot; + #[macro_use] extern crate log; mod compute; @@ -26,7 +28,8 @@ use std::mem; use compute::Light; pub use compute::{ETHASH_EPOCH_LENGTH, H256, ProofOfWork, SeedHashCompute, quick_get_difficulty}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use parking_lot::Mutex; struct LightCache { recent_epoch: Option, @@ -61,7 +64,7 @@ impl EthashManager { pub fn compute_light(&self, block_number: u64, header_hash: &H256, nonce: u64) -> ProofOfWork { let epoch = block_number / ETHASH_EPOCH_LENGTH; let light = { - let mut lights = self.cache.lock().unwrap(); + let mut lights = self.cache.lock(); let light = match lights.recent_epoch.clone() { Some(ref e) if *e == epoch => lights.recent.clone(), _ => match lights.prev_epoch.clone() { @@ -108,12 +111,12 @@ fn test_lru() { let hash = [0u8; 32]; ethash.compute_light(1, &hash, 1); ethash.compute_light(50000, &hash, 1); - assert_eq!(ethash.cache.lock().unwrap().recent_epoch.unwrap(), 1); - assert_eq!(ethash.cache.lock().unwrap().prev_epoch.unwrap(), 0); + assert_eq!(ethash.cache.lock().recent_epoch.unwrap(), 1); + assert_eq!(ethash.cache.lock().prev_epoch.unwrap(), 0); ethash.compute_light(1, &hash, 1); - assert_eq!(ethash.cache.lock().unwrap().recent_epoch.unwrap(), 0); - assert_eq!(ethash.cache.lock().unwrap().prev_epoch.unwrap(), 1); + assert_eq!(ethash.cache.lock().recent_epoch.unwrap(), 0); + assert_eq!(ethash.cache.lock().prev_epoch.unwrap(), 1); ethash.compute_light(70000, &hash, 1); - assert_eq!(ethash.cache.lock().unwrap().recent_epoch.unwrap(), 2); - assert_eq!(ethash.cache.lock().unwrap().prev_epoch.unwrap(), 0); + assert_eq!(ethash.cache.lock().recent_epoch.unwrap(), 2); + assert_eq!(ethash.cache.lock().prev_epoch.unwrap(), 0); } diff --git a/ethcore/src/account_provider.rs b/ethcore/src/account_provider.rs index 90d44bfba..332426cab 100644 --- a/ethcore/src/account_provider.rs +++ b/ethcore/src/account_provider.rs @@ -17,13 +17,13 @@ //! Account management. use std::fmt; -use std::sync::RwLock; use std::collections::HashMap; -use util::{Address as H160, H256, H520, RwLockable}; +use util::{Address as H160, H256, H520, RwLock}; use ethstore::{SecretStore, Error as SSError, SafeAccount, EthStore}; use ethstore::dir::{KeyDirectory}; use ethstore::ethkey::{Address as SSAddress, Message as SSMessage, Secret as SSSecret, Random, Generator}; + /// Type of unlock. 
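The account_provider.rs hunks that continue below keep the original locking discipline — a short read() to check whether anything needs doing, the guard dropped, then a write() only when the map actually changes — just without the unwrapped_read/unwrapped_write helpers that used to unwrap std's LockResult. A self-contained sketch of that shape (Unlock and the key type are simplified stand-ins):

use std::collections::HashMap;
use parking_lot::RwLock;

#[derive(Clone, Copy)]
enum Unlock { Temp, Perm }

struct Provider {
    // Keyed by a stand-in for Address to keep the sketch self-contained.
    unlocked: RwLock<HashMap<u64, Unlock>>,
}

impl Provider {
    fn unlock_permanently(&self, account: u64) {
        {
            // Fast path under a read lock only.
            let unlocked = self.unlocked.read();
            if let Some(&Unlock::Perm) = unlocked.get(&account) {
                return;
            }
        } // read guard dropped here, before the write lock is taken
        self.unlocked.write().insert(account, Unlock::Perm);
    }

    fn is_unlocked(&self, account: u64) -> bool {
        self.unlocked.read().contains_key(&account)
    }
}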
#[derive(Clone)] enum Unlock { @@ -177,7 +177,7 @@ impl AccountProvider { // check if account is already unlocked pernamently, if it is, do nothing { - let unlocked = self.unlocked.unwrapped_read(); + let unlocked = self.unlocked.read(); if let Some(data) = unlocked.get(&account) { if let Unlock::Perm = data.unlock { return Ok(()) @@ -190,7 +190,7 @@ impl AccountProvider { password: password, }; - let mut unlocked = self.unlocked.unwrapped_write(); + let mut unlocked = self.unlocked.write(); unlocked.insert(account, data); Ok(()) } @@ -208,7 +208,7 @@ impl AccountProvider { /// Checks if given account is unlocked pub fn is_unlocked(&self, account: A) -> bool where Address: From { let account = Address::from(account).into(); - let unlocked = self.unlocked.unwrapped_read(); + let unlocked = self.unlocked.read(); unlocked.get(&account).is_some() } @@ -218,12 +218,12 @@ impl AccountProvider { let message = Message::from(message).into(); let data = { - let unlocked = self.unlocked.unwrapped_read(); + let unlocked = self.unlocked.read(); try!(unlocked.get(&account).ok_or(Error::NotUnlocked)).clone() }; if let Unlock::Temp = data.unlock { - let mut unlocked = self.unlocked.unwrapped_write(); + let mut unlocked = self.unlocked.write(); unlocked.remove(&account).expect("data exists: so key must exist: qed"); } diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 19f5d0a4c..623fab4a6 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -193,14 +193,14 @@ impl BlockQueue { fn verify(verification: Arc, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut unverified = verification.unverified.locked(); + let mut unverified = verification.unverified.lock(); - if unverified.is_empty() && verification.verifying.locked().is_empty() { + if unverified.is_empty() && verification.verifying.lock().is_empty() { empty.notify_all(); } while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { - unverified = wait.wait(unverified).unwrap(); + wait.wait(&mut unverified); } if deleting.load(AtomicOrdering::Acquire) { @@ -209,11 +209,11 @@ impl BlockQueue { } let block = { - let mut unverified = verification.unverified.locked(); + let mut unverified = verification.unverified.lock(); if unverified.is_empty() { continue; } - let mut verifying = verification.verifying.locked(); + let mut verifying = verification.verifying.lock(); let block = unverified.pop_front().unwrap(); verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); block @@ -222,7 +222,7 @@ impl BlockQueue { let block_hash = block.header.hash(); match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { Ok(verified) => { - let mut verifying = verification.verifying.locked(); + let mut verifying = verification.verifying.lock(); for e in verifying.iter_mut() { if e.hash == block_hash { e.block = Some(verified); @@ -231,16 +231,16 @@ impl BlockQueue { } if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { // we're next! 
- let mut verified = verification.verified.locked(); - let mut bad = verification.bad.locked(); + let mut verified = verification.verified.lock(); + let mut bad = verification.bad.lock(); BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); ready.set(); } }, Err(err) => { - let mut verifying = verification.verifying.locked(); - let mut verified = verification.verified.locked(); - let mut bad = verification.bad.locked(); + let mut verifying = verification.verifying.lock(); + let mut verified = verification.verified.lock(); + let mut bad = verification.bad.lock(); warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); bad.insert(block_hash.clone()); verifying.retain(|e| e.hash != block_hash); @@ -265,29 +265,29 @@ impl BlockQueue { /// Clear the queue and stop verification activity. pub fn clear(&self) { - let mut unverified = self.verification.unverified.locked(); - let mut verifying = self.verification.verifying.locked(); - let mut verified = self.verification.verified.locked(); + let mut unverified = self.verification.unverified.lock(); + let mut verifying = self.verification.verifying.lock(); + let mut verified = self.verification.verified.lock(); unverified.clear(); verifying.clear(); verified.clear(); - self.processing.unwrapped_write().clear(); + self.processing.write().clear(); } /// Wait for unverified queue to be empty pub fn flush(&self) { - let mut unverified = self.verification.unverified.locked(); - while !unverified.is_empty() || !self.verification.verifying.locked().is_empty() { - unverified = self.empty.wait(unverified).unwrap(); + let mut unverified = self.verification.unverified.lock(); + while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() { + self.empty.wait(&mut unverified); } } /// Check if the block is currently in the queue pub fn block_status(&self, hash: &H256) -> BlockStatus { - if self.processing.unwrapped_read().contains(&hash) { + if self.processing.read().contains(&hash) { return BlockStatus::Queued; } - if self.verification.bad.locked().contains(&hash) { + if self.verification.bad.lock().contains(&hash) { return BlockStatus::Bad; } BlockStatus::Unknown @@ -298,11 +298,11 @@ impl BlockQueue { let header = BlockView::new(&bytes).header(); let h = header.hash(); { - if self.processing.unwrapped_read().contains(&h) { + if self.processing.read().contains(&h) { return Err(ImportError::AlreadyQueued.into()); } - let mut bad = self.verification.bad.locked(); + let mut bad = self.verification.bad.lock(); if bad.contains(&h) { return Err(ImportError::KnownBad.into()); } @@ -315,14 +315,14 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { Ok(()) => { - self.processing.unwrapped_write().insert(h.clone()); - self.verification.unverified.locked().push_back(UnverifiedBlock { header: header, bytes: bytes }); + self.processing.write().insert(h.clone()); + self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes }); self.more_to_verify.notify_all(); Ok(h) }, Err(err) => { warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); - self.verification.bad.locked().insert(h.clone()); + self.verification.bad.lock().insert(h.clone()); Err(err) } } @@ -333,10 +333,10 @@ impl BlockQueue { if block_hashes.is_empty() { return; } - let mut verified_lock = self.verification.verified.locked(); + let mut verified_lock = self.verification.verified.lock(); 
let mut verified = verified_lock.deref_mut(); - let mut bad = self.verification.bad.locked(); - let mut processing = self.processing.unwrapped_write(); + let mut bad = self.verification.bad.lock(); + let mut processing = self.processing.write(); bad.reserve(block_hashes.len()); for hash in block_hashes { bad.insert(hash.clone()); @@ -360,7 +360,7 @@ impl BlockQueue { if block_hashes.is_empty() { return; } - let mut processing = self.processing.unwrapped_write(); + let mut processing = self.processing.write(); for hash in block_hashes { processing.remove(&hash); } @@ -368,7 +368,7 @@ impl BlockQueue { /// Removes up to `max` verified blocks from the queue pub fn drain(&self, max: usize) -> Vec { - let mut verified = self.verification.verified.locked(); + let mut verified = self.verification.verified.lock(); let count = min(max, verified.len()); let mut result = Vec::with_capacity(count); for _ in 0..count { @@ -385,15 +385,15 @@ impl BlockQueue { /// Get queue status. pub fn queue_info(&self) -> BlockQueueInfo { let (unverified_len, unverified_bytes) = { - let v = self.verification.unverified.locked(); + let v = self.verification.unverified.lock(); (v.len(), v.heap_size_of_children()) }; let (verifying_len, verifying_bytes) = { - let v = self.verification.verifying.locked(); + let v = self.verification.verifying.lock(); (v.len(), v.heap_size_of_children()) }; let (verified_len, verified_bytes) = { - let v = self.verification.verified.locked(); + let v = self.verification.verified.lock(); (v.len(), v.heap_size_of_children()) }; BlockQueueInfo { @@ -407,18 +407,18 @@ impl BlockQueue { + verifying_bytes + verified_bytes // TODO: https://github.com/servo/heapsize/pull/50 - //+ self.processing.unwrapped_read().heap_size_of_children(), + //+ self.processing.read().heap_size_of_children(), } } /// Optimise memory footprint of the heap fields. 
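The verify and flush loops above are where the API change is most visible: std's Condvar::wait consumes the guard and hands back a new, Result-wrapped one (hence the old `unverified = wait.wait(unverified).unwrap()` rebinding), whereas parking_lot's wait takes the guard by &mut and re-locks in place. A stripped-down sketch of the same wait loop under assumed names:

use std::collections::VecDeque;
use parking_lot::{Condvar, Mutex};

struct WorkQueue {
    items: Mutex<VecDeque<u64>>,
    more: Condvar,
}

impl WorkQueue {
    fn push(&self, item: u64) {
        self.items.lock().push_back(item);
        self.more.notify_all();
    }

    fn pop_blocking(&self) -> u64 {
        let mut items = self.items.lock();
        while items.is_empty() {
            // std::sync would be: items = self.more.wait(items).unwrap();
            // parking_lot re-acquires into the same binding via &mut.
            self.more.wait(&mut items);
        }
        items.pop_front().expect("loop exits only when the queue is non-empty; qed")
    }
}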
pub fn collect_garbage(&self) { { - self.verification.unverified.locked().shrink_to_fit(); - self.verification.verifying.locked().shrink_to_fit(); - self.verification.verified.locked().shrink_to_fit(); + self.verification.unverified.lock().shrink_to_fit(); + self.verification.verifying.lock().shrink_to_fit(); + self.verification.verified.lock().shrink_to_fit(); } - self.processing.unwrapped_write().shrink_to_fit(); + self.processing.write().shrink_to_fit(); } } diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 3ff030ca0..982288d92 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -170,7 +170,7 @@ impl BlockProvider for BlockChain { /// Get raw block data fn block(&self, hash: &H256) -> Option { { - let read = self.blocks.unwrapped_read(); + let read = self.blocks.read(); if let Some(v) = read.get(hash) { return Some(v.clone()); } @@ -184,7 +184,7 @@ impl BlockProvider for BlockChain { match opt { Some(b) => { let bytes: Bytes = b.to_vec(); - let mut write = self.blocks.unwrapped_write(); + let mut write = self.blocks.write(); write.insert(hash.clone(), bytes.clone()); Some(bytes) }, @@ -338,7 +338,7 @@ impl BlockChain { }; { - let mut best_block = bc.best_block.unwrapped_write(); + let mut best_block = bc.best_block.write(); best_block.number = bc.block_number(&best_block_hash).unwrap(); best_block.total_difficulty = bc.block_details(&best_block_hash).unwrap().total_difficulty; best_block.hash = best_block_hash; @@ -483,25 +483,25 @@ impl BlockChain { self.note_used(CacheID::BlockDetails(hash)); } - let mut write_details = self.block_details.unwrapped_write(); + let mut write_details = self.block_details.write(); batch.extend_with_cache(write_details.deref_mut(), update.block_details, CacheUpdatePolicy::Overwrite); } { - let mut write_receipts = self.block_receipts.unwrapped_write(); + let mut write_receipts = self.block_receipts.write(); batch.extend_with_cache(write_receipts.deref_mut(), update.block_receipts, CacheUpdatePolicy::Remove); } { - let mut write_blocks_blooms = self.blocks_blooms.unwrapped_write(); + let mut write_blocks_blooms = self.blocks_blooms.write(); batch.extend_with_cache(write_blocks_blooms.deref_mut(), update.blocks_blooms, CacheUpdatePolicy::Remove); } // These cached values must be updated last and togeterh { - let mut best_block = self.best_block.unwrapped_write(); - let mut write_hashes = self.block_hashes.unwrapped_write(); - let mut write_txs = self.transaction_addresses.unwrapped_write(); + let mut best_block = self.best_block.write(); + let mut write_hashes = self.block_hashes.write(); + let mut write_txs = self.transaction_addresses.write(); // update best block match update.info.location { @@ -728,33 +728,33 @@ impl BlockChain { /// Get best block hash. pub fn best_block_hash(&self) -> H256 { - self.best_block.unwrapped_read().hash.clone() + self.best_block.read().hash.clone() } /// Get best block number. pub fn best_block_number(&self) -> BlockNumber { - self.best_block.unwrapped_read().number + self.best_block.read().number } /// Get best block total difficulty. pub fn best_block_total_difficulty(&self) -> U256 { - self.best_block.unwrapped_read().total_difficulty + self.best_block.read().total_difficulty } /// Get current cache size. 
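The "updated last and together" block above keeps its shape through the migration: every guard the best-block update touches is taken in one scope, so a reader can never observe the hash map and the best-block pointer out of step, and the acquisition order stays the same at each call site to avoid lock-order inversions. A reduced sketch of that pattern:

use std::collections::HashMap;
use parking_lot::RwLock;

struct Chain {
    best_block: RwLock<u64>,
    block_hashes: RwLock<HashMap<u64, [u8; 32]>>,
}

impl Chain {
    fn apply_best(&self, number: u64, hash: [u8; 32]) {
        // Take every guard the update needs before mutating any of them,
        // always in the same order as other call sites.
        let mut best_block = self.best_block.write();
        let mut hashes = self.block_hashes.write();
        hashes.insert(number, hash);
        *best_block = number;
        // Both guards drop together at the end of the scope.
    }

    fn best(&self) -> u64 {
        *self.best_block.read()
    }
}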
pub fn cache_size(&self) -> CacheSize { CacheSize { - blocks: self.blocks.unwrapped_read().heap_size_of_children(), - block_details: self.block_details.unwrapped_read().heap_size_of_children(), - transaction_addresses: self.transaction_addresses.unwrapped_read().heap_size_of_children(), - blocks_blooms: self.blocks_blooms.unwrapped_read().heap_size_of_children(), - block_receipts: self.block_receipts.unwrapped_read().heap_size_of_children(), + blocks: self.blocks.read().heap_size_of_children(), + block_details: self.block_details.read().heap_size_of_children(), + transaction_addresses: self.transaction_addresses.read().heap_size_of_children(), + blocks_blooms: self.blocks_blooms.read().heap_size_of_children(), + block_receipts: self.block_receipts.read().heap_size_of_children(), } } /// Let the cache system know that a cacheable item has been used. fn note_used(&self, id: CacheID) { - let mut cache_man = self.cache_man.unwrapped_write(); + let mut cache_man = self.cache_man.write(); if !cache_man.cache_usage[0].contains(&id) { cache_man.cache_usage[0].insert(id.clone()); if cache_man.in_use.contains(&id) { @@ -773,13 +773,13 @@ impl BlockChain { for _ in 0..COLLECTION_QUEUE_SIZE { { - let mut blocks = self.blocks.unwrapped_write(); - let mut block_details = self.block_details.unwrapped_write(); - let mut block_hashes = self.block_hashes.unwrapped_write(); - let mut transaction_addresses = self.transaction_addresses.unwrapped_write(); - let mut blocks_blooms = self.blocks_blooms.unwrapped_write(); - let mut block_receipts = self.block_receipts.unwrapped_write(); - let mut cache_man = self.cache_man.unwrapped_write(); + let mut blocks = self.blocks.write(); + let mut block_details = self.block_details.write(); + let mut block_hashes = self.block_hashes.write(); + let mut transaction_addresses = self.transaction_addresses.write(); + let mut blocks_blooms = self.blocks_blooms.write(); + let mut block_receipts = self.block_receipts.write(); + let mut cache_man = self.cache_man.write(); for id in cache_man.cache_usage.pop_back().unwrap().into_iter() { cache_man.in_use.remove(&id); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index f95df66fb..c92d82636 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -18,7 +18,7 @@ use std::collections::{HashSet, HashMap}; use std::ops::Deref; use std::mem; use std::collections::VecDeque; -use std::sync::*; +use std::sync::{Arc, Weak}; use std::path::{Path, PathBuf}; use std::fmt; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; @@ -30,12 +30,13 @@ use util::panics::*; use util::io::*; use util::rlp; use util::sha3::*; -use util::{Bytes, Lockable, RwLockable}; +use util::Bytes; use util::rlp::{RlpStream, Rlp, UntrustedRlp}; use util::journaldb; use util::journaldb::JournalDB; use util::kvdb::*; use util::{Applyable, Stream, View, PerfTimer, Itertools, Colour}; +use util::{Mutex, RwLock}; // other use views::BlockView; @@ -236,12 +237,12 @@ impl Client { /// Sets the actor to be notified on certain events pub fn set_notify(&self, target: &Arc) { - let mut write_lock = self.notify.unwrapped_write(); + let mut write_lock = self.notify.write(); *write_lock = Some(Arc::downgrade(target)); } fn notify(&self) -> Option> { - let read_lock = self.notify.unwrapped_read(); + let read_lock = self.notify.read(); read_lock.as_ref().and_then(|weak| weak.upgrade()) } @@ -293,7 +294,7 @@ impl Client { // Enact Verified Block let parent = chain_has_parent.unwrap(); let last_hashes = 
self.build_last_hashes(header.parent_hash.clone()); - let db = self.state_db.locked().boxed_clone(); + let db = self.state_db.lock().boxed_clone(); let enact_result = enact_verified(&block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone()); if let Err(e) = enact_result { @@ -369,7 +370,7 @@ impl Client { let route = self.commit_block(closed_block, &header.hash(), &block.bytes); import_results.push(route); - self.report.unwrapped_write().accrue_block(&block); + self.report.write().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } @@ -471,7 +472,7 @@ impl Client { }; self.block_header(id).and_then(|header| { - let db = self.state_db.locked().boxed_clone(); + let db = self.state_db.lock().boxed_clone(); // early exit for pruned blocks if db.is_pruned() && self.chain.best_block_number() >= block_number + HISTORY { @@ -487,7 +488,7 @@ impl Client { /// Get a copy of the best block's state. pub fn state(&self) -> State { State::from_existing( - self.state_db.locked().boxed_clone(), + self.state_db.lock().boxed_clone(), HeaderView::new(&self.best_block_header()).state_root(), self.engine.account_start_nonce(), self.trie_factory.clone()) @@ -501,8 +502,8 @@ impl Client { /// Get the report. pub fn report(&self) -> ClientReport { - let mut report = self.report.unwrapped_read().clone(); - report.state_db_mem = self.state_db.locked().mem_used(); + let mut report = self.report.read().clone(); + report.state_db_mem = self.state_db.lock().mem_used(); report } @@ -514,7 +515,7 @@ impl Client { match self.mode { Mode::Dark(timeout) => { - let mut ss = self.sleep_state.locked(); + let mut ss = self.sleep_state.lock(); if let Some(t) = ss.last_activity { if Instant::now() > t + timeout { self.sleep(); @@ -523,7 +524,7 @@ impl Client { } } Mode::Passive(timeout, wakeup_after) => { - let mut ss = self.sleep_state.locked(); + let mut ss = self.sleep_state.lock(); let now = Instant::now(); if let Some(t) = ss.last_activity { if now > t + timeout { @@ -600,14 +601,14 @@ impl Client { } else { trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing."); // TODO: Consider uncommenting. - //*self.last_activity.locked() = Some(Instant::now()); + //*self.last_activity.lock() = Some(Instant::now()); } } } /// Notify us that the network has been started. pub fn network_started(&self, url: &str) { - let mut previous_enode = self.previous_enode.locked(); + let mut previous_enode = self.previous_enode.lock(); if let Some(ref u) = *previous_enode { if u == url { return; @@ -661,7 +662,7 @@ impl BlockChainClient for Client { fn keep_alive(&self) { if self.mode != Mode::Active { self.wake_up(); - (*self.sleep_state.locked()).last_activity = Some(Instant::now()); + (*self.sleep_state.lock()).last_activity = Some(Instant::now()); } } @@ -785,7 +786,7 @@ impl BlockChainClient for Client { } fn state_data(&self, hash: &H256) -> Option { - self.state_db.locked().state(hash) + self.state_db.lock().state(hash) } fn block_receipts(&self, hash: &H256) -> Option { @@ -946,7 +947,7 @@ impl MiningBlockChainClient for Client { &self.vm_factory, self.trie_factory.clone(), false, // TODO: this will need to be parameterised once we want to do immediate mining insertion. 
- self.state_db.locked().boxed_clone(), + self.state_db.lock().boxed_clone(), &self.chain.block_header(&h).expect("h is best block hash: so it's header must exist: qed"), self.build_last_hashes(h.clone()), author, diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index f7387e6ab..b60d537a1 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -115,38 +115,38 @@ impl TestBlockChainClient { vm_factory: EvmFactory::new(VMType::Interpreter), }; client.add_blocks(1, EachBlockWith::Nothing); // add genesis block - client.genesis_hash = client.last_hash.unwrapped_read().clone(); + client.genesis_hash = client.last_hash.read().clone(); client } /// Set the transaction receipt result pub fn set_transaction_receipt(&self, id: TransactionID, receipt: LocalizedReceipt) { - self.receipts.unwrapped_write().insert(id, receipt); + self.receipts.write().insert(id, receipt); } /// Set the execution result. pub fn set_execution_result(&self, result: Executed) { - *self.execution_result.unwrapped_write() = Some(result); + *self.execution_result.write() = Some(result); } /// Set the balance of account `address` to `balance`. pub fn set_balance(&self, address: Address, balance: U256) { - self.balances.unwrapped_write().insert(address, balance); + self.balances.write().insert(address, balance); } /// Set nonce of account `address` to `nonce`. pub fn set_nonce(&self, address: Address, nonce: U256) { - self.nonces.unwrapped_write().insert(address, nonce); + self.nonces.write().insert(address, nonce); } /// Set `code` at `address`. pub fn set_code(&self, address: Address, code: Bytes) { - self.code.unwrapped_write().insert(address, code); + self.code.write().insert(address, code); } /// Set storage `position` to `value` for account `address`. pub fn set_storage(&self, address: Address, position: H256, value: H256) { - self.storage.unwrapped_write().insert((address, position), value); + self.storage.write().insert((address, position), value); } /// Set block queue size for testing @@ -156,11 +156,11 @@ impl TestBlockChainClient { /// Add blocks to test client. 
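Outside ethash, the converted code never names parking_lot directly: client.rs above pulls in `use util::{Mutex, RwLock};`, and the miner and test-client hunks around it lean on `util::RwLock` the same way. That implies ethcore-util now re-exports the parking_lot types at its root; the exact module arrangement is not part of this section, so the following is an assumed sketch of that re-export. Crates still on std (dapps earlier) keep importing std::sync as before, so both lock families coexist during the transition.

// Presumed re-export in ethcore-util's crate root; the names follow the
// imports used elsewhere in this diff (util::Mutex, util::RwLock, util::Condvar).
extern crate parking_lot;

pub use parking_lot::{Condvar, Mutex, RwLock};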
pub fn add_blocks(&self, count: usize, with: EachBlockWith) { - let len = self.numbers.unwrapped_read().len(); + let len = self.numbers.read().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); - header.parent_hash = self.last_hash.unwrapped_read().clone(); + header.parent_hash = self.last_hash.read().clone(); header.number = n as BlockNumber; header.gas_limit = U256::from(1_000_000); let uncles = match with { @@ -168,7 +168,7 @@ impl TestBlockChainClient { let mut uncles = RlpStream::new_list(1); let mut uncle_header = BlockHeader::new(); uncle_header.difficulty = From::from(n); - uncle_header.parent_hash = self.last_hash.unwrapped_read().clone(); + uncle_header.parent_hash = self.last_hash.read().clone(); uncle_header.number = n as BlockNumber; uncles.append(&uncle_header); header.uncles_hash = uncles.as_raw().sha3(); @@ -181,7 +181,7 @@ impl TestBlockChainClient { let mut txs = RlpStream::new_list(1); let keypair = KeyPair::create().unwrap(); // Update nonces value - self.nonces.unwrapped_write().insert(keypair.address(), U256::one()); + self.nonces.write().insert(keypair.address(), U256::one()); let tx = Transaction { action: Action::Create, value: U256::from(100), @@ -214,7 +214,7 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(&rlp::NULL_RLP, 1); - self.blocks.unwrapped_write().insert(hash, rlp.out()); + self.blocks.write().insert(hash, rlp.out()); } /// Make a bad block by setting invalid parent hash. @@ -226,12 +226,12 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(&rlp::NULL_RLP, 1); - self.blocks.unwrapped_write().insert(hash, rlp.out()); + self.blocks.write().insert(hash, rlp.out()); } /// TODO: pub fn block_hash_delta_minus(&mut self, delta: usize) -> H256 { - let blocks_read = self.numbers.unwrapped_read(); + let blocks_read = self.numbers.read(); let index = blocks_read.len() - delta; blocks_read[&index].clone() } @@ -239,9 +239,9 @@ impl TestBlockChainClient { fn block_hash(&self, id: BlockID) -> Option { match id { BlockID::Hash(hash) => Some(hash), - BlockID::Number(n) => self.numbers.unwrapped_read().get(&(n as usize)).cloned(), - BlockID::Earliest => self.numbers.unwrapped_read().get(&0).cloned(), - BlockID::Latest => self.numbers.unwrapped_read().get(&(self.numbers.unwrapped_read().len() - 1)).cloned() + BlockID::Number(n) => self.numbers.read().get(&(n as usize)).cloned(), + BlockID::Earliest => self.numbers.read().get(&0).cloned(), + BlockID::Latest => self.numbers.read().get(&(self.numbers.read().len() - 1)).cloned() } } } @@ -288,7 +288,7 @@ impl MiningBlockChainClient for TestBlockChainClient { impl BlockChainClient for TestBlockChainClient { fn call(&self, _t: &SignedTransaction, _analytics: CallAnalytics) -> Result { - Ok(self.execution_result.unwrapped_read().clone().unwrap()) + Ok(self.execution_result.read().clone().unwrap()) } fn block_total_difficulty(&self, _id: BlockID) -> Option { @@ -301,7 +301,7 @@ impl BlockChainClient for TestBlockChainClient { fn nonce(&self, address: &Address, id: BlockID) -> Option { match id { - BlockID::Latest => Some(self.nonces.unwrapped_read().get(address).cloned().unwrap_or_else(U256::zero)), + BlockID::Latest => Some(self.nonces.read().get(address).cloned().unwrap_or_else(U256::zero)), _ => None, } } @@ -311,12 +311,12 @@ impl BlockChainClient for TestBlockChainClient { } fn code(&self, address: &Address) -> Option { - 
self.code.unwrapped_read().get(address).cloned() + self.code.read().get(address).cloned() } fn balance(&self, address: &Address, id: BlockID) -> Option { if let BlockID::Latest = id { - Some(self.balances.unwrapped_read().get(address).cloned().unwrap_or_else(U256::zero)) + Some(self.balances.read().get(address).cloned().unwrap_or_else(U256::zero)) } else { None } @@ -328,7 +328,7 @@ impl BlockChainClient for TestBlockChainClient { fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option { if let BlockID::Latest = id { - Some(self.storage.unwrapped_read().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) + Some(self.storage.read().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) } else { None } @@ -343,7 +343,7 @@ impl BlockChainClient for TestBlockChainClient { } fn transaction_receipt(&self, id: TransactionID) -> Option { - self.receipts.unwrapped_read().get(&id).cloned() + self.receipts.read().get(&id).cloned() } fn blocks_with_bloom(&self, _bloom: &H2048, _from_block: BlockID, _to_block: BlockID) -> Option> { @@ -359,11 +359,11 @@ impl BlockChainClient for TestBlockChainClient { } fn block_header(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec())) + self.block_hash(id).and_then(|hash| self.blocks.read().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec())) } fn block_body(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).map(|r| { + self.block_hash(id).and_then(|hash| self.blocks.read().get(&hash).map(|r| { let mut stream = RlpStream::new_list(2); stream.append_raw(Rlp::new(&r).at(1).as_raw(), 1); stream.append_raw(Rlp::new(&r).at(2).as_raw(), 1); @@ -372,13 +372,13 @@ impl BlockChainClient for TestBlockChainClient { } fn block(&self, id: BlockID) -> Option { - self.block_hash(id).and_then(|hash| self.blocks.unwrapped_read().get(&hash).cloned()) + self.block_hash(id).and_then(|hash| self.blocks.read().get(&hash).cloned()) } fn block_status(&self, id: BlockID) -> BlockStatus { match id { - BlockID::Number(number) if (number as usize) < self.blocks.unwrapped_read().len() => BlockStatus::InChain, - BlockID::Hash(ref hash) if self.blocks.unwrapped_read().get(hash).is_some() => BlockStatus::InChain, + BlockID::Number(number) if (number as usize) < self.blocks.read().len() => BlockStatus::InChain, + BlockID::Hash(ref hash) if self.blocks.read().get(hash).is_some() => BlockStatus::InChain, _ => BlockStatus::Unknown } } @@ -389,7 +389,7 @@ impl BlockChainClient for TestBlockChainClient { ancestor: H256::new(), index: 0, blocks: { - let numbers_read = self.numbers.unwrapped_read(); + let numbers_read = self.numbers.read(); let mut adding = false; let mut blocks = Vec::new(); @@ -446,11 +446,11 @@ impl BlockChainClient for TestBlockChainClient { let header = Rlp::new(&b).val_at::(0); let h = header.hash(); let number: usize = header.number as usize; - if number > self.blocks.unwrapped_read().len() { - panic!("Unexpected block number. Expected {}, got {}", self.blocks.unwrapped_read().len(), number); + if number > self.blocks.read().len() { + panic!("Unexpected block number. 
Expected {}, got {}", self.blocks.read().len(), number); } if number > 0 { - match self.blocks.unwrapped_read().get(&header.parent_hash) { + match self.blocks.read().get(&header.parent_hash) { Some(parent) => { let parent = Rlp::new(parent).val_at::(0); if parent.number != (header.number - 1) { @@ -462,27 +462,27 @@ impl BlockChainClient for TestBlockChainClient { } } } - let len = self.numbers.unwrapped_read().len(); + let len = self.numbers.read().len(); if number == len { { - let mut difficulty = self.difficulty.unwrapped_write(); + let mut difficulty = self.difficulty.write(); *difficulty.deref_mut() = *difficulty.deref() + header.difficulty; } - mem::replace(self.last_hash.unwrapped_write().deref_mut(), h.clone()); - self.blocks.unwrapped_write().insert(h.clone(), b); - self.numbers.unwrapped_write().insert(number, h.clone()); + mem::replace(self.last_hash.write().deref_mut(), h.clone()); + self.blocks.write().insert(h.clone(), b); + self.numbers.write().insert(number, h.clone()); let mut parent_hash = header.parent_hash; if number > 0 { let mut n = number - 1; - while n > 0 && self.numbers.unwrapped_read()[&n] != parent_hash { - *self.numbers.unwrapped_write().get_mut(&n).unwrap() = parent_hash.clone(); + while n > 0 && self.numbers.read()[&n] != parent_hash { + *self.numbers.write().get_mut(&n).unwrap() = parent_hash.clone(); n -= 1; - parent_hash = Rlp::new(&self.blocks.unwrapped_read()[&parent_hash]).val_at::(0).parent_hash; + parent_hash = Rlp::new(&self.blocks.read()[&parent_hash]).val_at::(0).parent_hash; } } } else { - self.blocks.unwrapped_write().insert(h.clone(), b.to_vec()); + self.blocks.write().insert(h.clone(), b.to_vec()); } Ok(h) } @@ -503,11 +503,11 @@ impl BlockChainClient for TestBlockChainClient { fn chain_info(&self) -> BlockChainInfo { BlockChainInfo { - total_difficulty: *self.difficulty.unwrapped_read(), - pending_total_difficulty: *self.difficulty.unwrapped_read(), + total_difficulty: *self.difficulty.read(), + pending_total_difficulty: *self.difficulty.read(), genesis_hash: self.genesis_hash.clone(), - best_block_hash: self.last_hash.unwrapped_read().clone(), - best_block_number: self.blocks.unwrapped_read().len() as BlockNumber - 1, + best_block_hash: self.last_hash.read().clone(), + best_block_number: self.blocks.read().len() as BlockNumber - 1, } } diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index 576aaef5b..57b4cfdc6 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -18,11 +18,11 @@ use std::ops::Deref; use std::hash::Hash; -use std::sync::RwLock; use std::collections::HashMap; -use util::{DBTransaction, Database, RwLockable}; +use util::{DBTransaction, Database, RwLock}; use util::rlp::{encode, Encodable, decode, Decodable}; + #[derive(Clone, Copy)] pub enum CacheUpdatePolicy { Overwrite, @@ -115,14 +115,14 @@ pub trait Readable { T: Clone + Decodable, C: Cache { { - let read = cache.unwrapped_read(); + let read = cache.read(); if let Some(v) = read.get(key) { return Some(v.clone()); } } self.read(key).map(|value: T|{ - let mut write = cache.unwrapped_write(); + let mut write = cache.write(); write.insert(key.clone(), value.clone()); value }) @@ -137,7 +137,7 @@ pub trait Readable { R: Deref, C: Cache { { - let read = cache.unwrapped_read(); + let read = cache.read(); if read.get(key).is_some() { return true; } diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 86e9bb288..b8de63adb 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -97,7 +97,6 @@ extern crate ethash; pub extern crate ethstore; extern crate semver; extern 
crate ethcore_ipc_nano as nanoipc; - extern crate ethcore_devtools as devtools; #[cfg(feature = "jit" )] extern crate evmjit; diff --git a/ethcore/src/miner/external.rs b/ethcore/src/miner/external.rs index 650df228e..ef6875930 100644 --- a/ethcore/src/miner/external.rs +++ b/ethcore/src/miner/external.rs @@ -15,8 +15,8 @@ // along with Parity. If not, see . use std::collections::HashMap; -use std::sync::{Arc, RwLock}; -use util::{RwLockable, U256, H256}; +use std::sync::Arc; +use util::{RwLock, U256, H256}; /// External miner interface. pub trait ExternalMinerService: Send + Sync { @@ -54,15 +54,15 @@ impl ExternalMiner { impl ExternalMinerService for ExternalMiner { fn submit_hashrate(&self, hashrate: U256, id: H256) { - self.hashrates.unwrapped_write().insert(id, hashrate); + self.hashrates.write().insert(id, hashrate); } fn hashrate(&self) -> U256 { - self.hashrates.unwrapped_read().iter().fold(U256::from(0), |sum, (_, v)| sum + *v) + self.hashrates.read().iter().fold(U256::from(0), |sum, (_, v)| sum + *v) } fn is_mining(&self) -> bool { - !self.hashrates.unwrapped_read().is_empty() + !self.hashrates.read().is_empty() } } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 800f8f492..4c1741959 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use rayon::prelude::*; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{self, AtomicBool}; use std::time::{Instant, Duration}; use util::*; @@ -26,7 +26,7 @@ use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptio use block::{ClosedBlock, IsBlock}; use error::*; use transaction::SignedTransaction; -use receipt::{Receipt}; +use receipt::Receipt; use spec::Spec; use engine::Engine; use miner::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionOrigin}; @@ -34,7 +34,6 @@ use miner::work_notify::WorkPoster; use client::TransactionImportResult; use miner::price_info::PriceInfo; - /// Different possible definitions for pending transaction set. #[derive(Debug)] pub enum PendingSet { @@ -236,16 +235,16 @@ impl Miner { { trace!(target: "miner", "recalibrating..."); let txq = self.transaction_queue.clone(); - self.gas_pricer.lock().unwrap().recalibrate(move |price| { + self.gas_pricer.lock().recalibrate(move |price| { trace!(target: "miner", "Got gas price! 
{}", price); - txq.lock().unwrap().set_minimal_gas_price(price); + txq.lock().set_minimal_gas_price(price); }); trace!(target: "miner", "done recalibration."); } let (transactions, mut open_block, original_work_hash) = { - let transactions = {self.transaction_queue.locked().top_transactions()}; - let mut sealing_work = self.sealing_work.locked(); + let transactions = {self.transaction_queue.lock().top_transactions()}; + let mut sealing_work = self.sealing_work.lock(); let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); let best_hash = chain.best_block_header().sha3(); /* @@ -315,7 +314,7 @@ impl Miner { }; { - let mut queue = self.transaction_queue.locked(); + let mut queue = self.transaction_queue.lock(); for hash in invalid_transactions.into_iter() { queue.remove_invalid(&hash, &fetch_account); } @@ -346,7 +345,7 @@ impl Miner { } let (work, is_new) = { - let mut sealing_work = self.sealing_work.locked(); + let mut sealing_work = self.sealing_work.lock(); let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash()); let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) { @@ -374,14 +373,14 @@ impl Miner { fn update_gas_limit(&self, chain: &MiningBlockChainClient) { let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit(); - let mut queue = self.transaction_queue.locked(); + let mut queue = self.transaction_queue.lock(); queue.set_gas_limit(gas_limit); } /// Returns true if we had to prepare new pending block fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool { trace!(target: "miner", "enable_and_prepare_sealing: entering"); - let have_work = self.sealing_work.locked().peek_last_ref().is_some(); + let have_work = self.sealing_work.lock().peek_last_ref().is_some(); trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); if !have_work { // -------------------------------------------------------------------------- @@ -391,7 +390,7 @@ impl Miner { self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.prepare_sealing(chain); } - let mut sealing_block_last_request = self.sealing_block_last_request.locked(); + let mut sealing_block_last_request = self.sealing_block_last_request.lock(); let best_number = chain.chain_info().best_block_number; if *sealing_block_last_request != best_number { trace!(target: "miner", "enable_and_prepare_sealing: Miner received request (was {}, now {}) - waking up.", *sealing_block_last_request, best_number); @@ -416,7 +415,7 @@ impl Miner { } /// Are we allowed to do a non-mandatory reseal? - fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.locked() } + fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock() } } const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5; @@ -424,7 +423,7 @@ const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5; impl MinerService for Miner { fn clear_and_reset(&self, chain: &MiningBlockChainClient) { - self.transaction_queue.locked().clear(); + self.transaction_queue.lock().clear(); // -------------------------------------------------------------------------- // | NOTE Code below requires transaction_queue and sealing_work locks. | // | Make sure to release the locks before calling that method. 
| @@ -433,8 +432,8 @@ impl MinerService for Miner { } fn status(&self) -> MinerStatus { - let status = self.transaction_queue.locked().status(); - let sealing_work = self.sealing_work.locked(); + let status = self.transaction_queue.lock().status(); + let sealing_work = self.sealing_work.lock(); MinerStatus { transactions_in_pending_queue: status.pending, transactions_in_future_queue: status.future, @@ -443,7 +442,7 @@ impl MinerService for Miner { } fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction, analytics: CallAnalytics) -> Result { - let sealing_work = self.sealing_work.locked(); + let sealing_work = self.sealing_work.lock(); match sealing_work.peek_last_ref() { Some(work) => { let block = work.block(); @@ -490,7 +489,7 @@ impl MinerService for Miner { } fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { - let sealing_work = self.sealing_work.locked(); + let sealing_work = self.sealing_work.lock(); sealing_work.peek_last_ref().map_or_else( || chain.latest_balance(address), |b| b.block().fields().state.balance(address) @@ -498,7 +497,7 @@ impl MinerService for Miner { } fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 { - let sealing_work = self.sealing_work.locked(); + let sealing_work = self.sealing_work.lock(); sealing_work.peek_last_ref().map_or_else( || chain.latest_storage_at(address, position), |b| b.block().fields().state.storage_at(address, position) @@ -506,79 +505,79 @@ impl MinerService for Miner { } fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { - let sealing_work = self.sealing_work.locked(); + let sealing_work = self.sealing_work.lock(); sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address)) } fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option { - let sealing_work = self.sealing_work.locked(); + let sealing_work = self.sealing_work.lock(); sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address)) } fn set_author(&self, author: Address) { - *self.author.unwrapped_write() = author; + *self.author.write() = author; } fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.unwrapped_write() = extra_data; + *self.extra_data.write() = extra_data; } /// Set the gas limit we wish to target when sealing a new block. fn set_gas_floor_target(&self, target: U256) { - self.gas_range_target.unwrapped_write().0 = target; + self.gas_range_target.write().0 = target; } fn set_gas_ceil_target(&self, target: U256) { - self.gas_range_target.unwrapped_write().1 = target; + self.gas_range_target.write().1 = target; } fn set_minimal_gas_price(&self, min_gas_price: U256) { - self.transaction_queue.locked().set_minimal_gas_price(min_gas_price); + self.transaction_queue.lock().set_minimal_gas_price(min_gas_price); } fn minimal_gas_price(&self) -> U256 { - *self.transaction_queue.locked().minimal_gas_price() + *self.transaction_queue.lock().minimal_gas_price() } fn sensible_gas_price(&self) -> U256 { // 10% above our minimum. 
- *self.transaction_queue.locked().minimal_gas_price() * 110.into() / 100.into() + *self.transaction_queue.lock().minimal_gas_price() * 110.into() / 100.into() } fn sensible_gas_limit(&self) -> U256 { - self.gas_range_target.unwrapped_read().0 / 5.into() + self.gas_range_target.read().0 / 5.into() } fn transactions_limit(&self) -> usize { - self.transaction_queue.locked().limit() + self.transaction_queue.lock().limit() } fn set_transactions_limit(&self, limit: usize) { - self.transaction_queue.locked().set_limit(limit) + self.transaction_queue.lock().set_limit(limit) } fn set_tx_gas_limit(&self, limit: U256) { - self.transaction_queue.locked().set_tx_gas_limit(limit) + self.transaction_queue.lock().set_tx_gas_limit(limit) } /// Get the author that we will seal blocks as. fn author(&self) -> Address { - *self.author.unwrapped_read() + *self.author.read() } /// Get the extra_data that we will seal blocks with. fn extra_data(&self) -> Bytes { - self.extra_data.unwrapped_read().clone() + self.extra_data.read().clone() } /// Get the gas limit we wish to target when sealing a new block. fn gas_floor_target(&self) -> U256 { - self.gas_range_target.unwrapped_read().0 + self.gas_range_target.read().0 } /// Get the gas limit we wish to target when sealing a new block. fn gas_ceil_target(&self) -> U256 { - self.gas_range_target.unwrapped_read().1 + self.gas_range_target.read().1 } fn import_external_transactions( @@ -588,7 +587,7 @@ impl MinerService for Miner { ) -> Vec> { let results = { - let mut transaction_queue = self.transaction_queue.locked(); + let mut transaction_queue = self.transaction_queue.lock(); self.add_transactions_to_queue( chain, transactions, TransactionOrigin::External, &mut transaction_queue ) @@ -615,7 +614,7 @@ impl MinerService for Miner { let imported = { // Be sure to release the lock before we call enable_and_prepare_sealing - let mut transaction_queue = self.transaction_queue.locked(); + let mut transaction_queue = self.transaction_queue.lock(); let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap(); match import { @@ -651,13 +650,13 @@ impl MinerService for Miner { } fn all_transactions(&self) -> Vec { - let queue = self.transaction_queue.locked(); + let queue = self.transaction_queue.lock(); queue.top_transactions() } fn pending_transactions(&self) -> Vec { - let queue = self.transaction_queue.locked(); - let sw = self.sealing_work.locked(); + let queue = self.transaction_queue.lock(); + let sw = self.sealing_work.lock(); // TODO: should only use the sealing_work when it's current (it could be an old block) let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), @@ -670,8 +669,8 @@ impl MinerService for Miner { } fn pending_transactions_hashes(&self) -> Vec { - let queue = self.transaction_queue.locked(); - let sw = self.sealing_work.locked(); + let queue = self.transaction_queue.lock(); + let sw = self.sealing_work.lock(); let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), false => None, @@ -683,8 +682,8 @@ impl MinerService for Miner { } fn transaction(&self, hash: &H256) -> Option { - let queue = self.transaction_queue.locked(); - let sw = self.sealing_work.locked(); + let queue = self.transaction_queue.lock(); + let sw = self.sealing_work.lock(); let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { true => sw.peek_last_ref(), false => None, @@ -696,7 
+695,7 @@ impl MinerService for Miner { } fn pending_receipts(&self) -> BTreeMap { - match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.locked().peek_last_ref()) { + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().peek_last_ref()) { (true, Some(pending)) => { let hashes = pending.transactions() .iter() @@ -711,14 +710,14 @@ impl MinerService for Miner { } fn last_nonce(&self, address: &Address) -> Option { - self.transaction_queue.locked().last_nonce(address) + self.transaction_queue.lock().last_nonce(address) } fn update_sealing(&self, chain: &MiningBlockChainClient) { if self.sealing_enabled.load(atomic::Ordering::Relaxed) { let current_no = chain.chain_info().best_block_number; - let has_local_transactions = self.transaction_queue.locked().has_local_pending_transactions(); - let last_request = *self.sealing_block_last_request.locked(); + let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions(); + let last_request = *self.sealing_block_last_request.lock(); let should_disable_sealing = !self.forced_sealing() && !has_local_transactions && current_no > last_request @@ -727,9 +726,9 @@ impl MinerService for Miner { if should_disable_sealing { trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request); self.sealing_enabled.store(false, atomic::Ordering::Relaxed); - self.sealing_work.locked().reset(); + self.sealing_work.lock().reset(); } else { - *self.next_allowed_reseal.locked() = Instant::now() + self.options.reseal_min_period; + *self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period; // -------------------------------------------------------------------------- // | NOTE Code below requires transaction_queue and sealing_work locks. | // | Make sure to release the locks before calling that method. 
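The boxed NOTE that these miner hunks keep repeating is the one constraint parking_lot does not relax: prepare_sealing takes the transaction_queue and sealing_work locks itself, and neither lock is re-entrant, so a guard held across that call would self-deadlock. The usual shape is to confine the guard to a block and let it drop before the follow-up call; a sketch under assumed names:

use parking_lot::Mutex;

struct Miner {
    queue: Mutex<Vec<u64>>, // stand-in for transaction_queue
}

impl Miner {
    fn prepare_sealing(&self) {
        // Re-acquires the queue lock internally, like the method the NOTE warns about.
        let queue = self.queue.lock();
        let _ = queue.len();
    }

    fn update(&self) {
        let needs_work = {
            // Confine the guard to this block ...
            let queue = self.queue.lock();
            !queue.is_empty()
        }; // ... so it is released here, before prepare_sealing re-locks.
        if needs_work {
            self.prepare_sealing();
        }
    }
}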
| @@ -743,14 +742,14 @@ impl MinerService for Miner { trace!(target: "miner", "map_sealing_work: entering"); self.enable_and_prepare_sealing(chain); trace!(target: "miner", "map_sealing_work: sealing prepared"); - let mut sealing_work = self.sealing_work.locked(); + let mut sealing_work = self.sealing_work.lock(); let ret = sealing_work.use_last_ref(); trace!(target: "miner", "map_sealing_work: leaving use_last_ref={:?}", ret.as_ref().map(|b| b.block().fields().header.hash())); ret.map(f) } fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let result = if let Some(b) = self.sealing_work.locked().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { + let result = if let Some(b) = self.sealing_work.lock().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { b.lock().try_seal(self.engine(), seal).or_else(|_| { warn!(target: "miner", "Mined solution rejected: Invalid."); Err(Error::PowInvalid) @@ -797,7 +796,7 @@ impl MinerService for Miner { .par_iter() .map(|h| fetch_transactions(chain, h)); out_of_chain.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.locked(); + let mut transaction_queue = self.transaction_queue.lock(); let _ = self.add_transactions_to_queue( chain, txs, TransactionOrigin::External, &mut transaction_queue ); @@ -811,7 +810,7 @@ impl MinerService for Miner { .map(|h: &H256| fetch_transactions(chain, h)); in_chain.for_each(|mut txs| { - let mut transaction_queue = self.transaction_queue.locked(); + let mut transaction_queue = self.transaction_queue.lock(); let to_remove = txs.drain(..) .map(|tx| { diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index a72ae0a8f..4b8f01c0e 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -38,7 +38,7 @@ //! assert_eq!(miner.status().transactions_in_pending_queue, 0); //! //! // Check block for sealing -//! //assert!(miner.sealing_block(client.deref()).locked().is_some()); +//! //assert!(miner.sealing_block(client.deref()).lock().is_some()); //! } //! 
``` diff --git a/ethcore/src/miner/price_info.rs b/ethcore/src/miner/price_info.rs index ac2f81cd0..ee4220c0c 100644 --- a/ethcore/src/miner/price_info.rs +++ b/ethcore/src/miner/price_info.rs @@ -80,14 +80,17 @@ impl PriceInfo { //#[ignore] #[test] fn should_get_price_info() { - use std::sync::{Condvar, Mutex, Arc}; + use std::sync::Arc; use std::time::Duration; use util::log::init_log; + use util::{Condvar, Mutex}; + init_log(); let done = Arc::new((Mutex::new(PriceInfo { ethusd: 0f32 }), Condvar::new())); let rdone = done.clone(); - PriceInfo::get(move |price| { let mut p = rdone.0.lock().unwrap(); *p = price; rdone.1.notify_one(); }).unwrap(); - let p = done.1.wait_timeout(done.0.lock().unwrap(), Duration::from_millis(10000)).unwrap(); - assert!(!p.1.timed_out()); - assert!(p.0.ethusd != 0f32); + PriceInfo::get(move |price| { let mut p = rdone.0.lock(); *p = price; rdone.1.notify_one(); }).unwrap(); + let mut p = done.0.lock(); + let t = done.1.wait_for(&mut p, Duration::from_millis(10000)); + assert!(!t.timed_out()); + assert!(p.ethusd != 0f32); } \ No newline at end of file diff --git a/ethcore/src/miner/work_notify.rs b/ethcore/src/miner/work_notify.rs index a1b6f0a8b..b9952e14b 100644 --- a/ethcore/src/miner/work_notify.rs +++ b/ethcore/src/miner/work_notify.rs @@ -61,13 +61,13 @@ impl WorkPoster { pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { // TODO: move this to engine let target = Ethash::difficulty_to_boundary(&difficulty); - let seed_hash = &self.seed_compute.locked().get_seedhash(number); + let seed_hash = &self.seed_compute.lock().get_seedhash(number); let seed_hash = H256::from_slice(&seed_hash[..]); let body = format!( r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#, pow_hash.hex(), seed_hash.hex(), target.hex(), number ); - let mut client = self.client.locked(); + let mut client = self.client.lock(); for u in &self.urls { if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) { warn!("Error sending HTTP notification to {} : {}, retrying", u, e); diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index 1f7bd6fd8..e71d7d432 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -136,10 +136,10 @@ impl Spec { /// Return the state root for the genesis state, memoising accordingly. pub fn state_root(&self) -> H256 { - if self.state_root_memo.unwrapped_read().is_none() { - *self.state_root_memo.unwrapped_write() = Some(self.genesis_state.root()); + if self.state_root_memo.read().is_none() { + *self.state_root_memo.write() = Some(self.genesis_state.root()); } - self.state_root_memo.unwrapped_read().as_ref().unwrap().clone() + self.state_root_memo.read().as_ref().unwrap().clone() } /// Get the known knodes of the network in enode format. @@ -209,12 +209,12 @@ impl Spec { /// Alter the value of the genesis state. pub fn set_genesis_state(&mut self, s: PodState) { self.genesis_state = s; - *self.state_root_memo.unwrapped_write() = None; + *self.state_root_memo.write() = None; } /// Returns `false` if the memoized state root is invalid. `true` otherwise. pub fn is_state_root_valid(&self) -> bool { - self.state_root_memo.unwrapped_read().clone().map_or(true, |sr| sr == self.genesis_state.root()) + self.state_root_memo.read().clone().map_or(true, |sr| sr == self.genesis_state.root()) } /// Ensure that the given state DB has the trie nodes in for the genesis state. 
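The hunks above all follow one migration pattern: `std::sync` primitives are swapped for their `parking_lot` counterparts, whose `Mutex::lock()` and `RwLock::read()`/`write()` return guards directly (no poisoning `Result`), so the old `locked()`/`unwrapped_read()`/`unwrapped_write()` helpers and the trailing `.unwrap()` calls disappear. The `price_info` test also shows the condition-variable difference: `Condvar::wait_for` takes the guard by `&mut` and returns only the timeout result, instead of consuming the guard and handing back a `(guard, result)` tuple the way std's `wait_timeout` does. Below is a minimal, illustrative sketch of that timed-wait shape, not code from the patch; it assumes a crate that depends on `parking_lot` (as `util/Cargo.toml` does further down), and the values and names are made up for the example.

```rust
// Sketch only: timed Condvar wait with parking_lot, mirroring the shape of the
// price_info test above. Locking never returns a Result; wait_for mutates the
// guard in place and returns the timeout status directly.
extern crate parking_lot;

use std::sync::Arc;
use std::time::Duration;
use parking_lot::{Condvar, Mutex};

fn main() {
    let pair = Arc::new((Mutex::new(0u32), Condvar::new()));

    // Hold the lock before spawning so the worker cannot signal before we wait.
    let mut guard = pair.0.lock();

    let worker = pair.clone();
    std::thread::spawn(move || {
        *worker.0.lock() = 42;   // blocks until wait_for releases the lock
        worker.1.notify_one();
    });

    // Loop to tolerate spurious wakeups, which condition variables permit.
    while *guard == 0 {
        let timeout = pair.1.wait_for(&mut guard, Duration::from_secs(5));
        if timeout.timed_out() {
            panic!("worker did not signal in time");
        }
    }
    assert_eq!(*guard, 42);
}
```

The same shape appears in `wait_for_exit` and the IO worker further down, just without the timeout: `exit.wait(&mut mutex.lock())` replaces `exit.wait(mutex.locked()).unwrap()`.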
diff --git a/ethcore/src/trace/db.rs b/ethcore/src/trace/db.rs index a49d28924..cac96bcbe 100644 --- a/ethcore/src/trace/db.rs +++ b/ethcore/src/trace/db.rs @@ -18,17 +18,18 @@ use std::ptr; use std::ops::{Deref, DerefMut}; use std::collections::HashMap; -use std::sync::{RwLock, Arc}; +use std::sync::Arc; use std::path::Path; use bloomchain::{Number, Config as BloomConfig}; use bloomchain::group::{BloomGroupDatabase, BloomGroupChain, GroupPosition, BloomGroup}; -use util::{H256, H264, Database, DatabaseConfig, DBTransaction, RwLockable}; +use util::{H256, H264, Database, DatabaseConfig, DBTransaction, RwLock}; use header::BlockNumber; use trace::{BlockTraces, LocalizedTrace, Config, Switch, Filter, Database as TraceDatabase, ImportRequest, DatabaseExtras, Error}; use db::{Key, Writable, Readable, CacheUpdatePolicy}; use blooms; use super::flat::{FlatTrace, FlatBlockTraces, FlatTransactionTraces}; + const TRACE_DB_VER: &'static [u8] = b"1.0"; #[derive(Debug, Copy, Clone)] @@ -231,7 +232,7 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { // at first, let's insert new block traces { - let mut traces = self.traces.unwrapped_write(); + let mut traces = self.traces.write(); // it's important to use overwrite here, // cause this value might be queried by hash later batch.write_with_cache(traces.deref_mut(), request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); @@ -259,7 +260,7 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { .map(|p| (From::from(p.0), From::from(p.1))) .collect::>(); - let mut blooms = self.blooms.unwrapped_write(); + let mut blooms = self.blooms.write(); batch.extend_with_cache(blooms.deref_mut(), blooms_to_insert, CacheUpdatePolicy::Remove); } diff --git a/parity/informant.rs b/parity/informant.rs index 006cfc575..1a78db79b 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -19,10 +19,9 @@ use self::ansi_term::Colour::{White, Yellow, Green, Cyan, Blue, Purple}; use self::ansi_term::Style; use std::time::{Instant, Duration}; -use std::sync::RwLock; use std::ops::{Deref, DerefMut}; use ethsync::SyncStatus; -use util::{Uint, RwLockable, NetworkConfiguration}; +use util::{Uint, RwLock, NetworkConfiguration}; use ethcore::client::*; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -75,20 +74,21 @@ impl Informant { } } + #[cfg_attr(feature="dev", allow(match_bool))] pub fn tick(&self, client: &Client, maybe_status: Option<(SyncStatus, NetworkConfiguration)>) { - let elapsed = self.last_tick.unwrapped_read().elapsed(); + let elapsed = self.last_tick.read().elapsed(); if elapsed < Duration::from_secs(5) { return; } - *self.last_tick.unwrapped_write() = Instant::now(); + *self.last_tick.write() = Instant::now(); let chain_info = client.chain_info(); let queue_info = client.queue_info(); let cache_info = client.blockchain_cache_info(); - let mut write_report = self.report.unwrapped_write(); + let mut write_report = self.report.write(); let report = client.report(); let paint = |c: Style, t: String| match self.with_color { @@ -97,8 +97,8 @@ impl Informant { }; if let (_, _, &Some(ref last_report)) = ( - self.chain_info.unwrapped_read().deref(), - self.cache_info.unwrapped_read().deref(), + self.chain_info.read().deref(), + self.cache_info.read().deref(), write_report.deref() ) { println!("{} {} {} blk/s {} tx/s {} Mgas/s {}{}+{} Qed {} db {} chain {} queue{}", @@ -133,8 +133,8 @@ impl Informant { ); } - *self.chain_info.unwrapped_write().deref_mut() = Some(chain_info); - *self.cache_info.unwrapped_write().deref_mut() = 
Some(cache_info); + *self.chain_info.write().deref_mut() = Some(chain_info); + *self.cache_info.write().deref_mut() = Some(cache_info); *write_report.deref_mut() = Some(report); } } diff --git a/parity/main.rs b/parity/main.rs index 7314b9f99..c1bd6f1ce 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -74,7 +74,7 @@ mod url; use std::io::{Write, Read, BufReader, BufRead}; use std::ops::Deref; -use std::sync::{Arc, Mutex, Condvar}; +use std::sync::Arc; use std::path::Path; use std::fs::File; use std::str::{FromStr, from_utf8}; @@ -82,7 +82,7 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{Lockable, H256, ToPretty, PayloadInfo, Bytes, Colour, Applyable, version, journaldb}; +use util::{H256, ToPretty, PayloadInfo, Bytes, Colour, Applyable, version, journaldb}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, ChainNotify, Mode}; @@ -93,6 +93,7 @@ use ethsync::EthSync; use ethcore::miner::{Miner, MinerService, ExternalMiner}; use migration::migrate; use informant::Informant; +use util::{Mutex, Condvar}; use die::*; use cli::print_version; @@ -593,7 +594,7 @@ fn wait_for_exit( // Wait for signal let mutex = Mutex::new(()); - let _ = exit.wait(mutex.locked()).unwrap(); + let _ = exit.wait(&mut mutex.lock()); info!("Finishing work, please wait..."); } diff --git a/rpc/rpctest/src/main.rs b/rpc/rpctest/src/main.rs index ed812cd79..7078793c2 100644 --- a/rpc/rpctest/src/main.rs +++ b/rpc/rpctest/src/main.rs @@ -41,7 +41,6 @@ use rpc::v1::tests::helpers::{TestSyncProvider, Config as SyncConfig, TestMinerS use rpc::v1::{Eth, EthClient, EthFilter, EthFilterClient}; use util::panics::MayPanic; use util::hash::Address; -use util::Lockable; const USAGE: &'static str = r#" Parity rpctest client. @@ -138,7 +137,7 @@ impl Configuration { panic_handler.on_panic(move |_reason| { e.notify_all(); }); let mutex = Mutex::new(()); - let _ = exit.wait(mutex.locked()).unwrap(); + let _ = exit.wait(mutex.lock()).unwrap(); } } diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index d41114f96..8477ed4db 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -16,10 +16,10 @@ use std::thread; use std::time::{Instant, Duration}; -use std::sync::{mpsc, Mutex, RwLock, Arc}; +use std::sync::{mpsc, Arc}; use std::collections::HashMap; use jsonrpc_core; -use util::{U256, Lockable, RwLockable}; +use util::{Mutex, RwLock, U256}; use v1::helpers::{TransactionRequest, TransactionConfirmation}; /// Result that can be returned from JSON RPC. @@ -110,7 +110,7 @@ pub struct ConfirmationPromise { impl ConfirmationToken { /// Submit solution to all listeners fn resolve(&self, result: Option) { - let mut res = self.result.locked(); + let mut res = self.result.lock(); *res = result.map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h)); // Notify listener self.handle.unpark(); @@ -142,7 +142,7 @@ impl ConfirmationPromise { // Park thread (may wake up spuriously) thread::park_timeout(deadline - now); // Take confirmation result - let res = self.result.locked(); + let res = self.result.lock(); // Check the result match *res { ConfirmationResult::Rejected => return None, @@ -183,7 +183,7 @@ impl ConfirmationsQueue { /// This method can be used only once (only single consumer of events can exist). 
pub fn start_listening(&self, listener: F) -> Result<(), QueueError> where F: Fn(QueueEvent) -> () { - let recv = self.receiver.locked().take(); + let recv = self.receiver.lock().take(); if let None = recv { return Err(QueueError::AlreadyUsed); } @@ -208,13 +208,13 @@ impl ConfirmationsQueue { /// Notifies receiver about the event happening in this queue. fn notify(&self, message: QueueEvent) { // We don't really care about the result - let _ = self.sender.locked().send(message); + let _ = self.sender.lock().send(message); } /// Removes transaction from this queue and notifies `ConfirmationPromise` holders about the result. /// Notifies also a receiver about that event. fn remove(&self, id: U256, result: Option) -> Option { - let token = self.queue.unwrapped_write().remove(&id); + let token = self.queue.write().remove(&id); if let Some(token) = token { // notify receiver about the event @@ -241,13 +241,13 @@ impl SigningQueue for ConfirmationsQueue { fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise { // Increment id let id = { - let mut last_id = self.id.locked(); + let mut last_id = self.id.lock(); *last_id = *last_id + U256::from(1); *last_id }; // Add request to queue let res = { - let mut queue = self.queue.unwrapped_write(); + let mut queue = self.queue.write(); queue.insert(id, ConfirmationToken { result: Arc::new(Mutex::new(ConfirmationResult::Waiting)), handle: thread::current(), @@ -266,7 +266,7 @@ impl SigningQueue for ConfirmationsQueue { } fn peek(&self, id: &U256) -> Option { - self.queue.unwrapped_read().get(id).map(|token| token.request.clone()) + self.queue.read().get(id).map(|token| token.request.clone()) } fn request_rejected(&self, id: U256) -> Option { @@ -280,17 +280,17 @@ impl SigningQueue for ConfirmationsQueue { } fn requests(&self) -> Vec { - let queue = self.queue.unwrapped_read(); + let queue = self.queue.read(); queue.values().map(|token| token.request.clone()).collect() } fn len(&self) -> usize { - let queue = self.queue.unwrapped_read(); + let queue = self.queue.read(); queue.len() } fn is_empty(&self) -> bool { - let queue = self.queue.unwrapped_read(); + let queue = self.queue.read(); queue.is_empty() } } @@ -300,8 +300,8 @@ impl SigningQueue for ConfirmationsQueue { mod test { use std::time::Duration; use std::thread; - use std::sync::{Arc, Mutex}; - use util::{Address, U256, H256, Lockable}; + use std::sync::Arc; + use util::{Address, U256, H256, Mutex}; use v1::helpers::{SigningQueue, ConfirmationsQueue, QueueEvent, TransactionRequest}; use v1::types::H256 as NH256; use jsonrpc_core::to_value; @@ -354,7 +354,7 @@ mod test { let r = received.clone(); let handle = thread::spawn(move || { q.start_listening(move |notification| { - let mut v = r.locked(); + let mut v = r.lock(); *v = Some(notification); }).expect("Should be closed nicely.") }); @@ -363,7 +363,7 @@ mod test { // then handle.join().expect("Thread should finish nicely"); - let r = received.locked().take(); + let r = received.lock().take(); assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1)))); } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7cfe251fd..9133393e9 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -20,7 +20,7 @@ extern crate ethash; use std::thread; use std::time::{Instant, Duration}; -use std::sync::{Arc, Weak, Mutex}; +use std::sync::{Arc, Weak}; use std::ops::Deref; use ethsync::{SyncProvider, SyncState}; use ethcore::miner::{MinerService, ExternalMinerService}; @@ -28,7 +28,7 @@ use jsonrpc_core::*; use 
util::numbers::*; use util::sha3::*; use util::rlp::{encode, decode, UntrustedRlp, View}; -use util::Lockable; +use util::Mutex; use ethcore::account_provider::AccountProvider; use ethcore::client::{MiningBlockChainClient, BlockID, TransactionID, UncleID}; use ethcore::header::Header as BlockHeader; @@ -562,7 +562,7 @@ impl Eth for EthClient where miner.map_sealing_work(client.deref(), |b| { let pow_hash = b.hash(); let target = Ethash::difficulty_to_boundary(b.block().header().difficulty()); - let seed_hash = self.seed_compute.locked().get_seedhash(b.block().header().number()); + let seed_hash = self.seed_compute.lock().get_seedhash(b.block().header().number()); let block_number = RpcU256::from(b.block().header().number()); to_value(&(RpcH256::from(pow_hash), RpcH256::from(seed_hash), RpcH256::from(target), block_number)) }).unwrap_or(Err(Error::internal_error())) // no work found. diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index c12783c1b..6eb0c60bb 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -17,19 +17,18 @@ //! Eth Filter RPC implementation use std::ops::Deref; -use std::sync::{Arc, Weak, Mutex}; +use std::sync::{Arc, Weak}; use std::collections::HashSet; use jsonrpc_core::*; -use util::Lockable; use ethcore::miner::MinerService; use ethcore::filter::Filter as EthcoreFilter; use ethcore::client::{BlockChainClient, BlockID}; +use util::Mutex; use v1::traits::EthFilter; use v1::types::{BlockNumber, Index, Filter, Log, H256 as RpcH256, U256 as RpcU256}; use v1::helpers::{PollFilter, PollManager}; use v1::impls::eth::pending_logs; - /// Eth filter rpc implementation. pub struct EthFilterClient where C: BlockChainClient, @@ -68,7 +67,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Filter,)>(params) .and_then(|(filter,)| { - let mut polls = self.polls.locked(); + let mut polls = self.polls.lock(); let block_number = take_weak!(self.client).chain_info().best_block_number; let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); to_value(&RpcU256::from(id)) @@ -79,7 +78,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); match params { Params::None => { - let mut polls = self.polls.locked(); + let mut polls = self.polls.lock(); let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); to_value(&RpcU256::from(id)) }, @@ -91,7 +90,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); match params { Params::None => { - let mut polls = self.polls.locked(); + let mut polls = self.polls.lock(); let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); @@ -106,7 +105,7 @@ impl EthFilter for EthFilterClient where let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let mut polls = self.polls.locked(); + let mut polls = self.polls.lock(); match polls.poll_mut(&index.value()) { None => Ok(Value::Array(vec![] as Vec)), Some(filter) => match *filter { @@ -196,7 +195,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { - let mut polls = self.polls.locked(); + let mut polls = self.polls.lock(); match polls.poll(&index.value()) { Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { let include_pending = filter.to_block == Some(BlockNumber::Pending); @@ -222,7 
+221,7 @@ impl EthFilter for EthFilterClient where try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { - self.polls.locked().remove_poll(&index.value()); + self.polls.lock().remove_poll(&index.value()); to_value(&true) }) } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index b0a4f1115..8b67b8e09 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -16,7 +16,7 @@ //! Test implementation of miner service. -use util::{Address, H256, Bytes, U256, FixedHash, Uint, Lockable, RwLockable}; +use util::{Address, H256, Bytes, U256, FixedHash, Uint}; use util::standard::*; use ethcore::error::{Error, ExecutionError}; use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics}; @@ -76,68 +76,68 @@ impl MinerService for TestMinerService { } fn set_author(&self, author: Address) { - *self.author.unwrapped_write() = author; + *self.author.write() = author; } fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.unwrapped_write() = extra_data; + *self.extra_data.write() = extra_data; } /// Set the lower gas limit we wish to target when sealing a new block. fn set_gas_floor_target(&self, target: U256) { - self.gas_range_target.unwrapped_write().0 = target; + self.gas_range_target.write().0 = target; } /// Set the upper gas limit we wish to target when sealing a new block. fn set_gas_ceil_target(&self, target: U256) { - self.gas_range_target.unwrapped_write().1 = target; + self.gas_range_target.write().1 = target; } fn set_minimal_gas_price(&self, min_gas_price: U256) { - *self.min_gas_price.unwrapped_write() = min_gas_price; + *self.min_gas_price.write() = min_gas_price; } fn set_transactions_limit(&self, limit: usize) { - *self.limit.unwrapped_write() = limit; + *self.limit.write() = limit; } fn set_tx_gas_limit(&self, limit: U256) { - *self.tx_gas_limit.unwrapped_write() = limit; + *self.tx_gas_limit.write() = limit; } fn transactions_limit(&self) -> usize { - *self.limit.unwrapped_read() + *self.limit.read() } fn author(&self) -> Address { - *self.author.unwrapped_read() + *self.author.read() } fn minimal_gas_price(&self) -> U256 { - *self.min_gas_price.unwrapped_read() + *self.min_gas_price.read() } fn extra_data(&self) -> Bytes { - self.extra_data.unwrapped_read().clone() + self.extra_data.read().clone() } fn gas_floor_target(&self) -> U256 { - self.gas_range_target.unwrapped_read().0 + self.gas_range_target.read().0 } fn gas_ceil_target(&self) -> U256 { - self.gas_range_target.unwrapped_read().1 + self.gas_range_target.read().1 } /// Imports transactions to transaction queue. 
fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec) -> Vec> { // lets assume that all txs are valid - self.imported_transactions.locked().extend_from_slice(&transactions); + self.imported_transactions.lock().extend_from_slice(&transactions); for sender in transactions.iter().filter_map(|t| t.sender().ok()) { let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests"); - self.last_nonces.unwrapped_write().insert(sender, nonce + U256::from(1)); + self.last_nonces.write().insert(sender, nonce + U256::from(1)); } transactions .iter() @@ -152,11 +152,11 @@ impl MinerService for TestMinerService { // keep the pending nonces up to date if let Ok(ref sender) = transaction.sender() { let nonce = self.last_nonce(sender).unwrap_or(chain.latest_nonce(sender)); - self.last_nonces.unwrapped_write().insert(sender.clone(), nonce + U256::from(1)); + self.last_nonces.write().insert(sender.clone(), nonce + U256::from(1)); } // lets assume that all txs are valid - self.imported_transactions.locked().push(transaction); + self.imported_transactions.lock().push(transaction); Ok(TransactionImportResult::Current) } @@ -186,23 +186,23 @@ impl MinerService for TestMinerService { } fn transaction(&self, hash: &H256) -> Option { - self.pending_transactions.locked().get(hash).cloned() + self.pending_transactions.lock().get(hash).cloned() } fn all_transactions(&self) -> Vec { - self.pending_transactions.locked().values().cloned().collect() + self.pending_transactions.lock().values().cloned().collect() } fn pending_transactions(&self) -> Vec { - self.pending_transactions.locked().values().cloned().collect() + self.pending_transactions.lock().values().cloned().collect() } fn pending_receipts(&self) -> BTreeMap { - self.pending_receipts.locked().clone() + self.pending_receipts.lock().clone() } fn last_nonce(&self, address: &Address) -> Option { - self.last_nonces.unwrapped_read().get(address).cloned() + self.last_nonces.read().get(address).cloned() } /// Submit `seal` as a valid solution for the header of `pow_hash`. 
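The `TestMinerService` hunks above apply the same rewrite to a test double: its mutable state sits behind `parking_lot` locks and every accessor calls `read()`, `write()` or `lock()` directly. A small self-contained sketch of that shape follows; the `Address`/nonce stand-ins are hypothetical (the real types come from `util`, which this sketch deliberately does not pull in), so treat it as an illustration of the locking pattern rather than the actual test helper.

```rust
// Sketch only: a test double whose interior state is guarded by parking_lot
// locks, accessed without unwrap() because the guards are returned infallibly.
extern crate parking_lot;

use std::collections::HashMap;
use parking_lot::{Mutex, RwLock};

// Hypothetical stand-ins to keep the sketch self-contained.
type Address = u64;
type Nonce = u64;

struct TestMiner {
    last_nonces: RwLock<HashMap<Address, Nonce>>,
    imported: Mutex<Vec<String>>,
}

impl TestMiner {
    fn new() -> Self {
        TestMiner {
            last_nonces: RwLock::new(HashMap::new()),
            imported: Mutex::new(Vec::new()),
        }
    }

    fn last_nonce(&self, addr: &Address) -> Option<Nonce> {
        self.last_nonces.read().get(addr).cloned()   // shared read lock
    }

    fn note_import(&self, addr: Address, tx: String) {
        let next = self.last_nonce(&addr).map_or(0, |n| n + 1);
        self.last_nonces.write().insert(addr, next);  // exclusive write lock
        self.imported.lock().push(tx);
    }
}

fn main() {
    let miner = TestMiner::new();
    miner.note_import(1, "tx-0".into());
    miner.note_import(1, "tx-1".into());
    assert_eq!(miner.last_nonce(&1), Some(1));
    assert_eq!(miner.imported.lock().len(), 2);
}
```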
@@ -212,7 +212,7 @@ impl MinerService for TestMinerService { } fn balance(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 { - self.latest_closed_block.locked().as_ref().map_or_else(U256::zero, |b| b.block().fields().state.balance(address).clone()) + self.latest_closed_block.lock().as_ref().map_or_else(U256::zero, |b| b.block().fields().state.balance(address).clone()) } fn call(&self, _chain: &MiningBlockChainClient, _t: &SignedTransaction, _analytics: CallAnalytics) -> Result { @@ -220,7 +220,7 @@ impl MinerService for TestMinerService { } fn storage_at(&self, _chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 { - self.latest_closed_block.locked().as_ref().map_or_else(H256::default, |b| b.block().fields().state.storage_at(address, position).clone()) + self.latest_closed_block.lock().as_ref().map_or_else(H256::default, |b| b.block().fields().state.storage_at(address, position).clone()) } fn nonce(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 { @@ -230,7 +230,7 @@ impl MinerService for TestMinerService { } fn code(&self, _chain: &MiningBlockChainClient, address: &Address) -> Option { - self.latest_closed_block.locked().as_ref().map_or(None, |b| b.block().fields().state.code(address).clone()) + self.latest_closed_block.lock().as_ref().map_or(None, |b| b.block().fields().state.code(address).clone()) } } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 9dfb3ba27..94f7b4893 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,9 +16,8 @@ //! Test implementation of SyncProvider. -use util::{U256, RwLockable}; +use util::{RwLock, U256}; use ethsync::{SyncProvider, SyncStatus, SyncState}; -use std::sync::RwLock; /// TestSyncProvider config. pub struct Config { @@ -57,7 +56,7 @@ impl TestSyncProvider { impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { - self.status.unwrapped_read().clone() + self.status.read().clone() } } diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index 112624baf..688a43b24 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -16,11 +16,11 @@ use std::str::FromStr; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use jsonrpc_core::IoHandler; -use util::RwLockable; use util::hash::{Address, H256, FixedHash}; use util::numbers::{Uint, U256}; +use util::RwLock; use ethcore::account_provider::AccountProvider; use ethcore::client::{TestBlockChainClient, EachBlockWith, Executed, TransactionID}; use ethcore::log_entry::{LocalizedLogEntry, LogEntry}; @@ -104,13 +104,13 @@ fn rpc_eth_syncing() { assert_eq!(tester.io.handle_request(request), Some(false_res.to_owned())); { - let mut status = tester.sync.status.unwrapped_write(); + let mut status = tester.sync.status.write(); status.state = SyncState::Blocks; status.highest_block_number = Some(2500); // "sync" to 1000 blocks. // causes TestBlockChainClient to return 1000 for its best block number. 
- let mut blocks = tester.client.blocks.unwrapped_write(); + let mut blocks = tester.client.blocks.write(); for i in 0..1000 { blocks.insert(H256::from(i), Vec::new()); } @@ -121,7 +121,7 @@ fn rpc_eth_syncing() { { // finish "syncing" - let mut blocks = tester.client.blocks.unwrapped_write(); + let mut blocks = tester.client.blocks.write(); for i in 0..1500 { blocks.insert(H256::from(i + 1000), Vec::new()); } @@ -133,9 +133,9 @@ fn rpc_eth_syncing() { #[test] fn rpc_eth_hashrate() { let tester = EthTester::default(); - tester.hashrates.unwrapped_write().insert(H256::from(0), U256::from(0xfffa)); - tester.hashrates.unwrapped_write().insert(H256::from(0), U256::from(0xfffb)); - tester.hashrates.unwrapped_write().insert(H256::from(1), U256::from(0x1)); + tester.hashrates.write().insert(H256::from(0), U256::from(0xfffa)); + tester.hashrates.write().insert(H256::from(0), U256::from(0xfffb)); + tester.hashrates.write().insert(H256::from(1), U256::from(0x1)); let request = r#"{"jsonrpc": "2.0", "method": "eth_hashrate", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0xfffc","id":1}"#; @@ -158,7 +158,7 @@ fn rpc_eth_submit_hashrate() { let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; assert_eq!(tester.io.handle_request(request), Some(response.to_owned())); - assert_eq!(tester.hashrates.unwrapped_read().get(&H256::from("0x59daa26581d0acd1fce254fb7e85952f4c09d0915afd33d3886cd914bc7d283c")).cloned(), + assert_eq!(tester.hashrates.read().get(&H256::from("0x59daa26581d0acd1fce254fb7e85952f4c09d0915afd33d3886cd914bc7d283c")).cloned(), Some(U256::from(0x500_000))); } @@ -215,7 +215,7 @@ fn rpc_eth_mining() { let response = r#"{"jsonrpc":"2.0","result":false,"id":1}"#; assert_eq!(tester.io.handle_request(request), Some(response.to_owned())); - tester.hashrates.unwrapped_write().insert(H256::from(1), U256::from(0x1)); + tester.hashrates.write().insert(H256::from(1), U256::from(0x1)); let request = r#"{"jsonrpc": "2.0", "method": "eth_mining", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -364,7 +364,7 @@ fn rpc_eth_pending_transaction_by_hash() { let tester = EthTester::default(); { let tx: SignedTransaction = decode(&FromHex::from_hex("f85f800182520894095e7baea6a6c7c4c2dfeb977efac326af552d870a801ba048b55bfa915ac795c431978d8a6a992b628d557da5ff759b307d495a36649353a0efffd310ac743f371de3b9f7f9cb56c0b28ad43601b4ab949f53faa07bd2c804").unwrap()); - tester.miner.pending_transactions.locked().insert(H256::zero(), tx); + tester.miner.pending_transactions.lock().insert(H256::zero(), tx); } let response = r#"{"jsonrpc":"2.0","result":{"blockHash":null,"blockNumber":null,"creates":null,"from":"0x0f65fe9276bc9a24ae7083ae28e2660ef72df99e","gas":"0x5208","gasPrice":"0x01","hash":"0x41df922fd0d4766fcc02e161f8295ec28522f329ae487f14d811e4b64c8d6e31","input":"0x","nonce":"0x00","to":"0x095e7baea6a6c7c4c2dfeb977efac326af552d87","transactionIndex":null,"value":"0x0a"},"id":1}"#; @@ -591,7 +591,7 @@ fn rpc_eth_send_transaction() { assert_eq!(tester.io.handle_request(&request), Some(response)); - tester.miner.last_nonces.unwrapped_write().insert(address.clone(), U256::zero()); + tester.miner.last_nonces.write().insert(address.clone(), U256::zero()); let t = Transaction { nonce: U256::one(), @@ -749,7 +749,7 @@ fn returns_error_if_can_mine_and_no_closed_block() { use ethsync::{SyncState}; let eth_tester = EthTester::default(); - eth_tester.sync.status.unwrapped_write().state = SyncState::Idle; + eth_tester.sync.status.write().state = SyncState::Idle; 
let request = r#"{"jsonrpc": "2.0", "method": "eth_getWork", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","error":{"code":-32603,"message":"Internal error","data":null},"id":1}"#; diff --git a/rpc/src/v1/tests/mocked/eth_signing.rs b/rpc/src/v1/tests/mocked/eth_signing.rs index 5f3e75d35..794d5fc93 100644 --- a/rpc/src/v1/tests/mocked/eth_signing.rs +++ b/rpc/src/v1/tests/mocked/eth_signing.rs @@ -88,7 +88,7 @@ fn should_add_transaction_to_queue() { } #[test] -fn should_dispatch_transaction_if_account_is_unlocked() { +fn should_dispatch_transaction_if_account_is_unlock() { // given let tester = eth_signing(); let acc = tester.accounts.new_account("test").unwrap(); diff --git a/rpc/src/v1/tests/mocked/personal.rs b/rpc/src/v1/tests/mocked/personal.rs index 6cd3ae583..3b299728c 100644 --- a/rpc/src/v1/tests/mocked/personal.rs +++ b/rpc/src/v1/tests/mocked/personal.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::str::FromStr; use jsonrpc_core::IoHandler; use util::numbers::*; -use util::RwLockable; use ethcore::account_provider::AccountProvider; use v1::{PersonalClient, Personal}; use v1::tests::helpers::TestMinerService; @@ -175,7 +174,7 @@ fn sign_and_send_transaction() { assert_eq!(tester.io.handle_request(request.as_ref()), Some(response)); - tester.miner.last_nonces.unwrapped_write().insert(address.clone(), U256::zero()); + tester.miner.last_nonces.write().insert(address.clone(), U256::zero()); let t = Transaction { nonce: U256::one(), diff --git a/rpc/src/v1/tests/mocked/personal_signer.rs b/rpc/src/v1/tests/mocked/personal_signer.rs index 374c5dfa0..ced9a228a 100644 --- a/rpc/src/v1/tests/mocked/personal_signer.rs +++ b/rpc/src/v1/tests/mocked/personal_signer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::str::FromStr; use jsonrpc_core::IoHandler; use util::numbers::*; -use util::Lockable; use ethcore::account_provider::AccountProvider; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Transaction, Action}; @@ -113,7 +112,7 @@ fn should_reject_transaction_from_queue_without_dispatching() { // then assert_eq!(tester.io.handle_request(&request), Some(response.to_owned())); assert_eq!(tester.queue.requests().len(), 0); - assert_eq!(tester.miner.imported_transactions.locked().len(), 0); + assert_eq!(tester.miner.imported_transactions.lock().len(), 0); } #[test] @@ -182,6 +181,6 @@ fn should_confirm_transaction_and_dispatch() { // then assert_eq!(tester.io.handle_request(&request), Some(response.to_owned())); assert_eq!(tester.queue.requests().len(), 0); - assert_eq!(tester.miner.imported_transactions.locked().len(), 1); + assert_eq!(tester.miner.imported_transactions.lock().len(), 1); } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index c96ba8b34..17f9e94cc 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1142,7 +1142,7 @@ impl ChainSync { |e| format!("Error sending nodes: {:?}", e)), _ => { - sync.unwrapped_write().on_packet(io, peer, packet_id, data); + sync.write().on_packet(io, peer, packet_id, data); Ok(()) } }; diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 3b252bfb7..4464fa8bc 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -73,9 +73,9 @@ extern crate rand; extern crate heapsize; use std::ops::*; -use std::sync::*; +use std::sync::Arc; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkConfiguration}; -use util::{TimerToken, U256, H256, RwLockable, UtilError}; +use util::{TimerToken, U256, H256, RwLock, UtilError}; use ethcore::client::{Client, ChainNotify}; use 
io::NetSyncIo; use chain::ChainSync; @@ -140,7 +140,7 @@ impl EthSync { impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> SyncStatus { - self.handler.sync.unwrapped_read().status() + self.handler.sync.read().status() } } @@ -161,16 +161,16 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.unwrapped_write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.unwrapped_write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.sync.unwrapped_write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); - self.sync.unwrapped_write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); } } @@ -184,7 +184,7 @@ impl ChainNotify for EthSync { { self.network.with_context(ETH_PROTOCOL, |context| { let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); - self.handler.sync.unwrapped_write().chain_new_blocks( + self.handler.sync.write().chain_new_blocks( &mut sync_io, &imported, &invalid, @@ -241,7 +241,7 @@ impl ManageNetwork for EthSync { fn stop_network(&self) { self.network.with_context(ETH_PROTOCOL, |context| { let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); - self.handler.sync.unwrapped_write().abort(&mut sync_io); + self.handler.sync.write().abort(&mut sync_io); }); self.stop(); } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index e5a8b3fb9..84e25429d 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -27,7 +27,7 @@ fn two_peers() { net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(1000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); + assert_eq!(net.peer(0).chain.blocks.read().deref(), net.peer(1).chain.blocks.read().deref()); } #[test] @@ -37,7 +37,7 @@ fn long_chain() { net.peer_mut(1).chain.add_blocks(50000, EachBlockWith::Nothing); net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(50000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); + assert_eq!(net.peer(0).chain.blocks.read().deref(), net.peer(1).chain.blocks.read().deref()); } #[test] @@ -47,7 +47,7 @@ fn status_after_sync() { net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); - let status = net.peer(0).sync.unwrapped_read().status(); + let status = net.peer(0).sync.read().status(); assert_eq!(status.state, SyncState::Idle); } @@ -71,7 +71,7 @@ fn empty_blocks() { } net.sync(); assert!(net.peer(0).chain.block(BlockID::Number(1000)).is_some()); - assert_eq!(net.peer(0).chain.blocks.unwrapped_read().deref(), net.peer(1).chain.blocks.unwrapped_read().deref()); + assert_eq!(net.peer(0).chain.blocks.read().deref(), net.peer(1).chain.blocks.read().deref()); } #[test] @@ -87,12 +87,12 @@ fn forked() { 
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); // peer 1 has the best chain of 601 blocks - let peer1_chain = net.peer(1).chain.numbers.unwrapped_read().clone(); + let peer1_chain = net.peer(1).chain.numbers.read().clone(); net.sync(); - assert_eq!(net.peer(0).chain.difficulty.unwrapped_read().deref(), net.peer(1).chain.difficulty.unwrapped_read().deref()); - assert_eq!(net.peer(0).chain.numbers.unwrapped_read().deref(), &peer1_chain); - assert_eq!(net.peer(1).chain.numbers.unwrapped_read().deref(), &peer1_chain); - assert_eq!(net.peer(2).chain.numbers.unwrapped_read().deref(), &peer1_chain); + assert_eq!(net.peer(0).chain.difficulty.read().deref(), net.peer(1).chain.difficulty.read().deref()); + assert_eq!(net.peer(0).chain.numbers.read().deref(), &peer1_chain); + assert_eq!(net.peer(1).chain.numbers.read().deref(), &peer1_chain); + assert_eq!(net.peer(2).chain.numbers.read().deref(), &peer1_chain); } #[test] @@ -107,14 +107,14 @@ fn restart() { assert!(net.peer(0).chain.chain_info().best_block_number > 100); net.restart_peer(0); - let status = net.peer(0).sync.unwrapped_read().status(); + let status = net.peer(0).sync.read().status(); assert_eq!(status.state, SyncState::ChainHead); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.unwrapped_read().status().state, SyncState::Idle); + assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle); } #[test] diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index f3a5c6c4c..d5fda2e70 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -118,7 +118,7 @@ impl TestNet { for client in 0..self.peers.len() { if peer != client { let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.unwrapped_write().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); } } } @@ -133,18 +133,18 @@ impl TestNet { trace!("----------------"); } let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.unwrapped_write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); } } pub fn sync_step_peer(&mut self, peer_num: usize) { let mut peer = self.peer_mut(peer_num); - peer.sync.unwrapped_write().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn restart_peer(&mut self, i: usize) { let peer = self.peer_mut(i); - peer.sync.unwrapped_write().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn sync(&mut self) -> u32 { @@ -173,6 +173,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.unwrapped_write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]); + peer.sync.write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]); } } diff --git a/util/Cargo.toml b/util/Cargo.toml index 9a38cabba..db8a1501f 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -35,6 +35,7 @@ vergen = "0.1" target_info = "0.1" bigint = { path = 
"bigint" } chrono = "0.2" +parking_lot = "0.2.6" using_queue = { path = "using_queue" } table = { path = "table" } ansi_term = "0.7" diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 7f80d50d7..bfd63b04c 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -14,18 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::*; +use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::collections::HashMap; use mio::*; use crossbeam::sync::chase_lev; use slab::Slab; use error::*; -use misc::*; use io::{IoError, IoHandler}; use io::worker::{Worker, Work, WorkType}; use panics::*; +use parking_lot::{Condvar, RwLock, Mutex}; + /// Timer ID pub type TimerToken = usize; /// Timer ID @@ -228,7 +229,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; if let Some(handler) = self.handlers.get(handler_index) { - if let Some(timer) = self.timers.unwrapped_read().get(&token.as_usize()) { + if let Some(timer) = self.timers.read().get(&token.as_usize()) { event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); self.work_ready.notify_all(); @@ -250,7 +251,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync // TODO: flush event loop self.handlers.remove(handler_id); // unregister timers - let mut timers = self.timers.unwrapped_write(); + let mut timers = self.timers.write(); let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect(); for timer_id in to_remove { let timer = timers.remove(&timer_id).expect("to_remove only contains keys from timers; qed"); @@ -260,11 +261,11 @@ impl Handler for IoManager where Message: Send + Clone + Sync IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer"); - self.timers.unwrapped_write().insert(timer_id, UserTimer { delay: delay, timeout: timeout }); + self.timers.write().insert(timer_id, UserTimer { delay: delay, timeout: timeout }); }, IoMessage::RemoveTimer { handler_id, token } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) { + if let Some(timer) = self.timers.write().remove(&timer_id) { event_loop.clear_timeout(timer.timeout); } }, @@ -278,7 +279,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync handler.deregister_stream(token, event_loop); // unregister a timer associated with the token (if any) let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.unwrapped_write().remove(&timer_id) { + if let Some(timer) = self.timers.write().remove(&timer_id) { event_loop.clear_timeout(timer.timeout); } } diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 333acb6af..0ceadcd92 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . 
-use std::sync::*; +use std::sync::Arc; use std::mem; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; @@ -22,7 +22,8 @@ use crossbeam::sync::chase_lev; use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; use panics::*; -use misc::Lockable; + +use parking_lot::{Condvar, Mutex}; pub enum WorkType { Readable, @@ -82,11 +83,11 @@ impl Worker { where Message: Send + Sync + Clone + 'static { loop { { - let lock = wait_mutex.locked(); + let mut lock = wait_mutex.lock(); if deleting.load(AtomicOrdering::Acquire) { return; } - let _ = wait.wait(lock).unwrap(); + let _ = wait.wait(&mut lock); } if deleting.load(AtomicOrdering::Acquire) { diff --git a/util/src/journaldb/earlymergedb.rs b/util/src/journaldb/earlymergedb.rs index abc6fb2e0..e976576bc 100644 --- a/util/src/journaldb/earlymergedb.rs +++ b/util/src/journaldb/earlymergedb.rs @@ -20,7 +20,6 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use misc::RwLockable; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY}; use super::traits::JournalDB; use kvdb::{Database, DBTransaction, DatabaseConfig}; @@ -226,7 +225,7 @@ impl EarlyMergeDB { #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { let (latest_era, reconstructed) = Self::read_refs(&self.backing); - let refs = self.refs.as_ref().unwrap().unwrapped_write(); + let refs = self.refs.as_ref().unwrap().write(); if *refs != reconstructed || latest_era != self.latest_era { let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::>(); let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::>(); @@ -334,7 +333,7 @@ impl JournalDB for EarlyMergeDB { fn mem_used(&self) -> usize { self.overlay.mem_used() + match self.refs { - Some(ref c) => c.unwrapped_read().heap_size_of_children(), + Some(ref c) => c.read().heap_size_of_children(), None => 0 } } @@ -390,7 +389,7 @@ impl JournalDB for EarlyMergeDB { // // record new commit's details. 
- let mut refs = self.refs.as_ref().unwrap().unwrapped_write(); + let mut refs = self.refs.as_ref().unwrap().write(); let batch = DBTransaction::new(); let trace = false; { diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 269b7e457..fcc537d53 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -20,7 +20,6 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use misc::RwLockable; use super::{DB_PREFIX_LEN, LATEST_ERA_KEY, VERSION_KEY}; use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] @@ -137,7 +136,7 @@ impl OverlayRecentDB { #[cfg(test)] fn can_reconstruct_refs(&self) -> bool { let reconstructed = Self::read_overlay(&self.backing); - let journal_overlay = self.journal_overlay.unwrapped_read(); + let journal_overlay = self.journal_overlay.read(); *journal_overlay == reconstructed } @@ -207,7 +206,7 @@ impl JournalDB for OverlayRecentDB { fn mem_used(&self) -> usize { let mut mem = self.transaction_overlay.mem_used(); - let overlay = self.journal_overlay.unwrapped_read(); + let overlay = self.journal_overlay.read(); mem += overlay.backing_overlay.mem_used(); mem += overlay.journal.heap_size_of_children(); mem @@ -217,17 +216,17 @@ impl JournalDB for OverlayRecentDB { self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() } - fn latest_era(&self) -> Option { self.journal_overlay.unwrapped_read().latest_era } + fn latest_era(&self) -> Option { self.journal_overlay.read().latest_era } fn state(&self, key: &H256) -> Option { - let v = self.journal_overlay.unwrapped_read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); + let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); v.or_else(|| self.backing.get_by_prefix(&key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) } fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // record new commit's details. trace!("commit: #{} ({}), end era: {:?}", now, id, end); - let mut journal_overlay = self.journal_overlay.unwrapped_write(); + let mut journal_overlay = self.journal_overlay.write(); let batch = DBTransaction::new(); { let mut r = RlpStream::new_list(3); @@ -334,7 +333,7 @@ impl HashDB for OverlayRecentDB { match k { Some(&(ref d, rc)) if rc > 0 => Some(d), _ => { - let v = self.journal_overlay.unwrapped_read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); + let v = self.journal_overlay.read().backing_overlay.get(&OverlayRecentDB::to_short_key(key)).map(|v| v.to_vec()); match v { Some(x) => { Some(&self.transaction_overlay.denote(key, x).0) diff --git a/util/src/lib.rs b/util/src/lib.rs index ee6b53138..bcd9df971 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -116,6 +116,7 @@ extern crate libc; extern crate target_info; extern crate bigint; extern crate chrono; +extern crate parking_lot; pub extern crate using_queue; pub extern crate table; extern crate ansi_term; diff --git a/util/src/log.rs b/util/src/log.rs index 785737591..b4169f91c 100644 --- a/util/src/log.rs +++ b/util/src/log.rs @@ -18,14 +18,14 @@ use std::env; use std::borrow::Cow; -use rlog::{LogLevelFilter}; +use rlog::LogLevelFilter; use env_logger::LogBuilder; -use std::sync::{RwLock, RwLockReadGuard}; use std::sync::atomic::{Ordering, AtomicBool}; use arrayvec::ArrayVec; -use misc::RwLockable; pub use ansi_term::{Colour, Style}; +use parking_lot::{RwLock, RwLockReadGuard}; + lazy_static! 
{ static ref USE_COLOR: AtomicBool = AtomicBool::new(false); } @@ -91,7 +91,7 @@ impl RotatingLogger { /// Append new log entry pub fn append(&self, log: String) { - self.logs.unwrapped_write().insert(0, log); + self.logs.write().insert(0, log); } /// Return levels @@ -101,7 +101,7 @@ impl RotatingLogger { /// Return logs pub fn logs(&self) -> RwLockReadGuard> { - self.logs.unwrapped_read() + self.logs.read() } } diff --git a/util/src/misc.rs b/util/src/misc.rs index fff03259a..62e8542db 100644 --- a/util/src/misc.rs +++ b/util/src/misc.rs @@ -64,29 +64,4 @@ pub fn version_data() -> Bytes { s.append(&rustc_version()); s.append(&&Target::os()[0..2]); s.out() -} - -/// Object can be locked directly into a `MutexGuard`. -pub trait Lockable { - /// Lock object directly into a `MutexGuard`. - fn locked(&self) -> MutexGuard; -} - -impl Lockable for Mutex { - fn locked(&self) -> MutexGuard { self.lock().unwrap() } -} - -/// Object can be read or write locked directly into a guard. -pub trait RwLockable { - /// Read-lock object directly into a `ReadGuard`. - fn unwrapped_read(&self) -> RwLockReadGuard; - - /// Write-lock object directly into a `WriteGuard`. - fn unwrapped_write(&self) -> RwLockWriteGuard; -} - -impl RwLockable for RwLock { - fn unwrapped_read(&self) -> RwLockReadGuard { self.read().unwrap() } - fn unwrapped_write(&self) -> RwLockWriteGuard { self.write().unwrap() } -} - +} \ No newline at end of file diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 9ee045c87..9963d94b7 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -96,13 +96,13 @@ impl GenericConnection { } }, Ok(_) => return Ok(None), - Err(e) => { + Err(e) => { debug!(target:"network", "Read error {} ({})", self.token, e); return Err(e) } } } - } + } /// Add a packet to send queue. 
pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone { @@ -490,7 +490,7 @@ pub fn test_encryption() { #[cfg(test)] mod tests { use super::*; - use std::sync::*; + use std::sync::Arc; use std::sync::atomic::AtomicBool; use super::super::stats::*; use std::io::{Read, Write, Error, Cursor, ErrorKind}; diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 701437ab3..0612e4444 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; use std::collections::{HashMap, HashSet}; use std::str::FromStr; -use std::sync::*; +use std::sync::Arc; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::ops::*; use std::cmp::min; @@ -42,6 +42,7 @@ use network::error::{NetworkError, DisconnectReason}; use network::discovery::{Discovery, TableUpdates, NodeEntry}; use network::ip_utils::{map_external_address, select_public_address}; use path::restrict_permissions_owner; +use parking_lot::{Mutex, RwLock}; type Slab = ::slab::Slab; @@ -201,7 +202,7 @@ impl<'s> NetworkContext<'s> { protocol: ProtocolId, session: Option, sessions: Arc>>, reserved_peers: &'s HashSet) -> NetworkContext<'s> { - let id = session.as_ref().map(|s| s.locked().token()); + let id = session.as_ref().map(|s| s.lock().token()); NetworkContext { io: io, protocol: protocol, @@ -215,7 +216,7 @@ impl<'s> NetworkContext<'s> { fn resolve_session(&self, peer: PeerId) -> Option { match self.session_id { Some(id) if id == peer => self.session.clone(), - _ => self.sessions.unwrapped_read().get(peer).cloned(), + _ => self.sessions.read().get(peer).cloned(), } } @@ -223,7 +224,7 @@ impl<'s> NetworkContext<'s> { pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { let session = self.resolve_session(peer); if let Some(session) = session { - try!(session.locked().send_packet(self.io, self.protocol, packet_id as u8, &data)); + try!(session.lock().send_packet(self.io, self.protocol, packet_id as u8, &data)); } else { trace!(target: "network", "Send: Peer no longer exist") } @@ -256,7 +257,7 @@ impl<'s> NetworkContext<'s> { /// Check if the session is still active. pub fn is_expired(&self) -> bool { - self.session.as_ref().map_or(false, |s| s.locked().expired()) + self.session.as_ref().map_or(false, |s| s.lock().expired()) } /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. 
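The `NetworkContext`/`Host` hunks above keep the existing two-level locking scheme, just spelled with `parking_lot` guards: the session table sits behind an `RwLock`, each session behind its own `Arc<Mutex<..>>`, and the table lock is dropped before the per-session lock is taken (note how the code wraps `self.sessions.read().get(token).cloned()` in its own block). A minimal sketch of that pattern with hypothetical types follows; it is not the real `Session` or slab from `util::network`, only the locking shape.

```rust
// Sketch only: two-level locking for shared peer sessions. Lookups take a short
// shared read lock on the table; per-peer work takes that peer's own mutex, so
// a slow peer does not hold the whole table.
extern crate parking_lot;

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};

type PeerId = usize;

struct Session { sent: Vec<Vec<u8>> }

impl Session {
    fn send_packet(&mut self, data: &[u8]) { self.sent.push(data.to_vec()); }
}

struct Host {
    sessions: Arc<RwLock<HashMap<PeerId, Arc<Mutex<Session>>>>>,
}

impl Host {
    fn send(&self, peer: PeerId, data: &[u8]) {
        // Hold the table read lock only long enough to clone the Arc...
        let session = self.sessions.read().get(&peer).cloned();
        // ...then lock the individual session with the table lock released.
        if let Some(session) = session {
            session.lock().send_packet(data);
        }
    }
}

fn main() {
    let host = Host { sessions: Arc::new(RwLock::new(HashMap::new())) };
    host.sessions.write().insert(7, Arc::new(Mutex::new(Session { sent: Vec::new() })));
    host.send(7, b"hello");
    assert_eq!(host.sessions.read()[&7].lock().sent.len(), 1);
}
```

Cloning the `Arc` under the brief read lock and only then locking the session is what lets `connected`, `read` and the keep-alive loop above touch one peer without serializing every other peer behind the table lock.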
@@ -273,7 +274,7 @@ impl<'s> NetworkContext<'s> { pub fn peer_info(&self, peer: PeerId) -> String { let session = self.resolve_session(peer); if let Some(session) = session { - return session.locked().info.client_version.clone() + return session.lock().info.client_version.clone() } "unknown".to_owned() } @@ -416,8 +417,8 @@ impl Host { Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - self.nodes.unwrapped_write().add_node(n); - if let Some(ref mut discovery) = *self.discovery.locked() { + self.nodes.write().add_node(n); + if let Some(ref mut discovery) = *self.discovery.lock() { discovery.add_node(entry); } } @@ -428,9 +429,9 @@ impl Host { let n = try!(Node::from_str(id)); let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - self.reserved_nodes.unwrapped_write().insert(n.id.clone()); + self.reserved_nodes.write().insert(n.id.clone()); - if let Some(ref mut discovery) = *self.discovery.locked() { + if let Some(ref mut discovery) = *self.discovery.lock() { discovery.add_node(entry); } @@ -438,17 +439,17 @@ impl Host { } pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext) { - let mut info = self.info.unwrapped_write(); + let mut info = self.info.write(); if info.config.non_reserved_mode != mode { info.config.non_reserved_mode = mode.clone(); drop(info); if let NonReservedPeerMode::Deny = mode { // disconnect all non-reserved peers here. - let reserved: HashSet = self.reserved_nodes.unwrapped_read().clone(); + let reserved: HashSet = self.reserved_nodes.read().clone(); let mut to_kill = Vec::new(); - for e in self.sessions.unwrapped_write().iter_mut() { - let mut s = e.locked(); + for e in self.sessions.write().iter_mut() { + let mut s = e.lock(); { let id = s.id(); if id.is_some() && reserved.contains(id.unwrap()) { @@ -469,7 +470,7 @@ impl Host { pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> { let n = try!(Node::from_str(id)); - self.reserved_nodes.unwrapped_write().remove(&n.id); + self.reserved_nodes.write().remove(&n.id); Ok(()) } @@ -479,11 +480,11 @@ impl Host { } pub fn external_url(&self) -> Option { - self.info.unwrapped_read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.unwrapped_read().id().clone(), e.clone()))) + self.info.read().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().id().clone(), e.clone()))) } pub fn local_url(&self) -> String { - let r = format!("{}", Node::new(self.info.unwrapped_read().id().clone(), self.info.unwrapped_read().local_endpoint.clone())); + let r = format!("{}", Node::new(self.info.read().id().clone(), self.info.read().local_endpoint.clone())); println!("{}", r); r } @@ -491,8 +492,8 @@ impl Host { pub fn stop(&self, io: &IoContext) -> Result<(), UtilError> { self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); - for e in self.sessions.unwrapped_write().iter_mut() { - let mut s = e.locked(); + for e in self.sessions.write().iter_mut() { + let mut s = e.lock(); s.disconnect(io, DisconnectReason::ClientQuit); to_kill.push(s.token()); } @@ -505,16 +506,16 @@ impl Host { } fn init_public_interface(&self, io: &IoContext) -> Result<(), UtilError> { - if self.info.unwrapped_read().public_endpoint.is_some() { + if self.info.read().public_endpoint.is_some() { return Ok(()); } - let local_endpoint = self.info.unwrapped_read().local_endpoint.clone(); - let public_address = self.info.unwrapped_read().config.public_address.clone(); + let local_endpoint = 
self.info.read().local_endpoint.clone(); + let public_address = self.info.read().config.public_address.clone(); let public_endpoint = match public_address { None => { let public_address = select_public_address(local_endpoint.address.port()); let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port }; - if self.info.unwrapped_read().config.nat_enabled { + if self.info.read().config.nat_enabled { match map_external_address(&local_endpoint) { Some(endpoint) => { info!("NAT mapped to external address {}", endpoint.address); @@ -529,7 +530,7 @@ impl Host { Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port } }; - self.info.unwrapped_write().public_endpoint = Some(public_endpoint.clone()); + self.info.write().public_endpoint = Some(public_endpoint.clone()); if let Some(url) = self.external_url() { io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); @@ -537,7 +538,7 @@ impl Host { // Initialize discovery. let discovery = { - let info = self.info.unwrapped_read(); + let info = self.info.read(); if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { let mut udp_addr = local_endpoint.address.clone(); udp_addr.set_port(local_endpoint.udp_port); @@ -546,11 +547,11 @@ impl Host { }; if let Some(mut discovery) = discovery { - discovery.init_node_list(self.nodes.unwrapped_read().unordered_entries()); - for n in self.nodes.unwrapped_read().unordered_entries() { + discovery.init_node_list(self.nodes.read().unordered_entries()); + for n in self.nodes.read().unordered_entries() { discovery.add_node(n.clone()); } - *self.discovery.locked() = Some(discovery); + *self.discovery.lock() = Some(discovery); io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); @@ -566,7 +567,7 @@ impl Host { } fn have_session(&self, id: &NodeId) -> bool { - self.sessions.unwrapped_read().iter().any(|e| e.locked().info.id == Some(id.clone())) + self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone())) } fn session_count(&self) -> usize { @@ -574,17 +575,17 @@ impl Host { } fn connecting_to(&self, id: &NodeId) -> bool { - self.sessions.unwrapped_read().iter().any(|e| e.locked().id() == Some(id)) + self.sessions.read().iter().any(|e| e.lock().id() == Some(id)) } fn handshake_count(&self) -> usize { - self.sessions.unwrapped_read().count() - self.session_count() + self.sessions.read().count() - self.session_count() } fn keep_alive(&self, io: &IoContext) { let mut to_kill = Vec::new(); - for e in self.sessions.unwrapped_write().iter_mut() { - let mut s = e.locked(); + for e in self.sessions.write().iter_mut() { + let mut s = e.lock(); if !s.keep_alive(io) { s.disconnect(io, DisconnectReason::PingTimeout); to_kill.push(s.token()); @@ -598,7 +599,7 @@ impl Host { fn connect_peers(&self, io: &IoContext) { let (ideal_peers, mut pin) = { - let info = self.info.unwrapped_read(); + let info = self.info.read(); if info.capabilities.is_empty() { return; } @@ -608,7 +609,7 @@ impl Host { }; let session_count = self.session_count(); - let reserved_nodes = self.reserved_nodes.unwrapped_read(); + let reserved_nodes = self.reserved_nodes.read(); if session_count >= ideal_peers as usize + reserved_nodes.len() { // check if all pinned nodes are connected. 
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { @@ -629,7 +630,7 @@ impl Host { // iterate over all nodes, reserved ones coming first. // if we are pinned to only reserved nodes, ignore all others. let nodes = reserved_nodes.iter().cloned().chain(if !pin { - self.nodes.unwrapped_read().nodes() + self.nodes.read().nodes() } else { Vec::new() }); @@ -657,7 +658,7 @@ impl Host { let socket = { let address = { - let mut nodes = self.nodes.unwrapped_write(); + let mut nodes = self.nodes.write(); if let Some(node) = nodes.get_mut(id) { node.last_attempted = Some(::time::now()); node.endpoint.address @@ -682,11 +683,11 @@ impl Host { #[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))] fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext) -> Result<(), UtilError> { - let nonce = self.info.unwrapped_write().next_nonce(); - let mut sessions = self.sessions.unwrapped_write(); + let nonce = self.info.write().next_nonce(); + let mut sessions = self.sessions.write(); let token = sessions.insert_with_opt(|token| { - match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) { + match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read()) { Ok(s) => Some(Arc::new(Mutex::new(s))), Err(e) => { debug!(target: "network", "Session create error: {:?}", e); @@ -707,7 +708,7 @@ impl Host { fn accept(&self, io: &IoContext) { trace!(target: "network", "Accepting incoming connection"); loop { - let socket = match self.tcp_listener.locked().accept() { + let socket = match self.tcp_listener.lock().accept() { Ok(None) => break, Ok(Some((sock, _addr))) => sock, Err(e) => { @@ -722,11 +723,11 @@ impl Host { } fn session_writable(&self, token: StreamToken, io: &IoContext) { - let session = { self.sessions.unwrapped_read().get(token).cloned() }; + let session = { self.sessions.read().get(token).cloned() }; if let Some(session) = session { - let mut s = session.locked(); - if let Err(e) = s.writable(io, &self.info.unwrapped_read()) { + let mut s = session.lock(); + if let Err(e) = s.writable(io, &self.info.read()) { trace!(target: "network", "Session write error: {}: {:?}", token, e); } if s.done() { @@ -745,16 +746,16 @@ impl Host { let mut ready_data: Vec = Vec::new(); let mut packet_data: Vec<(ProtocolId, PacketId, Vec)> = Vec::new(); let mut kill = false; - let session = { self.sessions.unwrapped_read().get(token).cloned() }; + let session = { self.sessions.read().get(token).cloned() }; if let Some(session) = session.clone() { - let mut s = session.locked(); + let mut s = session.lock(); loop { - match s.readable(io, &self.info.unwrapped_read()) { + match s.readable(io, &self.info.read()) { Err(e) => { trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e { if let Some(id) = s.id() { - self.nodes.unwrapped_write().mark_as_useless(id); + self.nodes.write().mark_as_useless(id); } } kill = true; @@ -764,9 +765,9 @@ impl Host { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let reserved_nodes = self.reserved_nodes.unwrapped_read(); + let reserved_nodes = self.reserved_nodes.read(); let (ideal_peers, reserved_only) = { - let info = self.info.unwrapped_read(); + let info = self.info.read(); (info.config.ideal_peers, info.config.non_reserved_mode == 
NonReservedPeerMode::Deny) }; @@ -781,14 +782,14 @@ impl Host { // Add it no node table if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; - self.nodes.unwrapped_write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); - let mut discovery = self.discovery.locked(); + self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); + let mut discovery = self.discovery.lock(); if let Some(ref mut discovery) = *discovery.deref_mut() { discovery.add_node(entry); } } } - for (p, _) in self.handlers.unwrapped_read().iter() { + for (p, _) in self.handlers.read().iter() { if s.have_capability(p) { ready_data.push(p); } @@ -799,7 +800,7 @@ impl Host { protocol, packet_id, }) => { - match self.handlers.unwrapped_read().get(protocol) { + match self.handlers.read().get(protocol) { None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, Some(_) => packet_data.push((protocol, packet_id, data)), } @@ -812,16 +813,16 @@ impl Host { if kill { self.kill_connection(token, io, true); } - let handlers = self.handlers.unwrapped_read(); + let handlers = self.handlers.read(); for p in ready_data { let h = handlers.get(p).unwrap().clone(); self.stats.inc_sessions(); - let reserved = self.reserved_nodes.unwrapped_read(); + let reserved = self.reserved_nodes.read(); h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); } for (p, packet_id, data) in packet_data { let h = handlers.get(p).unwrap().clone(); - let reserved = self.reserved_nodes.unwrapped_read(); + let reserved = self.reserved_nodes.read(); h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); } } @@ -837,14 +838,14 @@ impl Host { let mut deregister = false; let mut expired_session = None; if let FIRST_SESSION ... 
LAST_SESSION = token { - let sessions = self.sessions.unwrapped_write(); + let sessions = self.sessions.write(); if let Some(session) = sessions.get(token).cloned() { expired_session = Some(session.clone()); - let mut s = session.locked(); + let mut s = session.lock(); if !s.expired() { if s.is_ready() { self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); - for (p, _) in self.handlers.unwrapped_read().iter() { + for (p, _) in self.handlers.read().iter() { if s.have_capability(p) { to_disconnect.push(p); } @@ -858,12 +859,12 @@ impl Host { } if let Some(id) = failure_id { if remote { - self.nodes.unwrapped_write().note_failure(&id); + self.nodes.write().note_failure(&id); } } for p in to_disconnect { - let h = self.handlers.unwrapped_read().get(p).unwrap().clone(); - let reserved = self.reserved_nodes.unwrapped_read(); + let h = self.handlers.read().get(p).unwrap().clone(); + let reserved = self.reserved_nodes.read(); h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } if deregister { @@ -874,9 +875,9 @@ impl Host { fn update_nodes(&self, io: &IoContext, node_changes: TableUpdates) { let mut to_remove: Vec = Vec::new(); { - let sessions = self.sessions.unwrapped_write(); + let sessions = self.sessions.write(); for c in sessions.iter() { - let s = c.locked(); + let s = c.lock(); if let Some(id) = s.id() { if node_changes.removed.contains(id) { to_remove.push(s.token()); @@ -888,11 +889,11 @@ impl Host { trace!(target: "network", "Removed from node table: {}", i); self.kill_connection(i, io, false); } - self.nodes.unwrapped_write().update(node_changes); + self.nodes.write().update(node_changes); } pub fn with_context(&self, protocol: ProtocolId, io: &IoContext, action: F) where F: Fn(&NetworkContext) { - let reserved = { self.reserved_nodes.unwrapped_read() }; + let reserved = { self.reserved_nodes.read() }; let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); action(&context); @@ -922,7 +923,7 @@ impl IoHandler for Host { match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { - let node_changes = { self.discovery.locked().as_mut().unwrap().readable(io) }; + let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -939,7 +940,7 @@ impl IoHandler for Host { match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { - self.discovery.locked().as_mut().unwrap().writable(io); + self.discovery.lock().as_mut().unwrap().writable(io); } _ => panic!("Received unknown writable token"), } @@ -953,11 +954,11 @@ impl IoHandler for Host { IDLE => self.maintain_network(io), FIRST_SESSION ... 
LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { - self.discovery.locked().as_mut().unwrap().refresh(); + self.discovery.lock().as_mut().unwrap().refresh(); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, DISCOVERY_ROUND => { - let node_changes = { self.discovery.locked().as_mut().unwrap().round() }; + let node_changes = { self.discovery.lock().as_mut().unwrap().round() }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -965,13 +966,13 @@ impl IoHandler for Host { }, NODE_TABLE => { trace!(target: "network", "Refreshing node table"); - self.nodes.unwrapped_write().clear_useless(); + self.nodes.write().clear_useless(); }, - _ => match self.timers.unwrapped_read().get(&token).cloned() { - Some(timer) => match self.handlers.unwrapped_read().get(timer.protocol).cloned() { + _ => match self.timers.read().get(&token).cloned() { + Some(timer) => match self.handlers.read().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, Some(h) => { - let reserved = self.reserved_nodes.unwrapped_read(); + let reserved = self.reserved_nodes.read(); h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token); } }, @@ -991,10 +992,10 @@ impl IoHandler for Host { ref versions } => { let h = handler.clone(); - let reserved = self.reserved_nodes.unwrapped_read(); + let reserved = self.reserved_nodes.read(); h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved)); - self.handlers.unwrapped_write().insert(protocol, h); - let mut info = self.info.unwrapped_write(); + self.handlers.write().insert(protocol, h); + let mut info = self.info.write(); for v in versions { info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 }); } @@ -1005,29 +1006,29 @@ impl IoHandler for Host { ref token, } => { let handler_token = { - let mut timer_counter = self.timer_counter.unwrapped_write(); + let mut timer_counter = self.timer_counter.write(); let counter = &mut *timer_counter; let handler_token = *counter; *counter += 1; handler_token }; - self.timers.unwrapped_write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); + self.timers.write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e)); }, NetworkIoMessage::Disconnect(ref peer) => { - let session = { self.sessions.unwrapped_read().get(*peer).cloned() }; + let session = { self.sessions.read().get(*peer).cloned() }; if let Some(session) = session { - session.locked().disconnect(io, DisconnectReason::DisconnectRequested); + session.lock().disconnect(io, DisconnectReason::DisconnectRequested); } trace!(target: "network", "Disconnect requested {}", peer); self.kill_connection(*peer, io, false); }, NetworkIoMessage::DisablePeer(ref peer) => { - let session = { self.sessions.unwrapped_read().get(*peer).cloned() }; + let session = { self.sessions.read().get(*peer).cloned() }; if let Some(session) = session { - session.locked().disconnect(io, DisconnectReason::DisconnectRequested); - if let Some(id) = session.locked().id() { - self.nodes.unwrapped_write().mark_as_useless(id) + session.lock().disconnect(io, DisconnectReason::DisconnectRequested); + if let Some(id) = session.lock().id() { + 
self.nodes.write().mark_as_useless(id) } } trace!(target: "network", "Disabling peer {}", peer); @@ -1042,13 +1043,13 @@ impl IoHandler for Host { fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let session = { self.sessions.unwrapped_read().get(stream).cloned() }; + let session = { self.sessions.read().get(stream).cloned() }; if let Some(session) = session { - session.locked().register_socket(reg, event_loop).expect("Error registering socket"); + session.lock().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.locked().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), + TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -1056,9 +1057,9 @@ impl IoHandler for Host { fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let mut connections = self.sessions.unwrapped_write(); + let mut connections = self.sessions.write(); if let Some(connection) = connections.get(stream).cloned() { - connection.locked().deregister_socket(event_loop).expect("Error deregistering socket"); + connection.lock().deregister_socket(event_loop).expect("Error deregistering socket"); connections.remove(stream); } } @@ -1070,13 +1071,13 @@ impl IoHandler for Host { fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { - let connection = { self.sessions.unwrapped_read().get(stream).cloned() }; + let connection = { self.sessions.read().get(stream).cloned() }; if let Some(connection) = connection { - connection.locked().update_socket(reg, event_loop).expect("Error updating socket"); + connection.lock().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.locked().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), + TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } diff --git a/util/src/network/service.rs b/util/src/network/service.rs index ece8324c7..d95db5842 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -14,16 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . 
-use std::sync::*; use error::*; use panics::*; -use misc::RwLockable; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::NetworkError; use network::host::{Host, NetworkContext, NetworkIoMessage, ProtocolId}; use network::stats::NetworkStats; use io::*; +use std::sync::Arc; + +use parking_lot::RwLock; + /// IO Service with networking /// `Message` defines a notification data type. pub struct NetworkService { @@ -86,19 +88,19 @@ impl NetworkService { /// Returns external url if available. pub fn external_url(&self) -> Option { - let host = self.host.unwrapped_read(); + let host = self.host.read(); host.as_ref().and_then(|h| h.external_url()) } /// Returns external url if available. pub fn local_url(&self) -> Option { - let host = self.host.unwrapped_read(); + let host = self.host.read(); host.as_ref().map(|h| h.local_url()) } /// Start network IO pub fn start(&self) -> Result<(), UtilError> { - let mut host = self.host.unwrapped_write(); + let mut host = self.host.write(); if host.is_none() { let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone()))); try!(self.io_service.register_handler(h.clone())); @@ -109,7 +111,7 @@ impl NetworkService { /// Stop network IO pub fn stop(&self) -> Result<(), UtilError> { - let mut host = self.host.unwrapped_write(); + let mut host = self.host.write(); if let Some(ref host) = *host { let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host try!(host.stop(&io)); @@ -120,7 +122,7 @@ impl NetworkService { /// Try to add a reserved peer. pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { - let host = self.host.unwrapped_read(); + let host = self.host.read(); if let Some(ref host) = *host { host.add_reserved_node(peer) } else { @@ -130,7 +132,7 @@ impl NetworkService { /// Try to remove a reserved peer. pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { - let host = self.host.unwrapped_read(); + let host = self.host.read(); if let Some(ref host) = *host { host.remove_reserved_node(peer) } else { @@ -140,7 +142,7 @@ impl NetworkService { /// Set the non-reserved peer mode. pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) { - let host = self.host.unwrapped_read(); + let host = self.host.read(); if let Some(ref host) = *host { let io_ctxt = IoContext::new(self.io_service.channel(), 0); host.set_non_reserved_mode(mode, &io_ctxt); @@ -150,7 +152,7 @@ impl NetworkService { /// Executes action in the network context pub fn with_context(&self, protocol: ProtocolId, action: F) where F: Fn(&NetworkContext) { let io = IoContext::new(self.io_service.channel(), 0); - let host = self.host.unwrapped_read(); + let host = self.host.read(); if let Some(ref host) = host.as_ref() { host.with_context(protocol, &io, action); }; diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index b428a0a8e..450b5dd00 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -18,7 +18,6 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::thread; use std::time::*; use common::*; -use misc::*; use network::*; use io::TimerToken; use crypto::KeyPair; @@ -47,7 +46,7 @@ impl TestProtocol { } pub fn got_packet(&self) -> bool { - self.packet.locked().deref()[..] == b"hello"[..] + self.packet.lock().deref()[..] == b"hello"[..] 
} pub fn got_timeout(&self) -> bool { @@ -66,7 +65,7 @@ impl NetworkProtocolHandler for TestProtocol { fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { assert_eq!(packet_id, 33); - self.packet.locked().extend(data); + self.packet.lock().extend(data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { diff --git a/util/src/panics.rs b/util/src/panics.rs index 7b6c92639..8db875bdf 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -18,9 +18,10 @@ use std::thread; use std::ops::DerefMut; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::default::Default; -use misc::Lockable; + +use parking_lot::Mutex; /// Thread-safe closure for handling possible panics pub trait OnPanicListener: Send + Sync + 'static { @@ -89,7 +90,7 @@ impl PanicHandler { /// Notifies all listeners in case there is a panic. /// You should use `catch_panic` instead of calling this method explicitly. pub fn notify_all(&self, r: String) { - let mut listeners = self.listeners.locked(); + let mut listeners = self.listeners.lock(); for listener in listeners.deref_mut() { listener.call(&r); } @@ -98,7 +99,7 @@ impl PanicHandler { impl MayPanic for PanicHandler { fn on_panic(&self, closure: F) where F: OnPanicListener { - self.listeners.locked().push(Box::new(closure)); + self.listeners.lock().push(Box::new(closure)); } } @@ -119,50 +120,46 @@ impl OnPanicListener for F #[test] #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_notify_listeners_about_panic () { - use std::sync::RwLock; - use misc::RwLockable; + use parking_lot::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.unwrapped_write().push(t)); + p.on_panic(move |t| i.write().push(t)); // when p.catch_panic(|| panic!("Panic!")).unwrap_err(); // then - assert!(invocations.unwrapped_read()[0] == "Panic!"); + assert!(invocations.read()[0] == "Panic!"); } #[test] #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_notify_listeners_about_panic_when_string_is_dynamic () { - use std::sync::RwLock; - use misc::RwLockable; + use parking_lot::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.unwrapped_write().push(t)); + p.on_panic(move |t| i.write().push(t)); // when p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); // then - assert!(invocations.unwrapped_read()[0] == "Panic: 1"); + assert!(invocations.read()[0] == "Panic: 1"); } #[test] fn should_notify_listeners_about_panic_in_other_thread () { use std::thread; - use std::sync::RwLock; - use misc::RwLockable; - + use parking_lot::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new(); - p.on_panic(move |t| i.unwrapped_write().push(t)); + p.on_panic(move |t| i.write().push(t)); // when let t = thread::spawn(move || @@ -171,20 +168,18 @@ fn should_notify_listeners_about_panic_in_other_thread () { t.join().unwrap_err(); // then - assert!(invocations.unwrapped_read()[0] == "Panic!"); + assert!(invocations.read()[0] == "Panic!"); } #[test] #[ignore] // panic forwarding doesnt work on the same thread in beta fn should_forward_panics () { -use std::sync::RwLock; - use misc::RwLockable; - + use parking_lot::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); let p = PanicHandler::new_in_arc(); - p.on_panic(move |t| 
i.unwrapped_write().push(t)); + p.on_panic(move |t| i.write().push(t)); let p2 = PanicHandler::new(); p.forward_from(&p2); @@ -193,5 +188,5 @@ use std::sync::RwLock; p2.catch_panic(|| panic!("Panic!")).unwrap_err(); // then - assert!(invocations.unwrapped_read()[0] == "Panic!"); + assert!(invocations.read()[0] == "Panic!"); } diff --git a/util/src/standard.rs b/util/src/standard.rs index c588e9d53..ac41ef50d 100644 --- a/util/src/standard.rs +++ b/util/src/standard.rs @@ -36,7 +36,7 @@ pub use std::error::Error as StdError; pub use std::ops::*; pub use std::cmp::*; -pub use std::sync::*; +pub use std::sync::Arc; pub use std::cell::*; pub use std::collections::*; @@ -46,3 +46,5 @@ pub use rustc_serialize::hex::{FromHex, FromHexError}; pub use heapsize::HeapSizeOf; pub use itertools::Itertools; + +pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; \ No newline at end of file
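
The change above is mechanical but worth stating once: std::sync's Mutex and RwLock return a Result that has to be unwrapped (hence the old Lockable::locked() / RwLockable::unwrapped_read() / unwrapped_write() helpers being removed here), whereas parking_lot's locks have no poisoning and hand back the guard directly, so every call site collapses to plain lock(), read(), or write(). A minimal sketch of the new call pattern, assuming only the parking_lot crate added by this patch (the names and values below are illustrative, not taken from the patch):

// Illustrative example only; not part of the diff.
// parking_lot guards are returned directly -- there is no PoisonError,
// so no .unwrap() and no helper traits are needed.
extern crate parking_lot;

use std::sync::Arc;
use parking_lot::{Mutex, RwLock};

fn main() {
    let counter = Arc::new(Mutex::new(0u32));
    let nodes = Arc::new(RwLock::new(Vec::<String>::new()));

    // Mutex::lock returns the MutexGuard directly.
    *counter.lock() += 1;

    // RwLock::write / RwLock::read likewise return guards, not Results.
    nodes.write().push("enode://example".to_owned());

    assert_eq!(*counter.lock(), 1);
    assert_eq!(nodes.read().len(), 1);
}

One consequence of dropping poisoning: a panic while a parking_lot guard is held simply releases the lock, so any code that previously relied on PoisonError to detect a panicked writer has to handle that case some other way.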