diff --git a/Cargo.lock b/Cargo.lock index b95de24a3..7559b7167 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -715,6 +715,7 @@ version = "1.12.0" dependencies = [ "crossbeam-deque 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -722,6 +723,7 @@ dependencies = [ "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/util/io/Cargo.toml b/util/io/Cargo.toml index e53b302b7..2f6b8e28b 100644 --- a/util/io/Cargo.toml +++ b/util/io/Cargo.toml @@ -16,3 +16,5 @@ slab = "0.4" num_cpus = "1.8" timer = "0.2" time = "0.1" +tokio = "0.1" +futures = "0.1" diff --git a/util/io/src/lib.rs b/util/io/src/lib.rs index 3aa51fbdf..0673e8da3 100644 --- a/util/io/src/lib.rs +++ b/util/io/src/lib.rs @@ -80,6 +80,8 @@ extern crate num_cpus; extern crate timer; extern crate fnv; extern crate time; +extern crate tokio; +extern crate futures; #[cfg(feature = "mio")] mod service_mio; diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index 1d3359416..74418159c 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -14,11 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use futures::future::{self, Loop}; use std::sync::Arc; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use deque; use service_mio::{HandlerId, IoChannel, IoContext}; +use tokio::{self}; use IoHandler; use LOCAL_STACK_SIZE; @@ -69,37 +71,33 @@ impl Worker { worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn( move || { LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE)); - Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) + let ini = (stealer, channel.clone(), wait, wait_mutex.clone(), deleting); + let future = future::loop_fn(ini, |(stealer, channel, wait, wait_mutex, deleting)| { + { + let mut lock = wait_mutex.lock(); + if deleting.load(AtomicOrdering::Acquire) { + return Ok(Loop::Break(())); + } + wait.wait(&mut lock); + } + + while !deleting.load(AtomicOrdering::Acquire) { + match stealer.steal() { + deque::Steal::Data(work) => Worker::do_work(work, channel.clone()), + deque::Steal::Retry => {}, + deque::Steal::Empty => break, + } + } + Ok(Loop::Continue((stealer, channel, wait, wait_mutex, deleting))) + }); + if let Err(()) = tokio::runtime::current_thread::block_on_all(future) { + error!(target: "ioworker", "error while executing future") + } }) .expect("Error creating worker thread")); worker } - fn work_loop(stealer: deque::Stealer>, - channel: IoChannel, - wait: Arc, - wait_mutex: Arc>, - deleting: Arc) - where Message: Send + Sync + 'static { - loop { - { - let mut lock = wait_mutex.lock(); - if deleting.load(AtomicOrdering::Acquire) { - return; - } - wait.wait(&mut lock); - } - - while !deleting.load(AtomicOrdering::Acquire) { - match stealer.steal() { - deque::Steal::Data(work) => Worker::do_work(work, channel.clone()), - deque::Steal::Retry => {}, - deque::Steal::Empty => break, - } - } - } - } - fn do_work(work: Work, channel: IoChannel) where Message: Send + Sync + 'static { match work.work_type { WorkType::Readable => {