blockqueue flush

This commit is contained in:
arkpar 2016-01-25 19:20:34 +01:00
parent 1dbae06a83
commit a43ca9ae34
2 changed files with 20 additions and 9 deletions

View File

@ -35,6 +35,7 @@ pub struct BlockQueue {
verifiers: Vec<JoinHandle<()>>, verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<Condvar>,
processing: HashSet<H256> processing: HashSet<H256>
} }
@ -79,6 +80,7 @@ impl BlockQueue {
let more_to_verify = Arc::new(Condvar::new()); let more_to_verify = Arc::new(Condvar::new());
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let empty = Arc::new(Condvar::new());
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
@ -87,8 +89,9 @@ impl BlockQueue {
let engine = engine.clone(); let engine = engine.clone();
let more_to_verify = more_to_verify.clone(); let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone(); let ready_signal = ready_signal.clone();
let empty = empty.clone();
let deleting = deleting.clone(); let deleting = deleting.clone();
verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting)) verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty))
.expect("Error starting block verification thread")); .expect("Error starting block verification thread"));
} }
BlockQueue { BlockQueue {
@ -99,14 +102,20 @@ impl BlockQueue {
verifiers: verifiers, verifiers: verifiers,
deleting: deleting.clone(), deleting: deleting.clone(),
processing: HashSet::new(), processing: HashSet::new(),
empty: empty.clone(),
} }
} }
fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>) { fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
while !deleting.load(AtomicOrdering::Relaxed) { while !deleting.load(AtomicOrdering::Relaxed) {
{ {
let mut lock = verification.lock().unwrap(); let mut lock = verification.lock().unwrap();
if lock.unverified.is_empty() && lock.verifying.is_empty() {
empty.notify_all();
}
while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) {
lock = wait.wait(lock).unwrap(); lock = wait.wait(lock).unwrap();
} }
@ -176,6 +185,13 @@ impl BlockQueue {
verification.verifying.clear(); verification.verifying.clear();
} }
/// Wait for queue to be empty
pub fn flush(&mut self) {
let mutex: Mutex<()> = Mutex::new(());
let lock = mutex.lock().unwrap();
let _ = self.empty.wait(lock).unwrap();
}
/// Add a block to the queue. /// Add a block to the queue.
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
let header = BlockView::new(&bytes).header(); let header = BlockView::new(&bytes).header();

View File

@ -1,5 +1,3 @@
use std::thread;
use std::time;
use util::*; use util::*;
use rocksdb::{Options, DB}; use rocksdb::{Options, DB};
use blockchain::{BlockChain, BlockProvider, CacheSize}; use blockchain::{BlockChain, BlockProvider, CacheSize};
@ -184,10 +182,7 @@ impl Client {
/// Flush the block import queue. /// Flush the block import queue.
pub fn flush_queue(&self) { pub fn flush_queue(&self) {
flushln!("Flushing queue {:?}", self.block_queue.read().unwrap().queue_info()); flushln!("Flushing queue {:?}", self.block_queue.read().unwrap().queue_info());
while self.block_queue.read().unwrap().queue_info().unverified_queue_size > 0 { self.block_queue.write().unwrap().flush();
thread::sleep(time::Duration::from_millis(20));
flushln!("Flushing queue [waited] {:?}", self.block_queue.read().unwrap().queue_info());
}
} }
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion