Use std::sync::Condvar (#1732)

test
This commit is contained in:
Arkadiy Paronyan 2016-07-27 11:39:24 +02:00 committed by Gav Wood
parent 4cb4344542
commit c65ee93542
3 changed files with 33 additions and 26 deletions

View File

@ -18,6 +18,7 @@
//! Sorts them ready for blockchain insertion. //! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self}; use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*; use util::*;
use verification::*; use verification::*;
use error::*; use error::*;
@ -80,12 +81,12 @@ impl BlockQueueInfo {
pub struct BlockQueue { pub struct BlockQueue {
panic_handler: Arc<PanicHandler>, panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>, engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification>, verification: Arc<Verification>,
verifiers: Vec<JoinHandle<()>>, verifiers: Vec<JoinHandle<()>>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<Condvar>, empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>, processing: RwLock<HashSet<H256>>,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
@ -133,6 +134,8 @@ struct Verification {
verified: Mutex<VecDeque<PreverifiedBlock>>, verified: Mutex<VecDeque<PreverifiedBlock>>,
verifying: Mutex<VecDeque<VerifyingBlock>>, verifying: Mutex<VecDeque<VerifyingBlock>>,
bad: Mutex<HashSet<H256>>, bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
} }
impl BlockQueue { impl BlockQueue {
@ -143,15 +146,18 @@ impl BlockQueue {
verified: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()), verifying: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()), bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
}); });
let more_to_verify = Arc::new(Condvar::new()); let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let ready_signal = Arc::new(QueueSignal { let ready_signal = Arc::new(QueueSignal {
deleting: deleting.clone(), deleting: deleting.clone(),
signalled: AtomicBool::new(false), signalled: AtomicBool::new(false),
message_channel: message_channel message_channel: message_channel
}); });
let empty = Arc::new(Condvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
@ -190,17 +196,17 @@ impl BlockQueue {
} }
} }
fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) { fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
while !deleting.load(AtomicOrdering::Acquire) { while !deleting.load(AtomicOrdering::Acquire) {
{ {
let mut unverified = verification.unverified.lock(); let mut more_to_verify = verification.more_to_verify.lock().unwrap();
if unverified.is_empty() && verification.verifying.lock().is_empty() { if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() {
empty.notify_all(); empty.notify_all();
} }
while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) { while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
wait.wait(&mut unverified); more_to_verify = wait.wait(more_to_verify).unwrap();
} }
if deleting.load(AtomicOrdering::Acquire) { if deleting.load(AtomicOrdering::Acquire) {
@ -276,9 +282,9 @@ impl BlockQueue {
/// Wait for unverified queue to be empty /// Wait for unverified queue to be empty
pub fn flush(&self) { pub fn flush(&self) {
let mut unverified = self.verification.unverified.lock(); let mut lock = self.verification.empty.lock().unwrap();
while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() { while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() {
self.empty.wait(&mut unverified); lock = self.empty.wait(lock).unwrap();
} }
} }

View File

@ -25,7 +25,8 @@ use io::{IoError, IoHandler};
use io::worker::{Worker, Work, WorkType}; use io::worker::{Worker, Work, WorkType};
use panics::*; use panics::*;
use parking_lot::{Condvar, RwLock, Mutex}; use parking_lot::{RwLock};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
/// Timer ID /// Timer ID
pub type TimerToken = usize; pub type TimerToken = usize;
@ -169,7 +170,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>, handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
workers: Vec<Worker>, workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>, worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<Condvar>, work_ready: Arc<SCondvar>,
} }
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
@ -177,8 +178,8 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> { pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
let (worker, stealer) = chase_lev::deque(); let (worker, stealer) = chase_lev::deque();
let num_workers = 4; let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready_mutex = Arc::new(SMutex::new(()));
let work_ready = Arc::new(Condvar::new()); let work_ready = Arc::new(SCondvar::new());
let workers = (0..num_workers).map(|i| let workers = (0..num_workers).map(|i|
Worker::new( Worker::new(
i, i,

View File

@ -23,7 +23,7 @@ use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler}; use io::{IoHandler};
use panics::*; use panics::*;
use parking_lot::{Condvar, Mutex}; use std::sync::{Condvar as SCondvar, Mutex as SMutex};
pub enum WorkType<Message> { pub enum WorkType<Message> {
Readable, Readable,
@ -44,9 +44,9 @@ pub struct Work<Message> {
/// Sorts them ready for blockchain insertion. /// Sorts them ready for blockchain insertion.
pub struct Worker { pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>, wait: Arc<SCondvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
wait_mutex: Arc<Mutex<()>>, wait_mutex: Arc<SMutex<()>>,
} }
impl Worker { impl Worker {
@ -54,8 +54,8 @@ impl Worker {
pub fn new<Message>(index: usize, pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>, stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, channel: IoChannel<Message>,
wait: Arc<Condvar>, wait: Arc<SCondvar>,
wait_mutex: Arc<Mutex<()>>, wait_mutex: Arc<SMutex<()>>,
panic_handler: Arc<PanicHandler> panic_handler: Arc<PanicHandler>
) -> Worker ) -> Worker
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
@ -77,17 +77,17 @@ impl Worker {
} }
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>, fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<Condvar>, channel: IoChannel<Message>, wait: Arc<SCondvar>,
wait_mutex: Arc<Mutex<()>>, wait_mutex: Arc<SMutex<()>>,
deleting: Arc<AtomicBool>) deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
loop { loop {
{ {
let mut lock = wait_mutex.lock(); let lock = wait_mutex.lock().unwrap();
if deleting.load(AtomicOrdering::Acquire) { if deleting.load(AtomicOrdering::Acquire) {
return; return;
} }
wait.wait(&mut lock); let _ = wait.wait(lock);
} }
if deleting.load(AtomicOrdering::Acquire) { if deleting.load(AtomicOrdering::Acquire) {
@ -123,7 +123,7 @@ impl Worker {
impl Drop for Worker { impl Drop for Worker {
fn drop(&mut self) { fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing..."); trace!(target: "shutdown", "[IoWorker] Closing...");
let _ = self.wait_mutex.lock(); let _ = self.wait_mutex.lock().unwrap();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all(); self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap(); let thread = mem::replace(&mut self.thread, None).unwrap();