diff --git a/Cargo.lock b/Cargo.lock index 7559b7167..61927a47c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -664,6 +664,7 @@ dependencies = [ "kvdb-memorydb 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb-rocksdb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "len-caching-mutex 0.1.0", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", @@ -1836,6 +1837,13 @@ name = "lazycell" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "len-caching-mutex" +version = "0.1.0" +dependencies = [ + "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "libc" version = "0.2.43" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index fc034a796..4d1f12058 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -70,6 +70,7 @@ kvdb-rocksdb = "0.1.3" serde = "1.0" serde_derive = "1.0" tempdir = {version="0.3", optional = true} +len-caching-mutex = { path = "../util/len-caching-mutex" } [target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "android"))'.dependencies] hardware-wallet = { path = "../hw" } diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index b3d77cbfa..852cf1e1c 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -112,6 +112,7 @@ extern crate journaldb; extern crate serde; #[cfg(any(test, feature = "json-tests", feature = "test-helpers"))] extern crate tempdir; +extern crate len_caching_mutex; #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "android"))] extern crate hardware_wallet; diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index b9242f47b..f6b1f48dc 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -29,6 +29,7 @@ use io::*; use error::{BlockError, ImportErrorKind, ErrorKind, Error}; use engines::EthEngine; use client::ClientIoMessage; +use len_caching_mutex::LenCachingMutex; use self::kind::{BlockLike, Kind}; @@ -195,9 +196,9 @@ impl QueueSignal { struct Verification { // All locks must be captured in the order declared here. - unverified: Mutex>, - verifying: Mutex>>, - verified: Mutex>, + unverified: LenCachingMutex>, + verifying: LenCachingMutex>>, + verified: LenCachingMutex>, bad: Mutex>, sizes: Sizes, check_seal: bool, @@ -207,9 +208,9 @@ impl VerificationQueue { /// Creates a new queue instance. pub fn new(config: Config, engine: Arc, message_channel: IoChannel, check_seal: bool) -> Self { let verification = Arc::new(Verification { - unverified: Mutex::new(VecDeque::new()), - verifying: Mutex::new(VecDeque::new()), - verified: Mutex::new(VecDeque::new()), + unverified: LenCachingMutex::new(VecDeque::new()), + verifying: LenCachingMutex::new(VecDeque::new()), + verified: LenCachingMutex::new(VecDeque::new()), bad: Mutex::new(HashSet::new()), sizes: Sizes { unverified: AtomicUsize::new(0), @@ -332,7 +333,7 @@ impl VerificationQueue { return; } - wait.wait(&mut unverified); + wait.wait(unverified.inner_mut()); } if let State::Exit = *state.0.lock() { @@ -453,7 +454,7 @@ impl VerificationQueue { pub fn flush(&self) { let mut unverified = self.verification.unverified.lock(); while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() { - self.empty.wait(&mut unverified); + self.empty.wait(unverified.inner_mut()); } } @@ -595,18 +596,18 @@ impl VerificationQueue { use std::mem::size_of; let (unverified_len, unverified_bytes) = { - let len = self.verification.unverified.lock().len(); + let len = self.verification.unverified.load_len(); let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire); (len, size + len * size_of::()) }; let (verifying_len, verifying_bytes) = { - let len = self.verification.verifying.lock().len(); + let len = self.verification.verifying.load_len(); let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire); (len, size + len * size_of::>()) }; let (verified_len, verified_bytes) = { - let len = self.verification.verified.lock().len(); + let len = self.verification.verified.load_len(); let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire); (len, size + len * size_of::()) }; diff --git a/util/len-caching-mutex/Cargo.toml b/util/len-caching-mutex/Cargo.toml new file mode 100644 index 000000000..b7d0de325 --- /dev/null +++ b/util/len-caching-mutex/Cargo.toml @@ -0,0 +1,10 @@ +[package] +description = "Mutex with cached len, for use with collections" +homepage = "http://parity.io" +license = "GPL-3.0" +name = "len-caching-mutex" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +parking_lot = "0.6" diff --git a/util/len-caching-mutex/src/lib.rs b/util/len-caching-mutex/src/lib.rs new file mode 100644 index 000000000..03a52c56c --- /dev/null +++ b/util/len-caching-mutex/src/lib.rs @@ -0,0 +1,138 @@ +// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . +extern crate parking_lot; + +use std::collections::VecDeque; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +use parking_lot::{Mutex, MutexGuard}; + +/// Implement to allow a type with a len() method to be used +/// with [`LenCachingMutex`](struct.LenCachingMutex.html) +pub trait Len { + fn len(&self) -> usize; +} + +impl Len for Vec { + fn len(&self) -> usize { self.len() } +} + +impl Len for VecDeque { + fn len(&self) -> usize { self.len() } +} + +/// Can be used in place of a `Mutex` where reading `T`'s `len()` without +/// needing to lock, is advantageous. +/// When the Guard is released, `T`'s `len()` will be cached. +/// The cached `len()` may be at most 1 lock behind current state. +pub struct LenCachingMutex { + data: Mutex, + len: AtomicUsize, +} + +impl LenCachingMutex { + pub fn new(data: T) -> LenCachingMutex { + LenCachingMutex { + len: AtomicUsize::new(data.len()), + data: Mutex::new(data), + } + } + + /// Load the most recent value returned from your `T`'s `len()` + pub fn load_len(&self) -> usize { + self.len.load(Ordering::SeqCst) + } + + pub fn lock(&self) -> Guard { + Guard { + mutex_guard: self.data.lock(), + len: &self.len, + } + } + + pub fn try_lock(&self) -> Option> { + Some( Guard { + mutex_guard: self.data.try_lock()?, + len: &self.len, + }) + } +} + +pub struct Guard<'a, T: Len + 'a> { + mutex_guard: MutexGuard<'a, T>, + len: &'a AtomicUsize, +} + +impl<'a, T: Len> Guard<'a, T> { + pub fn inner_mut(&mut self) -> &mut MutexGuard<'a, T> { + &mut self.mutex_guard + } + + pub fn inner(&self) -> &MutexGuard<'a, T> { + &self.mutex_guard + } +} + +impl<'a, T: Len> Drop for Guard<'a, T> { + fn drop(&mut self) { + self.len.store(self.mutex_guard.len(), Ordering::SeqCst); + } +} + +impl<'a, T: Len> Deref for Guard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + self.mutex_guard.deref() + } +} + +impl<'a, T: Len> DerefMut for Guard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.mutex_guard.deref_mut() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + #[test] + fn caches_len() { + let v = vec![1,2,3]; + let lcm = LenCachingMutex::new(v); + assert_eq!(lcm.load_len(), 3); + lcm.lock().push(4); + assert_eq!(lcm.load_len(), 4); + } + + #[test] + fn works_with_vec() { + let v: Vec = Vec::new(); + let lcm = LenCachingMutex::new(v); + assert!(lcm.lock().is_empty()); + } + + #[test] + fn works_with_vecdeque() { + let v: VecDeque = VecDeque::new(); + let lcm = LenCachingMutex::new(v); + lcm.lock().push_front(4); + assert_eq!(lcm.load_len(), 1); + } +}