Revert "Use std::sync::Condvar (#1732)" (#9392)

* Revert "Use std::sync::Condvar (#1732)"

This reverts commit c65ee93542.

* verification_queue: remove redundant mutexes
This commit is contained in:
Andronik Ordian 2018-08-22 17:01:07 +03:00 committed by Marek Kotewicz
parent e12a26dac5
commit 491ce61a76
3 changed files with 29 additions and 34 deletions

View File

@ -19,7 +19,7 @@
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex, Arc}; use std::sync::Arc;
use std::cmp; use std::cmp;
use std::collections::{VecDeque, HashSet, HashMap}; use std::collections::{VecDeque, HashSet, HashMap};
use heapsize::HeapSizeOf; use heapsize::HeapSizeOf;
@ -141,11 +141,11 @@ struct Sizes {
/// Keeps them in the same order as inserted, minus invalid items. /// Keeps them in the same order as inserted, minus invalid items.
pub struct VerificationQueue<K: Kind> { pub struct VerificationQueue<K: Kind> {
engine: Arc<EthEngine>, engine: Arc<EthEngine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<Condvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<Condvar>,
processing: RwLock<HashMap<H256, U256>>, // hash to difficulty processing: RwLock<HashMap<H256, U256>>, // hash to difficulty
ticks_since_adjustment: AtomicUsize, ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
@ -202,8 +202,6 @@ struct Verification<K: Kind> {
verifying: Mutex<VecDeque<Verifying<K>>>, verifying: Mutex<VecDeque<Verifying<K>>>,
verified: Mutex<VecDeque<K::Verified>>, verified: Mutex<VecDeque<K::Verified>>,
bad: Mutex<HashSet<H256>>, bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
sizes: Sizes, sizes: Sizes,
check_seal: bool, check_seal: bool,
} }
@ -216,8 +214,6 @@ impl<K: Kind> VerificationQueue<K> {
verifying: Mutex::new(VecDeque::new()), verifying: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()), bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
sizes: Sizes { sizes: Sizes {
unverified: AtomicUsize::new(0), unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0), verifying: AtomicUsize::new(0),
@ -225,14 +221,14 @@ impl<K: Kind> VerificationQueue<K> {
}, },
check_seal: check_seal, check_seal: check_seal,
}); });
let more_to_verify = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(Condvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let ready_signal = Arc::new(QueueSignal { let ready_signal = Arc::new(QueueSignal {
deleting: deleting.clone(), deleting: deleting.clone(),
signalled: AtomicBool::new(false), signalled: AtomicBool::new(false),
message_channel: Mutex::new(message_channel), message_channel: Mutex::new(message_channel),
}); });
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(Condvar::new());
let scale_verifiers = config.verifier_settings.scale_verifiers; let scale_verifiers = config.verifier_settings.scale_verifiers;
let num_cpus = ::num_cpus::get(); let num_cpus = ::num_cpus::get();
@ -292,9 +288,9 @@ impl<K: Kind> VerificationQueue<K> {
fn verify( fn verify(
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
engine: Arc<EthEngine>, engine: Arc<EthEngine>,
wait: Arc<SCondvar>, wait: Arc<Condvar>,
ready: Arc<QueueSignal>, ready: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<Condvar>,
state: Arc<(Mutex<State>, Condvar)>, state: Arc<(Mutex<State>, Condvar)>,
id: usize, id: usize,
) { ) {
@ -319,19 +315,19 @@ impl<K: Kind> VerificationQueue<K> {
// wait for work if empty. // wait for work if empty.
{ {
let mut more_to_verify = verification.more_to_verify.lock().unwrap(); let mut unverified = verification.unverified.lock();
if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() { if unverified.is_empty() && verification.verifying.lock().is_empty() {
empty.notify_all(); empty.notify_all();
} }
while verification.unverified.lock().is_empty() { while unverified.is_empty() {
if let State::Exit = *state.0.lock() { if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id); debug!(target: "verification", "verifier {} exiting", id);
return; return;
} }
more_to_verify = wait.wait(more_to_verify).unwrap(); wait.wait(&mut unverified);
} }
if let State::Exit = *state.0.lock() { if let State::Exit = *state.0.lock() {
@ -450,9 +446,9 @@ impl<K: Kind> VerificationQueue<K> {
/// Wait for unverified queue to be empty /// Wait for unverified queue to be empty
pub fn flush(&self) { pub fn flush(&self) {
let mut lock = self.verification.empty.lock().unwrap(); let mut unverified = self.verification.unverified.lock();
while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() { while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() {
lock = self.empty.wait(lock).unwrap(); self.empty.wait(&mut unverified);
} }
} }
@ -712,7 +708,7 @@ impl<K: Kind> Drop for VerificationQueue<K> {
// acquire this lock to force threads to reach the waiting point // acquire this lock to force threads to reach the waiting point
// if they're in-between the exit check and the more_to_verify wait. // if they're in-between the exit check and the more_to_verify wait.
{ {
let _more = self.verification.more_to_verify.lock().unwrap(); let _unverified = self.verification.unverified.lock();
self.more_to_verify.notify_all(); self.more_to_verify.notify_all();
} }

View File

@ -24,8 +24,7 @@ use crossbeam::sync::chase_lev;
use slab::Slab; use slab::Slab;
use {IoError, IoHandler}; use {IoError, IoHandler};
use worker::{Worker, Work, WorkType}; use worker::{Worker, Work, WorkType};
use parking_lot::{RwLock, Mutex}; use parking_lot::{Condvar, RwLock, Mutex};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use std::time::Duration; use std::time::Duration;
/// Timer ID /// Timer ID
@ -186,7 +185,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>, handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>, workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>, worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<SCondvar>, work_ready: Arc<Condvar>,
} }
impl<Message> IoManager<Message> where Message: Send + Sync + 'static { impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
@ -197,8 +196,8 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
) -> Result<(), IoError> { ) -> Result<(), IoError> {
let (worker, stealer) = chase_lev::deque(); let (worker, stealer) = chase_lev::deque();
let num_workers = 4; let num_workers = 4;
let work_ready_mutex = Arc::new(SMutex::new(())); let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(SCondvar::new()); let work_ready = Arc::new(Condvar::new());
let workers = (0..num_workers).map(|i| let workers = (0..num_workers).map(|i|
Worker::new( Worker::new(
i, i,

View File

@ -22,7 +22,7 @@ use service_mio::{HandlerId, IoChannel, IoContext};
use IoHandler; use IoHandler;
use LOCAL_STACK_SIZE; use LOCAL_STACK_SIZE;
use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use parking_lot::{Condvar, Mutex};
const STACK_SIZE: usize = 16*1024*1024; const STACK_SIZE: usize = 16*1024*1024;
@ -45,9 +45,9 @@ pub struct Work<Message> {
/// Sorts them ready for blockchain insertion. /// Sorts them ready for blockchain insertion.
pub struct Worker { pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<SCondvar>, wait: Arc<Condvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
wait_mutex: Arc<SMutex<()>>, wait_mutex: Arc<Mutex<()>>,
} }
impl Worker { impl Worker {
@ -55,8 +55,8 @@ impl Worker {
pub fn new<Message>(index: usize, pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>, stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, channel: IoChannel<Message>,
wait: Arc<SCondvar>, wait: Arc<Condvar>,
wait_mutex: Arc<SMutex<()>>, wait_mutex: Arc<Mutex<()>>,
) -> Worker ) -> Worker
where Message: Send + Sync + 'static { where Message: Send + Sync + 'static {
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
@ -76,17 +76,17 @@ impl Worker {
} }
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>, fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<SCondvar>, channel: IoChannel<Message>, wait: Arc<Condvar>,
wait_mutex: Arc<SMutex<()>>, wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>) deleting: Arc<AtomicBool>)
where Message: Send + Sync + 'static { where Message: Send + Sync + 'static {
loop { loop {
{ {
let lock = wait_mutex.lock().expect("Poisoned work_loop mutex"); let mut lock = wait_mutex.lock();
if deleting.load(AtomicOrdering::Acquire) { if deleting.load(AtomicOrdering::Acquire) {
return; return;
} }
let _ = wait.wait(lock); wait.wait(&mut lock);
} }
while !deleting.load(AtomicOrdering::Acquire) { while !deleting.load(AtomicOrdering::Acquire) {
@ -122,7 +122,7 @@ impl Worker {
impl Drop for Worker { impl Drop for Worker {
fn drop(&mut self) { fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing..."); trace!(target: "shutdown", "[IoWorker] Closing...");
let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex"); let _ = self.wait_mutex.lock();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all(); self.wait.notify_all();
if let Some(thread) = self.thread.take() { if let Some(thread) = self.thread.take() {