From a43ca9ae34d7dcd299cd82bc1ee36ac15464f9bb Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 25 Jan 2016 19:20:34 +0100 Subject: [PATCH] blockqueue flush --- src/block_queue.rs | 22 +++++++++++++++++++--- src/client.rs | 7 +------ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/block_queue.rs b/src/block_queue.rs index 36539bfff..a0e46193b 100644 --- a/src/block_queue.rs +++ b/src/block_queue.rs @@ -35,6 +35,7 @@ pub struct BlockQueue { verifiers: Vec>, deleting: Arc, ready_signal: Arc, + empty: Arc, processing: HashSet } @@ -79,6 +80,7 @@ impl BlockQueue { let more_to_verify = Arc::new(Condvar::new()); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); + let empty = Arc::new(Condvar::new()); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -87,8 +89,9 @@ impl BlockQueue { let engine = engine.clone(); let more_to_verify = more_to_verify.clone(); let ready_signal = ready_signal.clone(); + let empty = empty.clone(); let deleting = deleting.clone(); - verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting)) + verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)) .expect("Error starting block verification thread")); } BlockQueue { @@ -99,14 +102,20 @@ impl BlockQueue { verifiers: verifiers, deleting: deleting.clone(), processing: HashSet::new(), + empty: empty.clone(), } } - fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc) { + fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc, empty: Arc) { while !deleting.load(AtomicOrdering::Relaxed) { - { let mut lock = verification.lock().unwrap(); + + if lock.unverified.is_empty() && lock.verifying.is_empty() { + empty.notify_all(); + } + + while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { lock = wait.wait(lock).unwrap(); } @@ -176,6 +185,13 @@ impl BlockQueue { verification.verifying.clear(); } + /// Wait for queue to be empty + pub fn flush(&mut self) { + let mutex: Mutex<()> = Mutex::new(()); + let lock = mutex.lock().unwrap(); + let _ = self.empty.wait(lock).unwrap(); + } + /// Add a block to the queue. pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { let header = BlockView::new(&bytes).header(); diff --git a/src/client.rs b/src/client.rs index 795bca546..f05819370 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,3 @@ -use std::thread; -use std::time; use util::*; use rocksdb::{Options, DB}; use blockchain::{BlockChain, BlockProvider, CacheSize}; @@ -184,10 +182,7 @@ impl Client { /// Flush the block import queue. pub fn flush_queue(&self) { flushln!("Flushing queue {:?}", self.block_queue.read().unwrap().queue_info()); - while self.block_queue.read().unwrap().queue_info().unverified_queue_size > 0 { - thread::sleep(time::Duration::from_millis(20)); - flushln!("Flushing queue [waited] {:?}", self.block_queue.read().unwrap().queue_info()); - } + self.block_queue.write().unwrap().flush(); } /// This is triggered by a message coming from a block queue when the block is ready for insertion