Add tokio runtime to ethcore io worker (#9979)

* Add tokio runtime to ethcore io worker

* Reworked with block_on_all
This commit is contained in:
Anton Gavrilov 2018-12-03 15:35:46 +01:00 committed by Wei Tang
parent 7af953fd62
commit 869fa399b1
4 changed files with 30 additions and 26 deletions

2
Cargo.lock generated
View File

@ -715,6 +715,7 @@ version = "1.12.0"
dependencies = [ dependencies = [
"crossbeam-deque 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-deque 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -722,6 +723,7 @@ dependencies = [
"slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]

View File

@ -16,3 +16,5 @@ slab = "0.4"
num_cpus = "1.8" num_cpus = "1.8"
timer = "0.2" timer = "0.2"
time = "0.1" time = "0.1"
tokio = "0.1"
futures = "0.1"

View File

@ -80,6 +80,8 @@ extern crate num_cpus;
extern crate timer; extern crate timer;
extern crate fnv; extern crate fnv;
extern crate time; extern crate time;
extern crate tokio;
extern crate futures;
#[cfg(feature = "mio")] #[cfg(feature = "mio")]
mod service_mio; mod service_mio;

View File

@ -14,11 +14,13 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use futures::future::{self, Loop};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{JoinHandle, self}; use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use deque; use deque;
use service_mio::{HandlerId, IoChannel, IoContext}; use service_mio::{HandlerId, IoChannel, IoContext};
use tokio::{self};
use IoHandler; use IoHandler;
use LOCAL_STACK_SIZE; use LOCAL_STACK_SIZE;
@ -69,37 +71,33 @@ impl Worker {
worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn( worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn(
move || { move || {
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE)); LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) let ini = (stealer, channel.clone(), wait, wait_mutex.clone(), deleting);
let future = future::loop_fn(ini, |(stealer, channel, wait, wait_mutex, deleting)| {
{
let mut lock = wait_mutex.lock();
if deleting.load(AtomicOrdering::Acquire) {
return Ok(Loop::Break(()));
}
wait.wait(&mut lock);
}
while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
deque::Steal::Data(work) => Worker::do_work(work, channel.clone()),
deque::Steal::Retry => {},
deque::Steal::Empty => break,
}
}
Ok(Loop::Continue((stealer, channel, wait, wait_mutex, deleting)))
});
if let Err(()) = tokio::runtime::current_thread::block_on_all(future) {
error!(target: "ioworker", "error while executing future")
}
}) })
.expect("Error creating worker thread")); .expect("Error creating worker thread"));
worker worker
} }
fn work_loop<Message>(stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + 'static {
loop {
{
let mut lock = wait_mutex.lock();
if deleting.load(AtomicOrdering::Acquire) {
return;
}
wait.wait(&mut lock);
}
while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
deque::Steal::Data(work) => Worker::do_work(work, channel.clone()),
deque::Steal::Retry => {},
deque::Steal::Empty => break,
}
}
}
}
fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + 'static { fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + 'static {
match work.work_type { match work.work_type {
WorkType::Readable => { WorkType::Readable => {