diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 313ab02a6..cf6ca3f53 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -18,7 +18,7 @@ //! Sorts them ready for blockchain insertion. use std::thread::{JoinHandle, self}; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use io::*; @@ -83,6 +83,13 @@ pub enum Status { Unknown, } +// the internal queue sizes. +struct Sizes { + unverified: AtomicUsize, + verifying: AtomicUsize, + verified: AtomicUsize, +} + /// A queue of items to be verified. Sits between network or other I/O and the `BlockChain`. /// Keeps them in the same order as inserted, minus invalid items. pub struct VerificationQueue { @@ -147,6 +154,7 @@ struct Verification { bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, + sizes: Sizes, } impl VerificationQueue { @@ -159,7 +167,11 @@ impl VerificationQueue { bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - + sizes: Sizes { + unverified: AtomicUsize::new(0), + verifying: AtomicUsize::new(0), + verified: AtomicUsize::new(0), + } }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -235,6 +247,7 @@ impl VerificationQueue { None => continue, }; + verification.sizes.unverified.fetch_sub(item.heap_size_of_children(), AtomicOrdering::SeqCst); verifying.push_back(Verifying { hash: item.hash(), output: None }); item }; @@ -247,6 +260,8 @@ impl VerificationQueue { for (i, e) in verifying.iter_mut().enumerate() { if e.hash == hash { idx = Some(i); + + verification.sizes.verifying.fetch_add(verified.heap_size_of_children(), AtomicOrdering::SeqCst); e.output = Some(verified); break; } @@ -256,7 +271,7 @@ impl VerificationQueue { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -271,7 +286,7 @@ impl VerificationQueue { verifying.retain(|e| e.hash != hash); if verifying.front().map_or(false, |x| x.output.is_some()) { - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -285,16 +300,24 @@ impl VerificationQueue { } } - fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet) { + fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet, sizes: &Sizes) { + let mut removed_size = 0; + let mut inserted_size = 0; while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { assert!(verifying.pop_front().is_some()); + let size = output.heap_size_of_children(); + removed_size += size; if bad.contains(&output.parent_hash()) { bad.insert(output.hash()); } else { + inserted_size += size; verified.push_back(output); } } + + sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst); + sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst); } /// Clear the queue and stop verification activity. @@ -305,6 +328,12 @@ impl VerificationQueue { unverified.clear(); verifying.clear(); verified.clear(); + + let sizes = &self.verification.sizes; + sizes.unverified.store(0, AtomicOrdering::Release); + sizes.verifying.store(0, AtomicOrdering::Release); + sizes.verified.store(0, AtomicOrdering::Release); + self.processing.write().clear(); } @@ -348,6 +377,8 @@ impl VerificationQueue { match K::create(input, &*self.engine) { Ok(item) => { + self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst); + self.processing.write().insert(h.clone()); self.verification.unverified.lock().push_back(item); self.more_to_verify.notify_all(); @@ -377,14 +408,18 @@ impl VerificationQueue { } let mut new_verified = VecDeque::new(); + let mut removed_size = 0; for output in verified.drain(..) { if bad.contains(&output.parent_hash()) { + removed_size += output.heap_size_of_children(); bad.insert(output.hash()); processing.remove(&output.hash()); } else { new_verified.push_back(output); } } + + self.verification.sizes.verified.fetch_sub(removed_size, AtomicOrdering::SeqCst); *verified = new_verified; } @@ -407,6 +442,9 @@ impl VerificationQueue { let count = min(max, verified.len()); let result = verified.drain(..count).collect::>(); + let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c); + self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst); + self.ready_signal.reset(); if !verified.is_empty() { self.ready_signal.set_async(); @@ -416,17 +454,23 @@ impl VerificationQueue { /// Get queue status. pub fn queue_info(&self) -> QueueInfo { + use std::mem::size_of; + let (unverified_len, unverified_bytes) = { - let v = self.verification.unverified.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.unverified.lock().len(); + let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire); + + (len, size + len * size_of::()) }; let (verifying_len, verifying_bytes) = { - let v = self.verification.verifying.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.verifying.lock().len(); + let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire); + (len, size + len * size_of::>()) }; let (verified_len, verified_bytes) = { - let v = self.verification.verified.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.verified.lock().len(); + let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire); + (len, size + len * size_of::()) }; QueueInfo {