Incrementally calculate verification queue heap size (#2749)

* incrementally calculate queue heap size

* query the correct queue sizes
This commit is contained in:
Robert Habermeier 2016-10-20 17:19:31 +02:00 committed by Arkadiy Paronyan
parent ae853a7557
commit 7359af8588

View File

@ -18,7 +18,7 @@
//! Sorts them ready for blockchain insertion. //! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self}; use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*; use util::*;
use io::*; use io::*;
@ -83,6 +83,13 @@ pub enum Status {
Unknown, Unknown,
} }
// the internal queue sizes.
struct Sizes {
unverified: AtomicUsize,
verifying: AtomicUsize,
verified: AtomicUsize,
}
/// A queue of items to be verified. Sits between network or other I/O and the `BlockChain`. /// A queue of items to be verified. Sits between network or other I/O and the `BlockChain`.
/// Keeps them in the same order as inserted, minus invalid items. /// Keeps them in the same order as inserted, minus invalid items.
pub struct VerificationQueue<K: Kind> { pub struct VerificationQueue<K: Kind> {
@ -147,6 +154,7 @@ struct Verification<K: Kind> {
bad: Mutex<HashSet<H256>>, bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>, more_to_verify: SMutex<()>,
empty: SMutex<()>, empty: SMutex<()>,
sizes: Sizes,
} }
impl<K: Kind> VerificationQueue<K> { impl<K: Kind> VerificationQueue<K> {
@ -159,7 +167,11 @@ impl<K: Kind> VerificationQueue<K> {
bad: Mutex::new(HashSet::new()), bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()), more_to_verify: SMutex::new(()),
empty: SMutex::new(()), empty: SMutex::new(()),
sizes: Sizes {
unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0),
verified: AtomicUsize::new(0),
}
}); });
let more_to_verify = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
@ -235,6 +247,7 @@ impl<K: Kind> VerificationQueue<K> {
None => continue, None => continue,
}; };
verification.sizes.unverified.fetch_sub(item.heap_size_of_children(), AtomicOrdering::SeqCst);
verifying.push_back(Verifying { hash: item.hash(), output: None }); verifying.push_back(Verifying { hash: item.hash(), output: None });
item item
}; };
@ -247,6 +260,8 @@ impl<K: Kind> VerificationQueue<K> {
for (i, e) in verifying.iter_mut().enumerate() { for (i, e) in verifying.iter_mut().enumerate() {
if e.hash == hash { if e.hash == hash {
idx = Some(i); idx = Some(i);
verification.sizes.verifying.fetch_add(verified.heap_size_of_children(), AtomicOrdering::SeqCst);
e.output = Some(verified); e.output = Some(verified);
break; break;
} }
@ -256,7 +271,7 @@ impl<K: Kind> VerificationQueue<K> {
// we're next! // we're next!
let mut verified = verification.verified.lock(); let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock(); let mut bad = verification.bad.lock();
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
true true
} else { } else {
false false
@ -271,7 +286,7 @@ impl<K: Kind> VerificationQueue<K> {
verifying.retain(|e| e.hash != hash); verifying.retain(|e| e.hash != hash);
if verifying.front().map_or(false, |x| x.output.is_some()) { if verifying.front().map_or(false, |x| x.output.is_some()) {
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
true true
} else { } else {
false false
@ -285,16 +300,24 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>) { fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>, sizes: &Sizes) {
let mut removed_size = 0;
let mut inserted_size = 0;
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
assert!(verifying.pop_front().is_some()); assert!(verifying.pop_front().is_some());
let size = output.heap_size_of_children();
removed_size += size;
if bad.contains(&output.parent_hash()) { if bad.contains(&output.parent_hash()) {
bad.insert(output.hash()); bad.insert(output.hash());
} else { } else {
inserted_size += size;
verified.push_back(output); verified.push_back(output);
} }
} }
sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst);
sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst);
} }
/// Clear the queue and stop verification activity. /// Clear the queue and stop verification activity.
@ -305,6 +328,12 @@ impl<K: Kind> VerificationQueue<K> {
unverified.clear(); unverified.clear();
verifying.clear(); verifying.clear();
verified.clear(); verified.clear();
let sizes = &self.verification.sizes;
sizes.unverified.store(0, AtomicOrdering::Release);
sizes.verifying.store(0, AtomicOrdering::Release);
sizes.verified.store(0, AtomicOrdering::Release);
self.processing.write().clear(); self.processing.write().clear();
} }
@ -348,6 +377,8 @@ impl<K: Kind> VerificationQueue<K> {
match K::create(input, &*self.engine) { match K::create(input, &*self.engine) {
Ok(item) => { Ok(item) => {
self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst);
self.processing.write().insert(h.clone()); self.processing.write().insert(h.clone());
self.verification.unverified.lock().push_back(item); self.verification.unverified.lock().push_back(item);
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
@ -377,14 +408,18 @@ impl<K: Kind> VerificationQueue<K> {
} }
let mut new_verified = VecDeque::new(); let mut new_verified = VecDeque::new();
let mut removed_size = 0;
for output in verified.drain(..) { for output in verified.drain(..) {
if bad.contains(&output.parent_hash()) { if bad.contains(&output.parent_hash()) {
removed_size += output.heap_size_of_children();
bad.insert(output.hash()); bad.insert(output.hash());
processing.remove(&output.hash()); processing.remove(&output.hash());
} else { } else {
new_verified.push_back(output); new_verified.push_back(output);
} }
} }
self.verification.sizes.verified.fetch_sub(removed_size, AtomicOrdering::SeqCst);
*verified = new_verified; *verified = new_verified;
} }
@ -407,6 +442,9 @@ impl<K: Kind> VerificationQueue<K> {
let count = min(max, verified.len()); let count = min(max, verified.len());
let result = verified.drain(..count).collect::<Vec<_>>(); let result = verified.drain(..count).collect::<Vec<_>>();
let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c);
self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst);
self.ready_signal.reset(); self.ready_signal.reset();
if !verified.is_empty() { if !verified.is_empty() {
self.ready_signal.set_async(); self.ready_signal.set_async();
@ -416,17 +454,23 @@ impl<K: Kind> VerificationQueue<K> {
/// Get queue status. /// Get queue status.
pub fn queue_info(&self) -> QueueInfo { pub fn queue_info(&self) -> QueueInfo {
use std::mem::size_of;
let (unverified_len, unverified_bytes) = { let (unverified_len, unverified_bytes) = {
let v = self.verification.unverified.lock(); let len = self.verification.unverified.lock().len();
(v.len(), v.heap_size_of_children()) let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire);
(len, size + len * size_of::<K::Unverified>())
}; };
let (verifying_len, verifying_bytes) = { let (verifying_len, verifying_bytes) = {
let v = self.verification.verifying.lock(); let len = self.verification.verifying.lock().len();
(v.len(), v.heap_size_of_children()) let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire);
(len, size + len * size_of::<Verifying<K>>())
}; };
let (verified_len, verified_bytes) = { let (verified_len, verified_bytes) = {
let v = self.verification.verified.lock(); let len = self.verification.verified.lock().len();
(v.len(), v.heap_size_of_children()) let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire);
(len, size + len * size_of::<K::Verified>())
}; };
QueueInfo { QueueInfo {