parent
4cb4344542
commit
c65ee93542
@ -18,6 +18,7 @@
|
|||||||
//! Sorts them ready for blockchain insertion.
|
//! Sorts them ready for blockchain insertion.
|
||||||
use std::thread::{JoinHandle, self};
|
use std::thread::{JoinHandle, self};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||||
|
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||||
use util::*;
|
use util::*;
|
||||||
use verification::*;
|
use verification::*;
|
||||||
use error::*;
|
use error::*;
|
||||||
@ -80,12 +81,12 @@ impl BlockQueueInfo {
|
|||||||
pub struct BlockQueue {
|
pub struct BlockQueue {
|
||||||
panic_handler: Arc<PanicHandler>,
|
panic_handler: Arc<PanicHandler>,
|
||||||
engine: Arc<Box<Engine>>,
|
engine: Arc<Box<Engine>>,
|
||||||
more_to_verify: Arc<Condvar>,
|
more_to_verify: Arc<SCondvar>,
|
||||||
verification: Arc<Verification>,
|
verification: Arc<Verification>,
|
||||||
verifiers: Vec<JoinHandle<()>>,
|
verifiers: Vec<JoinHandle<()>>,
|
||||||
deleting: Arc<AtomicBool>,
|
deleting: Arc<AtomicBool>,
|
||||||
ready_signal: Arc<QueueSignal>,
|
ready_signal: Arc<QueueSignal>,
|
||||||
empty: Arc<Condvar>,
|
empty: Arc<SCondvar>,
|
||||||
processing: RwLock<HashSet<H256>>,
|
processing: RwLock<HashSet<H256>>,
|
||||||
max_queue_size: usize,
|
max_queue_size: usize,
|
||||||
max_mem_use: usize,
|
max_mem_use: usize,
|
||||||
@ -133,6 +134,8 @@ struct Verification {
|
|||||||
verified: Mutex<VecDeque<PreverifiedBlock>>,
|
verified: Mutex<VecDeque<PreverifiedBlock>>,
|
||||||
verifying: Mutex<VecDeque<VerifyingBlock>>,
|
verifying: Mutex<VecDeque<VerifyingBlock>>,
|
||||||
bad: Mutex<HashSet<H256>>,
|
bad: Mutex<HashSet<H256>>,
|
||||||
|
more_to_verify: SMutex<()>,
|
||||||
|
empty: SMutex<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockQueue {
|
impl BlockQueue {
|
||||||
@ -143,15 +146,18 @@ impl BlockQueue {
|
|||||||
verified: Mutex::new(VecDeque::new()),
|
verified: Mutex::new(VecDeque::new()),
|
||||||
verifying: Mutex::new(VecDeque::new()),
|
verifying: Mutex::new(VecDeque::new()),
|
||||||
bad: Mutex::new(HashSet::new()),
|
bad: Mutex::new(HashSet::new()),
|
||||||
|
more_to_verify: SMutex::new(()),
|
||||||
|
empty: SMutex::new(()),
|
||||||
|
|
||||||
});
|
});
|
||||||
let more_to_verify = Arc::new(Condvar::new());
|
let more_to_verify = Arc::new(SCondvar::new());
|
||||||
let deleting = Arc::new(AtomicBool::new(false));
|
let deleting = Arc::new(AtomicBool::new(false));
|
||||||
let ready_signal = Arc::new(QueueSignal {
|
let ready_signal = Arc::new(QueueSignal {
|
||||||
deleting: deleting.clone(),
|
deleting: deleting.clone(),
|
||||||
signalled: AtomicBool::new(false),
|
signalled: AtomicBool::new(false),
|
||||||
message_channel: message_channel
|
message_channel: message_channel
|
||||||
});
|
});
|
||||||
let empty = Arc::new(Condvar::new());
|
let empty = Arc::new(SCondvar::new());
|
||||||
let panic_handler = PanicHandler::new_in_arc();
|
let panic_handler = PanicHandler::new_in_arc();
|
||||||
|
|
||||||
let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
|
let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
|
||||||
@ -190,17 +196,17 @@ impl BlockQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
|
fn verify(verification: Arc<Verification>, engine: Arc<Box<Engine>>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>) {
|
||||||
while !deleting.load(AtomicOrdering::Acquire) {
|
while !deleting.load(AtomicOrdering::Acquire) {
|
||||||
{
|
{
|
||||||
let mut unverified = verification.unverified.lock();
|
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
|
||||||
|
|
||||||
if unverified.is_empty() && verification.verifying.lock().is_empty() {
|
if verification.unverified.lock().is_empty() && verification.verifying.lock().is_empty() {
|
||||||
empty.notify_all();
|
empty.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
while unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
|
while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
|
||||||
wait.wait(&mut unverified);
|
more_to_verify = wait.wait(more_to_verify).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
if deleting.load(AtomicOrdering::Acquire) {
|
if deleting.load(AtomicOrdering::Acquire) {
|
||||||
@ -276,9 +282,9 @@ impl BlockQueue {
|
|||||||
|
|
||||||
/// Wait for unverified queue to be empty
|
/// Wait for unverified queue to be empty
|
||||||
pub fn flush(&self) {
|
pub fn flush(&self) {
|
||||||
let mut unverified = self.verification.unverified.lock();
|
let mut lock = self.verification.empty.lock().unwrap();
|
||||||
while !unverified.is_empty() || !self.verification.verifying.lock().is_empty() {
|
while !self.verification.unverified.lock().is_empty() || !self.verification.verifying.lock().is_empty() {
|
||||||
self.empty.wait(&mut unverified);
|
lock = self.empty.wait(lock).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,8 @@ use io::{IoError, IoHandler};
|
|||||||
use io::worker::{Worker, Work, WorkType};
|
use io::worker::{Worker, Work, WorkType};
|
||||||
use panics::*;
|
use panics::*;
|
||||||
|
|
||||||
use parking_lot::{Condvar, RwLock, Mutex};
|
use parking_lot::{RwLock};
|
||||||
|
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||||
|
|
||||||
/// Timer ID
|
/// Timer ID
|
||||||
pub type TimerToken = usize;
|
pub type TimerToken = usize;
|
||||||
@ -169,7 +170,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
|
|||||||
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
|
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
|
||||||
workers: Vec<Worker>,
|
workers: Vec<Worker>,
|
||||||
worker_channel: chase_lev::Worker<Work<Message>>,
|
worker_channel: chase_lev::Worker<Work<Message>>,
|
||||||
work_ready: Arc<Condvar>,
|
work_ready: Arc<SCondvar>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
||||||
@ -177,8 +178,8 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
|||||||
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
|
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
|
||||||
let (worker, stealer) = chase_lev::deque();
|
let (worker, stealer) = chase_lev::deque();
|
||||||
let num_workers = 4;
|
let num_workers = 4;
|
||||||
let work_ready_mutex = Arc::new(Mutex::new(()));
|
let work_ready_mutex = Arc::new(SMutex::new(()));
|
||||||
let work_ready = Arc::new(Condvar::new());
|
let work_ready = Arc::new(SCondvar::new());
|
||||||
let workers = (0..num_workers).map(|i|
|
let workers = (0..num_workers).map(|i|
|
||||||
Worker::new(
|
Worker::new(
|
||||||
i,
|
i,
|
||||||
|
@ -23,7 +23,7 @@ use io::service::{HandlerId, IoChannel, IoContext};
|
|||||||
use io::{IoHandler};
|
use io::{IoHandler};
|
||||||
use panics::*;
|
use panics::*;
|
||||||
|
|
||||||
use parking_lot::{Condvar, Mutex};
|
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||||
|
|
||||||
pub enum WorkType<Message> {
|
pub enum WorkType<Message> {
|
||||||
Readable,
|
Readable,
|
||||||
@ -44,9 +44,9 @@ pub struct Work<Message> {
|
|||||||
/// Sorts them ready for blockchain insertion.
|
/// Sorts them ready for blockchain insertion.
|
||||||
pub struct Worker {
|
pub struct Worker {
|
||||||
thread: Option<JoinHandle<()>>,
|
thread: Option<JoinHandle<()>>,
|
||||||
wait: Arc<Condvar>,
|
wait: Arc<SCondvar>,
|
||||||
deleting: Arc<AtomicBool>,
|
deleting: Arc<AtomicBool>,
|
||||||
wait_mutex: Arc<Mutex<()>>,
|
wait_mutex: Arc<SMutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Worker {
|
impl Worker {
|
||||||
@ -54,8 +54,8 @@ impl Worker {
|
|||||||
pub fn new<Message>(index: usize,
|
pub fn new<Message>(index: usize,
|
||||||
stealer: chase_lev::Stealer<Work<Message>>,
|
stealer: chase_lev::Stealer<Work<Message>>,
|
||||||
channel: IoChannel<Message>,
|
channel: IoChannel<Message>,
|
||||||
wait: Arc<Condvar>,
|
wait: Arc<SCondvar>,
|
||||||
wait_mutex: Arc<Mutex<()>>,
|
wait_mutex: Arc<SMutex<()>>,
|
||||||
panic_handler: Arc<PanicHandler>
|
panic_handler: Arc<PanicHandler>
|
||||||
) -> Worker
|
) -> Worker
|
||||||
where Message: Send + Sync + Clone + 'static {
|
where Message: Send + Sync + Clone + 'static {
|
||||||
@ -77,17 +77,17 @@ impl Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
|
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
|
||||||
channel: IoChannel<Message>, wait: Arc<Condvar>,
|
channel: IoChannel<Message>, wait: Arc<SCondvar>,
|
||||||
wait_mutex: Arc<Mutex<()>>,
|
wait_mutex: Arc<SMutex<()>>,
|
||||||
deleting: Arc<AtomicBool>)
|
deleting: Arc<AtomicBool>)
|
||||||
where Message: Send + Sync + Clone + 'static {
|
where Message: Send + Sync + Clone + 'static {
|
||||||
loop {
|
loop {
|
||||||
{
|
{
|
||||||
let mut lock = wait_mutex.lock();
|
let lock = wait_mutex.lock().unwrap();
|
||||||
if deleting.load(AtomicOrdering::Acquire) {
|
if deleting.load(AtomicOrdering::Acquire) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
wait.wait(&mut lock);
|
let _ = wait.wait(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if deleting.load(AtomicOrdering::Acquire) {
|
if deleting.load(AtomicOrdering::Acquire) {
|
||||||
@ -123,7 +123,7 @@ impl Worker {
|
|||||||
impl Drop for Worker {
|
impl Drop for Worker {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!(target: "shutdown", "[IoWorker] Closing...");
|
trace!(target: "shutdown", "[IoWorker] Closing...");
|
||||||
let _ = self.wait_mutex.lock();
|
let _ = self.wait_mutex.lock().unwrap();
|
||||||
self.deleting.store(true, AtomicOrdering::Release);
|
self.deleting.store(true, AtomicOrdering::Release);
|
||||||
self.wait.notify_all();
|
self.wait.notify_all();
|
||||||
let thread = mem::replace(&mut self.thread, None).unwrap();
|
let thread = mem::replace(&mut self.thread, None).unwrap();
|
||||||
|
Loading…
Reference in New Issue
Block a user