ethcore-io retries failed work steal (#9651)

* ethcore-io uses newer version of crossbeam && retries failed work steal

* ethcore-io non-mio service uses newer crossbeam
This commit is contained in:
Marek Kotewicz 2018-09-29 21:25:16 +01:00 committed by Afri Schoedon
parent 2d44b3ebea
commit ebaa43fa4c
7 changed files with 30 additions and 29 deletions

4
Cargo.lock generated
View File

@ -600,7 +600,7 @@ version = "1.12.0"
name = "ethcore-io"
version = "1.12.0"
dependencies = [
"crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2296,7 +2296,7 @@ source = "git+https://github.com/nikvolf/parity-tokio-ipc#c0f80b40399d7f08ef1e68
dependencies = [
"bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-named-pipes 0.1.6 (git+https://github.com/alexcrichton/mio-named-pipes)",
"miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -693,7 +693,6 @@ impl Engine<EthereumMachine> for Tendermint {
}
fn stop(&self) {
self.step_service.stop()
}
fn is_proposal(&self, header: &Header) -> bool {

View File

@ -9,7 +9,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
fnv = "1.0"
mio = { version = "0.6.8", optional = true }
crossbeam = "0.3"
crossbeam-deque = "0.6"
parking_lot = "0.6"
log = "0.4"
slab = "0.4"

View File

@ -74,7 +74,7 @@ extern crate mio;
#[macro_use]
extern crate log as rlog;
extern crate slab;
extern crate crossbeam;
extern crate crossbeam_deque as deque;
extern crate parking_lot;
extern crate num_cpus;
extern crate timer;

View File

@ -20,7 +20,7 @@ use std::collections::HashMap;
use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use crossbeam::sync::chase_lev;
use deque;
use slab::Slab;
use {IoError, IoHandler};
use worker::{Worker, Work, WorkType};
@ -184,7 +184,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
worker_channel: deque::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
}
@ -194,7 +194,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>
) -> Result<(), IoError> {
let (worker, stealer) = chase_lev::deque();
let (worker, stealer) = deque::fifo();
let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new());
@ -430,7 +430,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Mutex<Option<JoinHandle<()>>>,
thread: Option<JoinHandle<()>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
}
@ -448,19 +448,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
IoManager::<Message>::start(&mut event_loop, h).expect("Error starting IO service");
});
Ok(IoService {
thread: Mutex::new(Some(thread)),
thread: Some(thread),
host_channel: Mutex::new(channel),
handlers: handlers,
})
}
pub fn stop(&self) {
pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.handlers.write().clear();
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
if let Some(thread) = self.thread.lock().take() {
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});

View File

@ -16,7 +16,7 @@
use std::sync::{Arc, Weak};
use std::thread;
use crossbeam::sync::chase_lev;
use deque;
use slab::Slab;
use fnv::FnvHashMap;
use {IoError, IoHandler};
@ -198,7 +198,7 @@ struct Shared<Message> where Message: Send + Sync + 'static {
// necessary.
timers: Mutex<FnvHashMap<TimerToken, TimerGuard>>,
// Channel used to send work to the worker threads.
channel: Mutex<Option<chase_lev::Worker<WorkTask<Message>>>>,
channel: Mutex<Option<deque::Worker<WorkTask<Message>>>>,
}
// Messages used to communicate with the event loop from other threads.
@ -224,7 +224,7 @@ impl<Message> Clone for WorkTask<Message> where Message: Send + Sized {
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
let (tx, rx) = chase_lev::deque();
let (tx, rx) = deque::fifo();
let shared = Arc::new(Shared {
handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)),
@ -251,7 +251,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
}
/// Stops the IO service.
pub fn stop(&self) {
pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
@ -307,15 +307,15 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync {
}
}
fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: chase_lev::Stealer<WorkTask<Message>>)
where Message: Send + Sync + 'static
fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: deque::Stealer<WorkTask<Message>>)
where Message: Send + Sync + 'static
{
loop {
match rx.steal() {
chase_lev::Steal::Abort => continue,
chase_lev::Steal::Empty => thread::park(),
chase_lev::Steal::Data(WorkTask::Shutdown) => break,
chase_lev::Steal::Data(WorkTask::UserMessage(message)) => {
deque::Steal::Retry => continue,
deque::Steal::Empty => thread::park(),
deque::Steal::Data(WorkTask::Shutdown) => break,
deque::Steal::Data(WorkTask::UserMessage(message)) => {
for id in 0 .. MAX_HANDLERS {
if let Some(handler) = shared.handlers.read().get(id) {
let ctxt = IoContext { handler: id, shared: shared.clone() };
@ -323,7 +323,7 @@ fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: chase_lev::Stealer<WorkTa
}
}
},
chase_lev::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
deque::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
if let Some(handler) = shared.handlers.read().get(handler_id) {
let ctxt = IoContext { handler: handler_id, shared: shared.clone() };
handler.timeout(&ctxt, token);

View File

@ -17,7 +17,7 @@
use std::sync::Arc;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use deque;
use service_mio::{HandlerId, IoChannel, IoContext};
use IoHandler;
use LOCAL_STACK_SIZE;
@ -53,7 +53,7 @@ pub struct Worker {
impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
@ -75,8 +75,9 @@ impl Worker {
worker
}
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<Condvar>,
fn work_loop<Message>(stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + 'static {
@ -91,8 +92,9 @@ impl Worker {
while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
_ => break,
deque::Steal::Data(work) => Worker::do_work(work, channel.clone()),
deque::Steal::Retry => {},
deque::Steal::Empty => break,
}
}
}