From 2d907f3322afb90b2c93bedff5c3d5d4553cd08c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 3 Oct 2016 19:41:00 +0200 Subject: [PATCH 01/11] auto-adjust number of verification threads --- ethcore/src/verification/queue/mod.rs | 164 +++++++++++++++++++++----- 1 file changed, 137 insertions(+), 27 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 3f81d53ce..b35b95e2b 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -61,6 +61,24 @@ impl Default for Config { } } +struct VerifierHandle { + deleting: Arc, + thread: JoinHandle<()>, +} + +impl VerifierHandle { + // signal to the verifier thread that it should conclude its + // operations. + fn conclude(&self) { + self.deleting.store(true, AtomicOrdering::Release); + } + + // join the verifier thread. + fn join(self) { + self.thread.join().unwrap(); + } +} + /// An item which is in the process of being verified. pub struct Verifying { hash: H256, @@ -90,7 +108,7 @@ pub struct VerificationQueue { engine: Arc, more_to_verify: Arc, verification: Arc>, - verifiers: Vec>, + verifiers: Mutex>, deleting: Arc, ready_signal: Arc, empty: Arc, @@ -157,7 +175,7 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let mut verifiers: Vec> = Vec::new(); + let mut verifiers: Vec = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; for i in 0..thread_count { let verification = verification.clone(); @@ -165,29 +183,30 @@ impl VerificationQueue { let more_to_verify = more_to_verify.clone(); let ready_signal = ready_signal.clone(); let empty = empty.clone(); - let deleting = deleting.clone(); + let deleting = Arc::new(AtomicBool::new(false)); let panic_handler = panic_handler.clone(); - verifiers.push( - thread::Builder::new() - .name(format!("Verifier #{}", i)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) - }).unwrap() - }) - .expect("Error starting block verification thread") - ); + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + }).unwrap() + }) + .expect("Error starting block verification thread") + }); } VerificationQueue { engine: engine, panic_handler: panic_handler, - ready_signal: ready_signal.clone(), - more_to_verify: more_to_verify.clone(), - verification: verification.clone(), - verifiers: verifiers, - deleting: deleting.clone(), + ready_signal: ready_signal, + more_to_verify: more_to_verify, + verification: verification, + verifiers: Mutex::new(verifiers), + deleting: deleting, processing: RwLock::new(HashSet::new()), - empty: empty.clone(), + empty: empty, max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), } @@ -420,14 +439,93 @@ impl VerificationQueue { } } - /// Optimise memory footprint of the heap fields. + /// Optimise memory footprint of the heap fields, and adjust the number of threads + /// to better suit the workload. pub fn collect_garbage(&self) { - { - self.verification.unverified.lock().shrink_to_fit(); + // thresholds for adding and removing verifier threads + // these are unbalanced since having all blocks verified + // is the desirable position. + const ADD_THREAD_THRESHOLD: usize = 10; + const DEL_THREAD_THRESHOLD: usize = 20; + + // TODO: sample over 5 or so ticks. + let (u_len, v_len) = { + let u_len = { + let mut v = self.verification.unverified.lock(); + v.shrink_to_fit(); + v.len() + }; + self.verification.verifying.lock().shrink_to_fit(); - self.verification.verified.lock().shrink_to_fit(); - } + + let v_len = { + let mut v = self.verification.verified.lock(); + v.shrink_to_fit(); + v.len() + }; + + (u_len, v_len) + }; self.processing.write().shrink_to_fit(); + + // more than 10x as many unverified as verified. + if v_len * ADD_THREAD_THRESHOLD < u_len { + self.add_verifier(); + } + + // more than 20x as many verified as unverified. + if u_len * DEL_THREAD_THRESHOLD < v_len { + self.remove_verifier(); + } + } + + // add a verifier thread if possible. + fn add_verifier(&self) { + let mut verifiers = self.verifiers.lock(); + let len = verifiers.len(); + if len == ::num_cpus::get() { + return; + } + + debug!(target: "verification", "Adding verification thread #{}", len); + + let deleting = Arc::new(AtomicBool::new(false)); + let panic_handler = self.panic_handler.clone(); + let verification = self.verification.clone(); + let engine = self.engine.clone(); + let wait = self.more_to_verify.clone(); + let ready = self.ready_signal.clone(); + let empty = self.empty.clone(); + + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", len)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, wait, ready, deleting, empty) + }).unwrap() + }) + .expect("Failed to create verifier thread.") + }); + } + + // remove a verifier thread if possible. + fn remove_verifier(&self) { + let mut verifiers = self.verifiers.lock(); + let len = verifiers.len(); + + if len == 1 { + return; + } + + debug!(target: "verification", "Removing verification thread #{}", len); + + if let Some(handle) = verifiers.pop() { + handle.conclude(); + self.more_to_verify.notify_all(); // to ensure it's joinable immediately. + handle.join(); + } } } @@ -442,10 +540,22 @@ impl Drop for VerificationQueue { trace!(target: "shutdown", "[VerificationQueue] Closing..."); self.clear(); self.deleting.store(true, AtomicOrdering::Release); - self.more_to_verify.notify_all(); - for t in self.verifiers.drain(..) { - t.join().unwrap(); + + let mut verifiers = self.verifiers.lock(); + + // first pass to signal conclusion. must be done before + // notify or deadlock possible. + for handle in verifiers.iter() { + handle.conclude(); } + + self.more_to_verify.notify_all(); + + // second pass to join. + for handle in verifiers.drain(..) { + handle.join(); + } + trace!(target: "shutdown", "[VerificationQueue] Closed."); } } From a7b5dff2520c461ea5873e7b7b250f72ca8344d1 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 3 Oct 2016 19:47:07 +0200 Subject: [PATCH 02/11] ethash unsafety cleanup --- ethcore/src/ethereum/ethash.rs | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/ethcore/src/ethereum/ethash.rs b/ethcore/src/ethereum/ethash.rs index 982698a50..1d5d2448e 100644 --- a/ethcore/src/ethereum/ethash.rs +++ b/ethcore/src/ethereum/ethash.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethash::{quick_get_difficulty, slow_get_seedhash, EthashManager, H256 as EH256}; +use ethash::{quick_get_difficulty, slow_get_seedhash, EthashManager}; use common::*; use block::*; use spec::CommonParams; @@ -182,8 +182,8 @@ impl Engine for Ethash { // Commit state so that we can actually figure out the state root. if let Err(e) = fields.state.commit() { - warn!("Encountered error on state commit: {}", e); - } + warn!("Encountered error on state commit: {}", e); + } } fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> result::Result<(), Error> { @@ -202,10 +202,10 @@ impl Engine for Ethash { return Err(From::from(BlockError::DifficultyOutOfBounds(OutOfBounds { min: Some(min_difficulty), max: None, found: header.difficulty().clone() }))) } - let difficulty = Ethash::boundary_to_difficulty(&Ethash::from_ethash(quick_get_difficulty( - &Ethash::to_ethash(header.bare_hash()), + let difficulty = Ethash::boundary_to_difficulty(&H256(quick_get_difficulty( + &header.bare_hash().0, header.nonce().low_u64(), - &Ethash::to_ethash(header.mix_hash()) + &header.mix_hash().0 ))); if &difficulty < header.difficulty() { return Err(From::from(BlockError::InvalidProofOfWork(OutOfBounds { min: Some(header.difficulty().clone()), max: None, found: difficulty }))); @@ -230,10 +230,10 @@ impl Engine for Ethash { Mismatch { expected: self.seal_fields(), found: header.seal().len() } ))); } - let result = self.pow.compute_light(header.number() as u64, &Ethash::to_ethash(header.bare_hash()), header.nonce().low_u64()); - let mix = Ethash::from_ethash(result.mix_hash); - let difficulty = Ethash::boundary_to_difficulty(&Ethash::from_ethash(result.value)); - trace!(target: "miner", "num: {}, seed: {}, h: {}, non: {}, mix: {}, res: {}" , header.number() as u64, Ethash::from_ethash(slow_get_seedhash(header.number() as u64)), header.bare_hash(), header.nonce().low_u64(), Ethash::from_ethash(result.mix_hash), Ethash::from_ethash(result.value)); + let result = self.pow.compute_light(header.number() as u64, &header.bare_hash().0, header.nonce().low_u64()); + let mix = H256(result.mix_hash); + let difficulty = Ethash::boundary_to_difficulty(&H256(result.value)); + trace!(target: "miner", "num: {}, seed: {}, h: {}, non: {}, mix: {}, res: {}" , header.number() as u64, H256(slow_get_seedhash(header.number() as u64)), header.bare_hash(), header.nonce().low_u64(), H256(result.mix_hash), H256(result.value)); if mix != header.mix_hash() { return Err(From::from(BlockError::MismatchedH256SealElement(Mismatch { expected: mix, found: header.mix_hash() }))); } @@ -275,7 +275,7 @@ impl Engine for Ethash { } } -#[cfg_attr(feature="dev", allow(wrong_self_convention))] // to_ethash should take self +#[cfg_attr(feature="dev", allow(wrong_self_convention))] impl Ethash { fn calculate_difficulty(&self, header: &Header, parent: &Header) -> U256 { const EXP_DIFF_PERIOD: u64 = 100000; @@ -337,14 +337,6 @@ impl Ethash { (((U256::one() << 255) / *difficulty) << 1).into() } } - - fn to_ethash(hash: H256) -> EH256 { - unsafe { mem::transmute(hash) } - } - - fn from_ethash(hash: EH256) -> H256 { - unsafe { mem::transmute(hash) } - } } impl Header { From 5e382602dd7a18cebcb6bf47ce24aade482d40f5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 3 Oct 2016 20:09:57 +0200 Subject: [PATCH 03/11] fix logging accuracy --- ethcore/src/verification/queue/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index b35b95e2b..3db7135a6 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -515,11 +515,12 @@ impl VerificationQueue { let mut verifiers = self.verifiers.lock(); let len = verifiers.len(); + // never remove the last thread. if len == 1 { return; } - debug!(target: "verification", "Removing verification thread #{}", len); + debug!(target: "verification", "Removing verification thread #{}", len + 1); if let Some(handle) = verifiers.pop() { handle.conclude(); From 2d28c703d6a98586df5744e52a1de77aa59501c7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 3 Oct 2016 20:36:49 +0200 Subject: [PATCH 04/11] reuse add_verifier instrumentation, rolling sample of 5 ticks --- ethcore/src/verification/queue/mod.rs | 53 ++++++++++++++------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 3db7135a6..ae0555dfa 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -113,6 +113,7 @@ pub struct VerificationQueue { ready_signal: Arc, empty: Arc, processing: RwLock>, + rolling_sample: Mutex>, max_queue_size: usize, max_mem_use: usize, } @@ -175,41 +176,27 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let mut verifiers: Vec = Vec::new(); - let thread_count = max(::num_cpus::get(), 3) - 2; - for i in 0..thread_count { - let verification = verification.clone(); - let engine = engine.clone(); - let more_to_verify = more_to_verify.clone(); - let ready_signal = ready_signal.clone(); - let empty = empty.clone(); - let deleting = Arc::new(AtomicBool::new(false)); - let panic_handler = panic_handler.clone(); - verifiers.push(VerifierHandle { - deleting: deleting.clone(), - thread: thread::Builder::new() - .name(format!("Verifier #{}", i)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) - }).unwrap() - }) - .expect("Error starting block verification thread") - }); - } - VerificationQueue { + let queue = VerificationQueue { engine: engine, panic_handler: panic_handler, ready_signal: ready_signal, more_to_verify: more_to_verify, verification: verification, - verifiers: Mutex::new(verifiers), + verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())), deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, + rolling_sample: Mutex::new(VecDeque::new()), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), + }; + + let thread_count = max(::num_cpus::get(), 3) - 2; + for _ in 0..thread_count { + queue.add_verifier(); } + + queue } fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { @@ -448,7 +435,10 @@ impl VerificationQueue { const ADD_THREAD_THRESHOLD: usize = 10; const DEL_THREAD_THRESHOLD: usize = 20; - // TODO: sample over 5 or so ticks. + // number of ticks to average queue stats over + // when deciding whether to change the number of verifiers. + const SAMPLE_SIZE: usize = 5; + let (u_len, v_len) = { let u_len = { let mut v = self.verification.unverified.lock(); @@ -468,6 +458,17 @@ impl VerificationQueue { }; self.processing.write().shrink_to_fit(); + let (u_len, v_len) = { + let mut sample = self.rolling_sample.lock(); + sample.push_back((u_len, v_len)); + + if sample.len() > SAMPLE_SIZE { + let _ = sample.pop_front(); + } + + sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i)) + }; + // more than 10x as many unverified as verified. if v_len * ADD_THREAD_THRESHOLD < u_len { self.add_verifier(); @@ -520,7 +521,7 @@ impl VerificationQueue { return; } - debug!(target: "verification", "Removing verification thread #{}", len + 1); + debug!(target: "verification", "Removing verification thread #{}", len - 1); if let Some(handle) = verifiers.pop() { handle.conclude(); From abbf3b3c5840ba18872cd7ac442f7cf39def4a0a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 4 Oct 2016 20:09:54 +0200 Subject: [PATCH 05/11] verification-rate based thread scaling --- ethcore/src/verification/queue/mod.rs | 89 +++++++++++++++------------ 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index ae0555dfa..ac0dbf592 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -18,7 +18,7 @@ //! Sorts them ready for blockchain insertion. use std::thread::{JoinHandle, self}; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use io::*; @@ -113,7 +113,7 @@ pub struct VerificationQueue { ready_signal: Arc, empty: Arc, processing: RwLock>, - rolling_sample: Mutex>, + ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, } @@ -152,6 +152,8 @@ struct Verification { bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, + verified_count: AtomicUsize, + drained: AtomicUsize, } impl VerificationQueue { @@ -164,7 +166,8 @@ impl VerificationQueue { bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - + verified_count: AtomicUsize::new(0), + drained: AtomicUsize::new(0), }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -186,7 +189,7 @@ impl VerificationQueue { deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, - rolling_sample: Mutex::new(VecDeque::new()), + ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), }; @@ -248,7 +251,7 @@ impl VerificationQueue { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count); ready.set(); } }, @@ -261,7 +264,7 @@ impl VerificationQueue { verifying.retain(|e| e.hash != hash); if verifying.front().map_or(false, |x| x.output.is_some()) { - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count); ready.set(); } } @@ -269,7 +272,13 @@ impl VerificationQueue { } } - fn drain_verifying(verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet) { + fn drain_verifying( + verifying: &mut VecDeque>, + verified: &mut VecDeque, + bad: &mut HashSet, + v_count: &AtomicUsize + ) { + let start_len = verified.len(); while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { assert!(verifying.pop_front().is_some()); @@ -279,6 +288,8 @@ impl VerificationQueue { verified.push_back(output); } } + + v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel); } /// Clear the queue and stop verification activity. @@ -389,6 +400,8 @@ impl VerificationQueue { let count = min(max, verified.len()); let result = verified.drain(..count).collect::>(); + self.verification.drained.fetch_add(count, AtomicOrdering::AcqRel); + self.ready_signal.reset(); if !verified.is_empty() { self.ready_signal.set(); @@ -429,53 +442,49 @@ impl VerificationQueue { /// Optimise memory footprint of the heap fields, and adjust the number of threads /// to better suit the workload. pub fn collect_garbage(&self) { - // thresholds for adding and removing verifier threads - // these are unbalanced since having all blocks verified - // is the desirable position. - const ADD_THREAD_THRESHOLD: usize = 10; - const DEL_THREAD_THRESHOLD: usize = 20; - // number of ticks to average queue stats over // when deciding whether to change the number of verifiers. - const SAMPLE_SIZE: usize = 5; - - let (u_len, v_len) = { - let u_len = { - let mut v = self.verification.unverified.lock(); - v.shrink_to_fit(); - v.len() - }; + const READJUSTMENT_PERIOD: usize = 5; + { + self.verification.unverified.lock().shrink_to_fit(); self.verification.verifying.lock().shrink_to_fit(); + self.verification.verified.lock().shrink_to_fit(); + } - let v_len = { - let mut v = self.verification.verified.lock(); - v.shrink_to_fit(); - v.len() - }; - - (u_len, v_len) - }; self.processing.write().shrink_to_fit(); - let (u_len, v_len) = { - let mut sample = self.rolling_sample.lock(); - sample.push_back((u_len, v_len)); + if self.ticks_since_adjustment.load(AtomicOrdering::SeqCst) == READJUSTMENT_PERIOD { + self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); + } else { + self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst); + return; + } - if sample.len() > SAMPLE_SIZE { - let _ = sample.pop_front(); - } + let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); + let drained = self.verification.drained.load(AtomicOrdering::Acquire); - sample.iter().cloned().fold((0, 0), |(u_t, v_t), (u_i, v_i)| (u_t + u_i, v_t + v_i)) + self.verification.verified_count.store(0, AtomicOrdering::Release); + self.verification.drained.store(0, AtomicOrdering::Release); + + // compute the average rate of verification per thread and determine + // how many are necessary to match the rate of draining. + let num_verifiers = self.verifiers.lock().len(); + let v_count_per = v_count as f64 / num_verifiers as f64; + let needed = if v_count < 20 { + 1 + } else { + (drained as f64 / v_count_per as f64).ceil() as usize }; - // more than 10x as many unverified as verified. - if v_len * ADD_THREAD_THRESHOLD < u_len { + trace!(target: "verification", "v_rate_per={}, drained={}, scaling to {} verifiers", + v_count_per, drained, needed); + + for _ in num_verifiers..needed { self.add_verifier(); } - // more than 20x as many verified as unverified. - if u_len * DEL_THREAD_THRESHOLD < v_len { + for _ in needed..num_verifiers { self.remove_verifier(); } } From 8a5576d133a1c770bd4717235f77ba6aae0aafd2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 5 Oct 2016 12:10:28 +0200 Subject: [PATCH 06/11] balance rates of draining and importing --- ethcore/src/verification/queue/mod.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index ac0dbf592..8c6b1e5ca 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -154,6 +154,7 @@ struct Verification { empty: SMutex<()>, verified_count: AtomicUsize, drained: AtomicUsize, + imported: AtomicUsize, } impl VerificationQueue { @@ -168,6 +169,7 @@ impl VerificationQueue { empty: SMutex::new(()), verified_count: AtomicUsize::new(0), drained: AtomicUsize::new(0), + imported: AtomicUsize::new(0), }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -345,6 +347,7 @@ impl VerificationQueue { Ok(item) => { self.processing.write().insert(h.clone()); self.verification.unverified.lock().push_back(item); + self.verification.imported.fetch_add(1, AtomicOrdering::AcqRel); self.more_to_verify.notify_all(); Ok(h) }, @@ -463,9 +466,14 @@ impl VerificationQueue { let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); let drained = self.verification.drained.load(AtomicOrdering::Acquire); + let imported = self.verification.imported.load(AtomicOrdering::Acquire); self.verification.verified_count.store(0, AtomicOrdering::Release); self.verification.drained.store(0, AtomicOrdering::Release); + self.verification.imported.store(0, AtomicOrdering::Release); + + // select which side of the queue is the bottleneck. + let target = min(drained, imported); // compute the average rate of verification per thread and determine // how many are necessary to match the rate of draining. @@ -474,11 +482,11 @@ impl VerificationQueue { let needed = if v_count < 20 { 1 } else { - (drained as f64 / v_count_per as f64).ceil() as usize + (target as f64 / v_count_per as f64).ceil() as usize }; - trace!(target: "verification", "v_rate_per={}, drained={}, scaling to {} verifiers", - v_count_per, drained, needed); + trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers", + v_count_per, target, needed); for _ in num_verifiers..needed { self.add_verifier(); From 44dcd6bc3b56ae80dc8ef6a9a0d81c4d7bb438dd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 17 Nov 2016 13:10:33 +0100 Subject: [PATCH 07/11] increase readjustment period --- ethcore/src/verification/queue/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 2376215f0..23f82b730 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -518,7 +518,7 @@ impl VerificationQueue { pub fn collect_garbage(&self) { // number of ticks to average queue stats over // when deciding whether to change the number of verifiers. - const READJUSTMENT_PERIOD: usize = 5; + const READJUSTMENT_PERIOD: usize = 12; { self.verification.unverified.lock().shrink_to_fit(); From 546cd0065925279c38d82220d1d6a2de0dc21214 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 17 Nov 2016 16:00:23 +0100 Subject: [PATCH 08/11] allocate verifiers up front, hibernate when not needed --- ethcore/src/verification/queue/mod.rs | 166 ++++++++++++++++---------- 1 file changed, 103 insertions(+), 63 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 23f82b730..f4a9287d6 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -35,6 +35,9 @@ pub mod kind; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; +// maximum possible number of verification threads. +const MAX_VERIFIERS: usize = 8; + /// Type alias for block queue convenience. pub type BlockQueue = VerificationQueue; @@ -63,13 +66,26 @@ impl Default for Config { struct VerifierHandle { deleting: Arc, + sleep: Arc<(Mutex, Condvar)>, thread: JoinHandle<()>, } impl VerifierHandle { + // signal to the verifier thread that it should sleep. + fn sleep(&self) { + *self.sleep.0.lock() = true; + } + + // signal to the verifier thread that it should wake up. + fn wake_up(&self) { + *self.sleep.0.lock() = false; + self.sleep.1.notify_all(); + } + // signal to the verifier thread that it should conclude its // operations. fn conclude(&self) { + self.wake_up(); self.deleting.store(true, AtomicOrdering::Release); } @@ -115,7 +131,7 @@ pub struct VerificationQueue { engine: Arc, more_to_verify: Arc, verification: Arc>, - verifiers: Mutex>, + verifiers: Mutex<(Vec, usize)>, deleting: Arc, ready_signal: Arc, empty: Arc, @@ -212,31 +228,83 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let panic_handler = PanicHandler::new_in_arc(); - let queue = VerificationQueue { + let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS); + let default_amount = max(::num_cpus::get(), 3) - 2; + let mut verifiers = Vec::with_capacity(max_verifiers); + + debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount); + + for i in 0..max_verifiers { + debug!(target: "verification", "Adding verification thread #{}", i); + + let deleting = deleting.clone(); + let panic_handler = panic_handler.clone(); + let verification = verification.clone(); + let engine = engine.clone(); + let wait = more_to_verify.clone(); + let ready = ready_signal.clone(); + let empty = empty.clone(); + + // enable only the first few verifiers. + let sleep = if i < default_amount { + Arc::new((Mutex::new(false), Condvar::new())) + } else { + Arc::new((Mutex::new(true), Condvar::new())) + }; + + verifiers.push(VerifierHandle { + deleting: deleting.clone(), + sleep: sleep.clone(), + thread: thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep) + }).unwrap() + }) + .expect("Failed to create verifier thread.") + }); + } + + VerificationQueue { engine: engine, panic_handler: panic_handler, ready_signal: ready_signal, more_to_verify: more_to_verify, verification: verification, - verifiers: Mutex::new(Vec::with_capacity(::num_cpus::get())), + verifiers: Mutex::new((verifiers, default_amount)), deleting: deleting, processing: RwLock::new(HashSet::new()), empty: empty, ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), - }; - - let thread_count = max(::num_cpus::get(), 3) - 2; - for _ in 0..thread_count { - queue.add_verifier(); } - - queue } - fn verify(verification: Arc>, engine: Arc, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { + fn verify( + verification: Arc>, + engine: Arc, + wait: Arc, + ready: Arc, + deleting: Arc, + empty: Arc, + sleep: Arc<(Mutex, Condvar)>, + ) { while !deleting.load(AtomicOrdering::Acquire) { + { + let mut should_sleep = sleep.0.lock(); + while *should_sleep { + trace!(target: "verification", "Verifier sleeping"); + sleep.1.wait(&mut should_sleep); + trace!(target: "verification", "Verifier waking up"); + + if deleting.load(AtomicOrdering::Acquire) { + return; + } + } + } + { let mut more_to_verify = verification.more_to_verify.lock().unwrap(); @@ -548,7 +616,7 @@ impl VerificationQueue { // compute the average rate of verification per thread and determine // how many are necessary to match the rate of draining. - let num_verifiers = self.verifiers.lock().len(); + let num_verifiers = self.verifiers.lock().1; let v_count_per = v_count as f64 / num_verifiers as f64; let needed = if v_count < 20 { 1 @@ -559,63 +627,34 @@ impl VerificationQueue { trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers", v_count_per, target, needed); - for _ in num_verifiers..needed { - self.add_verifier(); - } - - for _ in needed..num_verifiers { - self.remove_verifier(); - } + self.scale_verifiers(needed); } - // add a verifier thread if possible. - fn add_verifier(&self) { + // wake up or sleep verifiers to get as close to the target as + // possible, never going over the amount of initially allocated threads + // or below 1. + fn scale_verifiers(&self, target: usize) { let mut verifiers = self.verifiers.lock(); - let len = verifiers.len(); - if len == ::num_cpus::get() { - return; + let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers; + + let target = min(verifiers.capacity(), target); + let target = max(1, target); + + debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target); + + // scaling up + for i in *verifier_count..target { + debug!(target: "verification", "Waking up verifier {}", i); + verifiers[i].wake_up(); } - debug!(target: "verification", "Adding verification thread #{}", len); - - let deleting = Arc::new(AtomicBool::new(false)); - let panic_handler = self.panic_handler.clone(); - let verification = self.verification.clone(); - let engine = self.engine.clone(); - let wait = self.more_to_verify.clone(); - let ready = self.ready_signal.clone(); - let empty = self.empty.clone(); - - verifiers.push(VerifierHandle { - deleting: deleting.clone(), - thread: thread::Builder::new() - .name(format!("Verifier #{}", len)) - .spawn(move || { - panic_handler.catch_panic(move || { - VerificationQueue::verify(verification, engine, wait, ready, deleting, empty) - }).unwrap() - }) - .expect("Failed to create verifier thread.") - }); - } - - // remove a verifier thread if possible. - fn remove_verifier(&self) { - let mut verifiers = self.verifiers.lock(); - let len = verifiers.len(); - - // never remove the last thread. - if len == 1 { - return; + // scaling down. + for i in target..*verifier_count { + debug!(target: "verification", "Putting verifier {} to sleep", i); + verifiers[i].sleep(); } - debug!(target: "verification", "Removing verification thread #{}", len - 1); - - if let Some(handle) = verifiers.pop() { - handle.conclude(); - self.more_to_verify.notify_all(); // to ensure it's joinable immediately. - handle.join(); - } + *verifier_count = target; } } @@ -631,7 +670,8 @@ impl Drop for VerificationQueue { self.clear(); self.deleting.store(true, AtomicOrdering::Release); - let mut verifiers = self.verifiers.lock(); + let mut verifiers = self.verifiers.get_mut(); + let mut verifiers = &mut verifiers.0; // first pass to signal conclusion. must be done before // notify or deadlock possible. From 8c0e511ebe0a5541fa804ad2a8393f12afec1d6e Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 17 Nov 2016 18:10:09 +0100 Subject: [PATCH 09/11] rewrite scaling logic --- ethcore/src/verification/queue/mod.rs | 72 ++++++++++++--------------- 1 file changed, 31 insertions(+), 41 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index f4a9287d6..b4f2ab5a2 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -191,9 +191,6 @@ struct Verification { bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, - verified_count: AtomicUsize, - drained: AtomicUsize, - imported: AtomicUsize, sizes: Sizes, check_seal: bool, } @@ -208,9 +205,6 @@ impl VerificationQueue { bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - verified_count: AtomicUsize::new(0), - drained: AtomicUsize::new(0), - imported: AtomicUsize::new(0), sizes: Sizes { unverified: AtomicUsize::new(0), verifying: AtomicUsize::new(0), @@ -355,7 +349,7 @@ impl VerificationQueue { // we're next! let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -370,7 +364,7 @@ impl VerificationQueue { verifying.retain(|e| e.hash != hash); if verifying.front().map_or(false, |x| x.output.is_some()) { - VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.verified_count, &verification.sizes); + VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); true } else { false @@ -388,10 +382,8 @@ impl VerificationQueue { verifying: &mut VecDeque>, verified: &mut VecDeque, bad: &mut HashSet, - v_count: &AtomicUsize, sizes: &Sizes, ) { - let start_len = verified.len(); let mut removed_size = 0; let mut inserted_size = 0; @@ -408,7 +400,6 @@ impl VerificationQueue { } } - v_count.fetch_add(verified.len() - start_len, AtomicOrdering::AcqRel); sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst); sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst); } @@ -474,7 +465,6 @@ impl VerificationQueue { self.processing.write().insert(h.clone()); self.verification.unverified.lock().push_back(item); - self.verification.imported.fetch_add(1, AtomicOrdering::AcqRel); self.more_to_verify.notify_all(); Ok(h) }, @@ -536,8 +526,6 @@ impl VerificationQueue { let count = min(max, verified.len()); let result = verified.drain(..count).collect::>(); - self.verification.drained.fetch_add(result.len(), AtomicOrdering::AcqRel); - let drained_size = result.iter().map(HeapSizeOf::heap_size_of_children).fold(0, |a, c| a + c); self.verification.sizes.verified.fetch_sub(drained_size, AtomicOrdering::SeqCst); @@ -588,11 +576,22 @@ impl VerificationQueue { // when deciding whether to change the number of verifiers. const READJUSTMENT_PERIOD: usize = 12; - { - self.verification.unverified.lock().shrink_to_fit(); + let (u_len, v_len) = { + let u_len = { + let mut q = self.verification.unverified.lock(); + q.shrink_to_fit(); + q.len() + }; self.verification.verifying.lock().shrink_to_fit(); - self.verification.verified.lock().shrink_to_fit(); - } + + let v_len = { + let mut q = self.verification.verified.lock(); + q.shrink_to_fit(); + q.len() + }; + + (u_len as isize, v_len as isize) + }; self.processing.write().shrink_to_fit(); @@ -603,31 +602,22 @@ impl VerificationQueue { return; } - let v_count = self.verification.verified_count.load(AtomicOrdering::Acquire); - let drained = self.verification.drained.load(AtomicOrdering::Acquire); - let imported = self.verification.imported.load(AtomicOrdering::Acquire); + let current = self.verifiers.lock().1; - self.verification.verified_count.store(0, AtomicOrdering::Release); - self.verification.drained.store(0, AtomicOrdering::Release); - self.verification.imported.store(0, AtomicOrdering::Release); + let diff = (v_len - u_len).abs(); + let total = v_len + u_len; - // select which side of the queue is the bottleneck. - let target = min(drained, imported); - - // compute the average rate of verification per thread and determine - // how many are necessary to match the rate of draining. - let num_verifiers = self.verifiers.lock().1; - let v_count_per = v_count as f64 / num_verifiers as f64; - let needed = if v_count < 20 { - 1 - } else { - (target as f64 / v_count_per as f64).ceil() as usize - }; - - trace!(target: "verification", "v_rate_per={}, target={}, scaling to {} verifiers", - v_count_per, target, needed); - - self.scale_verifiers(needed); + self.scale_verifiers( + if u_len < 20 { + 1 + } else if diff <= total / 10 { + current + } else if v_len > u_len { + current - 1 + } else { + current + 1 + } + ); } // wake up or sleep verifiers to get as close to the target as From 133796b7ffb446916f1105cfab08d84b25557cc1 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 21 Nov 2016 14:23:34 +0100 Subject: [PATCH 10/11] queue: verifier scaling tests --- ethcore/src/verification/queue/mod.rs | 61 +++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index b4f2ab5a2..0b39dd769 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -574,8 +574,12 @@ impl VerificationQueue { pub fn collect_garbage(&self) { // number of ticks to average queue stats over // when deciding whether to change the number of verifiers. + #[cfg(not(test))] const READJUSTMENT_PERIOD: usize = 12; + #[cfg(test)] + const READJUSTMENT_PERIOD: usize = 1; + let (u_len, v_len) = { let u_len = { let mut q = self.verification.unverified.lock(); @@ -595,10 +599,9 @@ impl VerificationQueue { self.processing.write().shrink_to_fit(); - if self.ticks_since_adjustment.load(AtomicOrdering::SeqCst) == READJUSTMENT_PERIOD { + if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD { self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst); } else { - self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst); return; } @@ -627,7 +630,7 @@ impl VerificationQueue { let mut verifiers = self.verifiers.lock(); let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers; - let target = min(verifiers.capacity(), target); + let target = min(verifiers.len(), target); let target = max(1, target); debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target); @@ -774,4 +777,56 @@ mod tests { } assert!(queue.queue_info().is_full()); } + + #[test] + fn scaling_limits() { + use super::MAX_VERIFIERS; + + let queue = get_test_queue(); + queue.scale_verifiers(MAX_VERIFIERS + 1); + + assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1); + + queue.scale_verifiers(0); + + assert!(queue.verifiers.lock().1 == 1); + } + + #[test] + fn readjust_verifiers() { + let queue = get_test_queue(); + + // put all the verifiers to sleep to ensure + // the test isn't timing sensitive. + let num_verifiers = { + let verifiers = queue.verifiers.lock(); + for i in 0..verifiers.1 { + verifiers.0[i].sleep(); + } + + verifiers.1 + }; + + for block in get_good_dummy_block_seq(5000) { + queue.import(Unverified::new(block)).expect("Block good by definition; qed"); + } + + // almost all unverified == bump verifier count. + queue.collect_garbage(); + assert_eq!(queue.verifiers.lock().1, num_verifiers + 1); + + // wake them up again and verify everything. + { + let verifiers = queue.verifiers.lock(); + for i in 0..verifiers.1 { + verifiers.0[i].wake_up(); + } + } + + queue.flush(); + + // nothing to verify == use minimum number of verifiers. + queue.collect_garbage(); + assert_eq!(queue.verifiers.lock().1, 1); + } } From 53afb8d22d5e2b799114891cf4888cae7a47081c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 21 Nov 2016 14:48:25 +0100 Subject: [PATCH 11/11] queue: park directly instead of through condvar --- ethcore/src/verification/queue/mod.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 0b39dd769..686a1d093 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -66,20 +66,20 @@ impl Default for Config { struct VerifierHandle { deleting: Arc, - sleep: Arc<(Mutex, Condvar)>, + sleep: Arc, thread: JoinHandle<()>, } impl VerifierHandle { // signal to the verifier thread that it should sleep. fn sleep(&self) { - *self.sleep.0.lock() = true; + self.sleep.store(true, AtomicOrdering::SeqCst); } // signal to the verifier thread that it should wake up. fn wake_up(&self) { - *self.sleep.0.lock() = false; - self.sleep.1.notify_all(); + self.sleep.store(false, AtomicOrdering::SeqCst); + self.thread.thread().unpark(); } // signal to the verifier thread that it should conclude its @@ -91,7 +91,7 @@ impl VerifierHandle { // join the verifier thread. fn join(self) { - self.thread.join().unwrap(); + self.thread.join().expect("Verifier thread panicked"); } } @@ -241,9 +241,9 @@ impl VerificationQueue { // enable only the first few verifiers. let sleep = if i < default_amount { - Arc::new((Mutex::new(false), Condvar::new())) + Arc::new(AtomicBool::new(false)) } else { - Arc::new((Mutex::new(true), Condvar::new())) + Arc::new(AtomicBool::new(true)) }; verifiers.push(VerifierHandle { @@ -283,14 +283,13 @@ impl VerificationQueue { ready: Arc, deleting: Arc, empty: Arc, - sleep: Arc<(Mutex, Condvar)>, + sleep: Arc, ) { while !deleting.load(AtomicOrdering::Acquire) { { - let mut should_sleep = sleep.0.lock(); - while *should_sleep { + while sleep.load(AtomicOrdering::SeqCst) { trace!(target: "verification", "Verifier sleeping"); - sleep.1.wait(&mut should_sleep); + ::std::thread::park(); trace!(target: "verification", "Verifier waking up"); if deleting.load(AtomicOrdering::Acquire) {