From ebaa43fa4cee9f45d701c9a06f693061e4e5557f Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Sat, 29 Sep 2018 21:25:16 +0100 Subject: [PATCH] ethcore-io retries failed work steal (#9651) * ethcore-io uses newer version of crossbeam && retries failed work steal * ethcore-io non-mio service uses newer crossbeam --- Cargo.lock | 4 ++-- ethcore/src/engines/tendermint/mod.rs | 1 - util/io/Cargo.toml | 2 +- util/io/src/lib.rs | 2 +- util/io/src/service_mio.rs | 14 +++++++------- util/io/src/service_non_mio.rs | 22 +++++++++++----------- util/io/src/worker.rs | 14 ++++++++------ 7 files changed, 30 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69773af63..c0c020162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -600,7 +600,7 @@ version = "1.12.0" name = "ethcore-io" version = "1.12.0" dependencies = [ - "crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-deque 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2296,7 +2296,7 @@ source = "git+https://github.com/nikvolf/parity-tokio-ipc#c0f80b40399d7f08ef1e68 dependencies = [ "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio-named-pipes 0.1.6 (git+https://github.com/alexcrichton/mio-named-pipes)", "miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/src/engines/tendermint/mod.rs b/ethcore/src/engines/tendermint/mod.rs index 2d0015d06..70d78147e 100644 --- a/ethcore/src/engines/tendermint/mod.rs +++ b/ethcore/src/engines/tendermint/mod.rs @@ -693,7 +693,6 @@ impl Engine for Tendermint { } fn stop(&self) { - self.step_service.stop() } fn is_proposal(&self, header: &Header) -> bool { diff --git a/util/io/Cargo.toml b/util/io/Cargo.toml index ad50881dd..e53b302b7 100644 --- a/util/io/Cargo.toml +++ b/util/io/Cargo.toml @@ -9,7 +9,7 @@ authors = ["Parity Technologies "] [dependencies] fnv = "1.0" mio = { version = "0.6.8", optional = true } -crossbeam = "0.3" +crossbeam-deque = "0.6" parking_lot = "0.6" log = "0.4" slab = "0.4" diff --git a/util/io/src/lib.rs b/util/io/src/lib.rs index 02dbf223b..3aa51fbdf 100644 --- a/util/io/src/lib.rs +++ b/util/io/src/lib.rs @@ -74,7 +74,7 @@ extern crate mio; #[macro_use] extern crate log as rlog; extern crate slab; -extern crate crossbeam; +extern crate crossbeam_deque as deque; extern crate parking_lot; extern crate num_cpus; extern crate timer; diff --git a/util/io/src/service_mio.rs b/util/io/src/service_mio.rs index 729fcec17..07d0f8106 100644 --- a/util/io/src/service_mio.rs +++ b/util/io/src/service_mio.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use mio::*; use mio::timer::{Timeout}; use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder}; -use crossbeam::sync::chase_lev; +use deque; use slab::Slab; use {IoError, IoHandler}; use worker::{Worker, Work, WorkType}; @@ -184,7 +184,7 @@ pub struct IoManager where Message: Send + Sync { timers: Arc>>, handlers: Arc>>>>, workers: Vec, - worker_channel: chase_lev::Worker>, + worker_channel: deque::Worker>, work_ready: Arc, } @@ -194,7 +194,7 @@ impl IoManager where Message: Send + Sync + 'static { event_loop: &mut EventLoop>, handlers: Arc>>>> ) -> Result<(), IoError> { - let (worker, stealer) = chase_lev::deque(); + let (worker, stealer) = deque::fifo(); let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); @@ -430,7 +430,7 @@ impl IoChannel where Message: Send + Sync + 'static { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + 'static { - thread: Mutex>>, + thread: Option>, host_channel: Mutex>>, handlers: Arc>>>>, } @@ -448,19 +448,19 @@ impl IoService where Message: Send + Sync + 'static { IoManager::::start(&mut event_loop, h).expect("Error starting IO service"); }); Ok(IoService { - thread: Mutex::new(Some(thread)), + thread: Some(thread), host_channel: Mutex::new(channel), handlers: handlers, }) } - pub fn stop(&self) { + pub fn stop(&mut self) { trace!(target: "shutdown", "[IoService] Closing..."); // Clear handlers so that shared pointers are not stuck on stack // in Channel::send_sync self.handlers.write().clear(); self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e)); - if let Some(thread) = self.thread.lock().take() { + if let Some(thread) = self.thread.take() { thread.join().unwrap_or_else(|e| { debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e); }); diff --git a/util/io/src/service_non_mio.rs b/util/io/src/service_non_mio.rs index 315f84c4d..30839f9e9 100644 --- a/util/io/src/service_non_mio.rs +++ b/util/io/src/service_non_mio.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, Weak}; use std::thread; -use crossbeam::sync::chase_lev; +use deque; use slab::Slab; use fnv::FnvHashMap; use {IoError, IoHandler}; @@ -198,7 +198,7 @@ struct Shared where Message: Send + Sync + 'static { // necessary. timers: Mutex>, // Channel used to send work to the worker threads. - channel: Mutex>>>, + channel: Mutex>>>, } // Messages used to communicate with the event loop from other threads. @@ -224,7 +224,7 @@ impl Clone for WorkTask where Message: Send + Sized { impl IoService where Message: Send + Sync + 'static { /// Starts IO event loop pub fn start() -> Result, IoError> { - let (tx, rx) = chase_lev::deque(); + let (tx, rx) = deque::fifo(); let shared = Arc::new(Shared { handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)), @@ -251,7 +251,7 @@ impl IoService where Message: Send + Sync + 'static { } /// Stops the IO service. - pub fn stop(&self) { + pub fn stop(&mut self) { trace!(target: "shutdown", "[IoService] Closing..."); // Clear handlers so that shared pointers are not stuck on stack // in Channel::send_sync @@ -307,15 +307,15 @@ impl Drop for IoService where Message: Send + Sync { } } -fn do_work(shared: &Arc>, rx: chase_lev::Stealer>) - where Message: Send + Sync + 'static +fn do_work(shared: &Arc>, rx: deque::Stealer>) + where Message: Send + Sync + 'static { loop { match rx.steal() { - chase_lev::Steal::Abort => continue, - chase_lev::Steal::Empty => thread::park(), - chase_lev::Steal::Data(WorkTask::Shutdown) => break, - chase_lev::Steal::Data(WorkTask::UserMessage(message)) => { + deque::Steal::Retry => continue, + deque::Steal::Empty => thread::park(), + deque::Steal::Data(WorkTask::Shutdown) => break, + deque::Steal::Data(WorkTask::UserMessage(message)) => { for id in 0 .. MAX_HANDLERS { if let Some(handler) = shared.handlers.read().get(id) { let ctxt = IoContext { handler: id, shared: shared.clone() }; @@ -323,7 +323,7 @@ fn do_work(shared: &Arc>, rx: chase_lev::Stealer { + deque::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => { if let Some(handler) = shared.handlers.read().get(handler_id) { let ctxt = IoContext { handler: handler_id, shared: shared.clone() }; handler.timeout(&ctxt, token); diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index 252060848..1d3359416 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; -use crossbeam::sync::chase_lev; +use deque; use service_mio::{HandlerId, IoChannel, IoContext}; use IoHandler; use LOCAL_STACK_SIZE; @@ -53,7 +53,7 @@ pub struct Worker { impl Worker { /// Creates a new worker instance. pub fn new(index: usize, - stealer: chase_lev::Stealer>, + stealer: deque::Stealer>, channel: IoChannel, wait: Arc, wait_mutex: Arc>, @@ -75,8 +75,9 @@ impl Worker { worker } - fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, + fn work_loop(stealer: deque::Stealer>, + channel: IoChannel, + wait: Arc, wait_mutex: Arc>, deleting: Arc) where Message: Send + Sync + 'static { @@ -91,8 +92,9 @@ impl Worker { while !deleting.load(AtomicOrdering::Acquire) { match stealer.steal() { - chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()), - _ => break, + deque::Steal::Data(work) => Worker::do_work(work, channel.clone()), + deque::Steal::Retry => {}, + deque::Steal::Empty => break, } } }