From 1b8f299df2e6a29738a090c47911cc218db49b8d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 10 May 2018 12:34:36 +0200 Subject: [PATCH] Make mio optional in ethcore-io (#8537) * Make mio optional in ethcore-io * Add some annotations, plus a check for features * Increase timer for test --- Cargo.lock | 46 ++- ethcore/sync/Cargo.toml | 1 + test.sh | 2 + util/io/Cargo.toml | 9 +- util/io/src/lib.rs | 149 +++++++-- util/io/src/{service.rs => service_mio.rs} | 16 +- util/io/src/service_non_mio.rs | 334 +++++++++++++++++++++ util/io/src/worker.rs | 11 +- util/network-devp2p/Cargo.toml | 2 +- 9 files changed, 510 insertions(+), 60 deletions(-) rename util/io/src/{service.rs => service_mio.rs} (97%) create mode 100644 util/io/src/service_non_mio.rs diff --git a/Cargo.lock b/Cargo.lock index edd76f9f4..b4d6bd315 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,6 +209,16 @@ dependencies = [ "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "chrono" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cid" version = "0.2.3" @@ -542,7 +552,7 @@ dependencies = [ "memory-cache 0.1.0", "memorydb 0.1.1", "num 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-machine 0.1.0", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie 0.1.0", @@ -597,10 +607,14 @@ name = "ethcore-io" version = "1.12.0" dependencies = [ "crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", + "timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -991,7 +1005,7 @@ dependencies = [ "dir 0.1.0", "docopt 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethstore 0.2.0", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "panic_hook 0.1.0", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1117,7 +1131,7 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1483,7 +1497,7 @@ dependencies = [ "interleaved-ordered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.4.5 (git+https://github.com/paritytech/rust-rocksdb)", @@ -1888,7 +1902,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "num_cpus" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1981,7 +1995,7 @@ dependencies = [ "migration-rocksdb 0.1.0", "node-filter 1.12.0", "node-health 0.1.0", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "panic_hook 0.1.0", "parity-dapps 1.12.0", @@ -2680,7 +2694,7 @@ dependencies = [ "crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3183,7 +3197,7 @@ name = "threadpool" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3197,6 +3211,14 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tiny-keccak" version = "1.4.1" @@ -3350,7 +3372,7 @@ dependencies = [ "crossbeam-deque 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3795,6 +3817,7 @@ dependencies = [ "checksum bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "1b7db437d718977f6dc9b2e3fd6fc343c02ac6b899b73fdd2179163447bd9ce9" "checksum cc 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8b9d2900f78631a5876dc5d6c9033ede027253efcd33dd36b1309fc6cab97ee0" "checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" +"checksum chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce36c92cb605414e9b824f866f5babe0a0368e39ea07393b9b63cf3844c0e6" "checksum cid 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d85ee025368e69063c420cbb2ed9f852cb03a5e69b73be021e65726ce03585b6" "checksum clap 2.29.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8f4a2b3bb7ef3c672d7c13d15613211d5a6976b6892c598b0fcb5d40765f19c2" "checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299" @@ -3903,7 +3926,7 @@ dependencies = [ "checksum num-iter 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "4b226df12c5a59b63569dd57fafb926d91b385dfce33d8074a412411b689d593" "checksum num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31" "checksum num-traits 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dee092fcdf725aee04dd7da1d21debff559237d49ef1cb3e69bcb8ece44c7364" -"checksum num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "514f0d73e64be53ff320680ca671b64fe3fb91da01e1ae2ddc99eb51d453b20d" +"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum number_prefix 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "59a14be9c211cb9c602bad35ac99f41e9a84b44d71b8cbd3040e3bd02a214902" "checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c" "checksum order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "efa535d5117d3661134dbf1719b6f0ffe06f2375843b13935db186cd094105eb" @@ -4001,6 +4024,7 @@ dependencies = [ "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" "checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865" "checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520" +"checksum timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "31d42176308937165701f50638db1c31586f183f1aab416268216577aec7306b" "checksum tiny-keccak 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "58911ed5eb275a8fd2f1f0418ed360a42f59329864b64e1e95377a9024498c01" "checksum tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "be15ef40f675c9fe66e354d74c73f3ed012ca1aa14d65846a33ee48f1ae8d922" "checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index ba03075d0..cf163cc7b 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -34,6 +34,7 @@ trace-time = { path = "../../util/trace-time" } ipnetwork = "0.12.6" [dev-dependencies] +ethcore-io = { path = "../../util/io", features = ["mio"] } ethkey = { path = "../../ethkey" } kvdb-memorydb = { path = "../../util/kvdb-memorydb" } ethcore-private-tx = { path = "../private-tx" } diff --git a/test.sh b/test.sh index 6dcd258ea..9bb527b70 100755 --- a/test.sh +++ b/test.sh @@ -33,6 +33,8 @@ if [ "$VALIDATE" -eq "1" ]; then # Validate --no-default-features build echo "________Validate build________" cargo check --no-default-features +cargo check --manifest-path util/io/Cargo.toml --no-default-features +cargo check --manifest-path util/io/Cargo.toml --features "mio" # Validate chainspecs echo "________Validate chainspecs________" diff --git a/util/io/Cargo.toml b/util/io/Cargo.toml index 8538863d0..716886616 100644 --- a/util/io/Cargo.toml +++ b/util/io/Cargo.toml @@ -7,9 +7,12 @@ version = "1.12.0" authors = ["Parity Technologies "] [dependencies] -mio = "0.6.8" +fnv = "1.0" +mio = { version = "0.6.8", optional = true } crossbeam = "0.3" parking_lot = "0.5" log = "0.3" -slab = "0.2" - +slab = "0.4" +num_cpus = "1.8" +timer = "0.2" +time = "0.1" diff --git a/util/io/src/lib.rs b/util/io/src/lib.rs index 9232b2a90..ef8c3adc7 100644 --- a/util/io/src/lib.rs +++ b/util/io/src/lib.rs @@ -54,30 +54,59 @@ //! // Drop the service //! } //! ``` +//! +//! # Mio vs non-mio +//! +//! This library has two modes: mio and not mio. The `mio` feature can be activated or deactivated +//! when compiling or depending on the library. +//! +//! Without mio, only timers and message-passing are available. With mio, you can also use +//! low-level sockets provided by mio. +//! +//! The non-mio mode exists because the `mio` library doesn't compile on platforms such as +//! emscripten. //TODO: use Poll from mio #![allow(deprecated)] +#[cfg(feature = "mio")] extern crate mio; #[macro_use] extern crate log as rlog; extern crate slab; extern crate crossbeam; extern crate parking_lot; +extern crate num_cpus; +extern crate timer; +extern crate fnv; +extern crate time; -mod service; +#[cfg(feature = "mio")] +mod service_mio; +#[cfg(not(feature = "mio"))] +mod service_non_mio; +#[cfg(feature = "mio")] mod worker; +use std::cell::Cell; use std::{fmt, error}; +#[cfg(feature = "mio")] use mio::deprecated::{EventLoop, NotifyError}; +#[cfg(feature = "mio")] use mio::Token; -pub use worker::LOCAL_STACK_SIZE; +thread_local! { + /// Stack size + /// Should be modified if it is changed in Rust since it is no way + /// to know or get it + pub static LOCAL_STACK_SIZE: Cell = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024)); +} #[derive(Debug)] /// IO Error pub enum IoError { /// Low level error from mio crate + #[cfg(feature = "mio")] Mio(::std::io::Error), /// Error concerning the Rust standard library's IO subsystem. StdIo(::std::io::Error), @@ -88,6 +117,7 @@ impl fmt::Display for IoError { // just defer to the std implementation for now. // we can refine the formatting when more variants are added. match *self { + #[cfg(feature = "mio")] IoError::Mio(ref std_err) => std_err.fmt(f), IoError::StdIo(ref std_err) => std_err.fmt(f), } @@ -106,8 +136,9 @@ impl From<::std::io::Error> for IoError { } } -impl From>> for IoError where Message: Send { - fn from(_err: NotifyError>) -> IoError { +#[cfg(feature = "mio")] +impl From>> for IoError where Message: Send { + fn from(_err: NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } } @@ -123,58 +154,120 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + 'static { /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. fn message(&self, _io: &IoContext, _message: &Message) {} /// Called when an IO stream gets closed + #[cfg(feature = "mio")] fn stream_hup(&self, _io: &IoContext, _stream: StreamToken) {} /// Called when an IO stream can be read from + #[cfg(feature = "mio")] fn stream_readable(&self, _io: &IoContext, _stream: StreamToken) {} /// Called when an IO stream can be written to + #[cfg(feature = "mio")] fn stream_writable(&self, _io: &IoContext, _stream: StreamToken) {} /// Register a new stream with the event loop + #[cfg(feature = "mio")] fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} /// Re-register a stream with the event loop + #[cfg(feature = "mio")] fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} /// Deregister a stream. Called whenstream is removed from event loop + #[cfg(feature = "mio")] fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop>) {} } -pub use service::TimerToken; -pub use service::StreamToken; -pub use service::IoContext; -pub use service::IoService; -pub use service::IoChannel; -pub use service::IoManager; -pub use service::TOKENS_PER_HANDLER; +#[cfg(feature = "mio")] +pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER}; +#[cfg(not(feature = "mio"))] +pub use service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER}; #[cfg(test)] mod tests { - use std::sync::Arc; + use std::sync::atomic; + use std::thread; use std::time::Duration; use super::*; - struct MyHandler; + #[test] + fn send_message_to_handler() { + struct MyHandler(atomic::AtomicBool); - #[derive(Clone)] - struct MyMessage { - data: u32 - } - - impl IoHandler for MyHandler { - fn initialize(&self, io: &IoContext) { - io.register_timer(0, Duration::from_secs(1)).unwrap(); + #[derive(Clone)] + struct MyMessage { + data: u32 } - fn timeout(&self, _io: &IoContext, timer: TimerToken) { - println!("Timeout {}", timer); + impl IoHandler for MyHandler { + fn message(&self, _io: &IoContext, message: &MyMessage) { + assert_eq!(message.data, 5); + self.0.store(true, atomic::Ordering::SeqCst); + } } - fn message(&self, _io: &IoContext, message: &MyMessage) { - println!("Message {}", message.data); - } + let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false))); + + let service = IoService::::start().expect("Error creating network service"); + service.register_handler(handler.clone()).unwrap(); + + service.send_message(MyMessage { data: 5 }).unwrap(); + + thread::sleep(Duration::from_secs(5)); + assert!(handler.0.load(atomic::Ordering::SeqCst)); } #[test] - fn test_service_register_handler () { + fn timeout_working() { + struct MyHandler(atomic::AtomicBool); + + #[derive(Clone)] + struct MyMessage { + data: u32 + } + + impl IoHandler for MyHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer_once(1234, Duration::from_millis(500)).unwrap(); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + assert_eq!(timer, 1234); + assert!(!self.0.swap(true, atomic::Ordering::SeqCst)); + } + } + + let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false))); + let service = IoService::::start().expect("Error creating network service"); - service.register_handler(Arc::new(MyHandler)).unwrap(); + service.register_handler(handler.clone()).unwrap(); + + thread::sleep(Duration::from_secs(2)); + assert!(handler.0.load(atomic::Ordering::SeqCst)); + } + + #[test] + fn multi_timeout_working() { + struct MyHandler(atomic::AtomicUsize); + + #[derive(Clone)] + struct MyMessage { + data: u32 + } + + impl IoHandler for MyHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer(1234, Duration::from_millis(500)).unwrap(); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + assert_eq!(timer, 1234); + self.0.fetch_add(1, atomic::Ordering::SeqCst); + } + } + + let handler = Arc::new(MyHandler(atomic::AtomicUsize::new(0))); + + let service = IoService::::start().expect("Error creating network service"); + service.register_handler(handler.clone()).unwrap(); + + thread::sleep(Duration::from_secs(2)); + assert!(handler.0.load(atomic::Ordering::SeqCst) >= 2); } } diff --git a/util/io/src/service.rs b/util/io/src/service_mio.rs similarity index 97% rename from util/io/src/service.rs rename to util/io/src/service_mio.rs index 0de674ae1..f7c9f1976 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service_mio.rs @@ -181,7 +181,7 @@ struct UserTimer { /// Root IO handler. Manages user handlers, messages and IO timers. pub struct IoManager where Message: Send + Sync { timers: Arc>>, - handlers: Arc>, HandlerId>>>, + handlers: Arc>>>>, workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, @@ -191,7 +191,7 @@ impl IoManager where Message: Send + Sync + 'static { /// Creates a new instance and registers it with the event loop. pub fn start( event_loop: &mut EventLoop>, - handlers: Arc>, HandlerId>>> + handlers: Arc>>>> ) -> Result<(), IoError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; @@ -267,7 +267,8 @@ impl Handler for IoManager where Message: Send + Sync + 'stati event_loop.shutdown(); }, IoMessage::AddHandler { handler } => { - let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); + let handler_id = self.handlers.write().insert(handler.clone()); + assert!(handler_id <= MAX_HANDLERS, "Too many handlers registered"); handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id)); }, IoMessage::RemoveHandler { handler_id } => { @@ -332,7 +333,7 @@ impl Handler for IoManager where Message: Send + Sync + 'stati } enum Handlers where Message: Send { - SharedCollection(Weak>, HandlerId>>>), + SharedCollection(Weak>>>>), Single(Weak>), } @@ -417,7 +418,7 @@ impl IoChannel where Message: Send + Sync + 'static { handlers: Handlers::Single(handler), } } - fn new(channel: Sender>, handlers: Weak>, HandlerId>>>) -> IoChannel { + fn new(channel: Sender>, handlers: Weak>>>>) -> IoChannel { IoChannel { channel: Some(channel), handlers: Handlers::SharedCollection(handlers), @@ -430,7 +431,7 @@ impl IoChannel where Message: Send + Sync + 'static { pub struct IoService where Message: Send + Sync + 'static { thread: Mutex>>, host_channel: Mutex>>, - handlers: Arc>, HandlerId>>>, + handlers: Arc>>>>, } impl IoService where Message: Send + Sync + 'static { @@ -440,7 +441,7 @@ impl IoService where Message: Send + Sync + 'static { config.messages_per_tick(1024); let mut event_loop = config.build().expect("Error creating event loop"); let channel = event_loop.channel(); - let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS))); + let handlers = Arc::new(RwLock::new(Slab::with_capacity(MAX_HANDLERS))); let h = handlers.clone(); let thread = thread::spawn(move || { IoManager::::start(&mut event_loop, h).expect("Error starting IO service"); @@ -491,4 +492,3 @@ impl Drop for IoService where Message: Send + Sync { self.stop() } } - diff --git a/util/io/src/service_non_mio.rs b/util/io/src/service_non_mio.rs new file mode 100644 index 000000000..22a795e4e --- /dev/null +++ b/util/io/src/service_non_mio.rs @@ -0,0 +1,334 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::{Arc, Weak}; +use std::thread; +use crossbeam::sync::chase_lev; +use slab::Slab; +use fnv::FnvHashMap; +use {IoError, IoHandler}; +use parking_lot::{RwLock, Mutex}; +use num_cpus; +use std::time::Duration; +use timer::{Timer, Guard as TimerGuard}; +use time::Duration as TimeDuration; + +/// Timer ID +pub type TimerToken = usize; +/// IO Handler ID +pub type HandlerId = usize; + +/// Maximum number of tokens a handler can use +pub const TOKENS_PER_HANDLER: usize = 16384; +const MAX_HANDLERS: usize = 8; + +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. +pub struct IoContext where Message: Send + Sync + 'static { + handler: HandlerId, + shared: Arc>, +} + +impl IoContext where Message: Send + Sync + 'static { + /// Register a new recurring IO timer. 'IoHandler::timeout' will be called with the token. + pub fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> { + let channel = self.channel(); + + let msg = WorkTask::TimerTrigger { + handler_id: self.handler, + token: token, + }; + + let delay = TimeDuration::from_std(delay) + .map_err(|e| ::std::io::Error::new(::std::io::ErrorKind::Other, e))?; + let guard = self.shared.timer.lock().schedule_repeating(delay, move || { + channel.send_raw(msg.clone()); + }); + + self.shared.timers.lock().insert(token, guard); + + Ok(()) + } + + /// Register a new IO timer once. 'IoHandler::timeout' will be called with the token. + pub fn register_timer_once(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> { + let channel = self.channel(); + + let msg = WorkTask::TimerTrigger { + handler_id: self.handler, + token: token, + }; + + let delay = TimeDuration::from_std(delay) + .map_err(|e| ::std::io::Error::new(::std::io::ErrorKind::Other, e))?; + let guard = self.shared.timer.lock().schedule_with_delay(delay, move || { + channel.send_raw(msg.clone()); + }); + + self.shared.timers.lock().insert(token, guard); + + Ok(()) + } + + /// Delete a timer. + pub fn clear_timer(&self, token: TimerToken) -> Result<(), IoError> { + self.shared.timers.lock().remove(&token); + Ok(()) + } + + /// Broadcast a message to other IO clients + pub fn message(&self, message: Message) -> Result<(), IoError> { + if let Some(ref channel) = *self.shared.channel.lock() { + channel.push(WorkTask::UserMessage(Arc::new(message))); + } + for thread in self.shared.threads.read().iter() { + thread.unpark(); + } + + Ok(()) + } + + /// Get message channel + pub fn channel(&self) -> IoChannel { + IoChannel { shared: Arc::downgrade(&self.shared) } + } + + /// Unregister current IO handler. + pub fn unregister_handler(&self) -> Result<(), IoError> { + self.shared.handlers.write().remove(self.handler); + Ok(()) + } +} + +/// Allows sending messages into the event loop. All the IO handlers will get the message +/// in the `message` callback. +pub struct IoChannel where Message: Send + Sync + 'static { + shared: Weak>, +} + +impl Clone for IoChannel where Message: Send + Sync + 'static { + fn clone(&self) -> IoChannel { + IoChannel { + shared: self.shared.clone(), + } + } +} + +impl IoChannel where Message: Send + Sync + 'static { + /// Send a message through the channel + pub fn send(&self, message: Message) -> Result<(), IoError> { + if let Some(shared) = self.shared.upgrade() { + match *shared.channel.lock() { + Some(ref channel) => channel.push(WorkTask::UserMessage(Arc::new(message))), + None => self.send_sync(message)? + }; + + for thread in shared.threads.read().iter() { + thread.unpark(); + } + } + + Ok(()) + } + + /// Send a message through the channel and handle it synchronously + pub fn send_sync(&self, message: Message) -> Result<(), IoError> { + if let Some(shared) = self.shared.upgrade() { + for id in 0 .. MAX_HANDLERS { + if let Some(h) = shared.handlers.read().get(id) { + let handler = h.clone(); + let ctxt = IoContext { handler: id, shared: shared.clone() }; + handler.message(&ctxt, &message); + } + } + } + + Ok(()) + } + + // Send low level io message + fn send_raw(&self, message: WorkTask) { + if let Some(shared) = self.shared.upgrade() { + if let Some(ref channel) = *shared.channel.lock() { + channel.push(message); + } + + for thread in shared.threads.read().iter() { + thread.unpark(); + } + } + } + + /// Create a new channel disconnected from an event loop. + pub fn disconnected() -> IoChannel { + IoChannel { + shared: Weak::default(), + } + } +} + +/// General IO Service. Starts an event loop and dispatches IO requests. +/// 'Message' is a notification message type +pub struct IoService where Message: Send + Sync + 'static { + thread_joins: Mutex>>, + shared: Arc>, +} + +// Struct shared throughout the whole implementation. +struct Shared where Message: Send + Sync + 'static { + // All the I/O handlers that have been registered. + handlers: RwLock>>>, + // All the background threads, so that we can unpark them. + threads: RwLock>, + // Used to create timeouts. + timer: Mutex, + // List of created timers. We need to keep them in a data struct so that we can cancel them if + // necessary. + timers: Mutex>, + // Channel used to send work to the worker threads. + channel: Mutex>>>, +} + +// Messages used to communicate with the event loop from other threads. +enum WorkTask where Message: Send + Sized { + Shutdown, + TimerTrigger { + handler_id: HandlerId, + token: TimerToken, + }, + UserMessage(Arc) +} + +impl Clone for WorkTask where Message: Send + Sized { + fn clone(&self) -> WorkTask { + match *self { + WorkTask::Shutdown => WorkTask::Shutdown, + WorkTask::TimerTrigger { handler_id, token } => WorkTask::TimerTrigger { handler_id, token }, + WorkTask::UserMessage(ref msg) => WorkTask::UserMessage(msg.clone()), + } + } +} + +impl IoService where Message: Send + Sync + 'static { + /// Starts IO event loop + pub fn start() -> Result, IoError> { + let (tx, rx) = chase_lev::deque(); + + let shared = Arc::new(Shared { + handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)), + threads: RwLock::new(Vec::new()), + timer: Mutex::new(Timer::new()), + timers: Mutex::new(FnvHashMap::default()), + channel: Mutex::new(Some(tx)), + }); + + let thread_joins = (0 .. num_cpus::get()).map(|_| { + let rx = rx.clone(); + let shared = shared.clone(); + thread::spawn(move || { + do_work(&shared, rx) + }) + }).collect::>(); + + *shared.threads.write() = thread_joins.iter().map(|t| t.thread().clone()).collect(); + + Ok(IoService { + thread_joins: Mutex::new(thread_joins), + shared, + }) + } + + /// Stops the IO service. + pub fn stop(&self) { + trace!(target: "shutdown", "[IoService] Closing..."); + // Clear handlers so that shared pointers are not stuck on stack + // in Channel::send_sync + self.shared.handlers.write().clear(); + let channel = self.shared.channel.lock().take(); + let mut thread_joins = self.thread_joins.lock(); + if let Some(channel) = channel { + for _ in 0 .. thread_joins.len() { + channel.push(WorkTask::Shutdown); + } + } + for thread in thread_joins.drain(..) { + thread.thread().unpark(); + thread.join().unwrap_or_else(|e| { + debug!(target: "shutdown", "Error joining IO service worker thread: {:?}", e); + }); + } + trace!(target: "shutdown", "[IoService] Closed."); + } + + /// Register an IO handler with the event loop. + pub fn register_handler(&self, handler: Arc+Send>) -> Result<(), IoError> { + let id = self.shared.handlers.write().insert(handler.clone()); + assert!(id <= MAX_HANDLERS, "Too many handlers registered"); + let ctxt = IoContext { handler: id, shared: self.shared.clone() }; + handler.initialize(&ctxt); + Ok(()) + } + + /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. + pub fn send_message(&self, message: Message) -> Result<(), IoError> { + if let Some(ref channel) = *self.shared.channel.lock() { + channel.push(WorkTask::UserMessage(Arc::new(message))); + } + for thread in self.shared.threads.read().iter() { + thread.unpark(); + } + Ok(()) + } + + /// Create a new message channel + #[inline] + pub fn channel(&self) -> IoChannel { + IoChannel { + shared: Arc::downgrade(&self.shared) + } + } +} + +impl Drop for IoService where Message: Send + Sync { + fn drop(&mut self) { + self.stop() + } +} + +fn do_work(shared: &Arc>, rx: chase_lev::Stealer>) + where Message: Send + Sync + 'static +{ + loop { + match rx.steal() { + chase_lev::Steal::Abort => continue, + chase_lev::Steal::Empty => thread::park(), + chase_lev::Steal::Data(WorkTask::Shutdown) => break, + chase_lev::Steal::Data(WorkTask::UserMessage(message)) => { + for id in 0 .. MAX_HANDLERS { + if let Some(handler) = shared.handlers.read().get(id) { + let ctxt = IoContext { handler: id, shared: shared.clone() }; + handler.message(&ctxt, &message); + } + } + }, + chase_lev::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => { + if let Some(handler) = shared.handlers.read().get(handler_id) { + let ctxt = IoContext { handler: handler_id, shared: shared.clone() }; + handler.timeout(&ctxt, token); + } + }, + } + } +} diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index 0f0d448ec..89657810d 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -18,21 +18,14 @@ use std::sync::Arc; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; -use service::{HandlerId, IoChannel, IoContext}; +use service_mio::{HandlerId, IoChannel, IoContext}; use IoHandler; -use std::cell::Cell; +use LOCAL_STACK_SIZE; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; const STACK_SIZE: usize = 16*1024*1024; -thread_local! { - /// Stack size - /// Should be modified if it is changed in Rust since it is no way - /// to know or get it - pub static LOCAL_STACK_SIZE: Cell = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024)); -} - pub enum WorkType { Readable, Writable, diff --git a/util/network-devp2p/Cargo.toml b/util/network-devp2p/Cargo.toml index 3ec968439..f4889fe26 100644 --- a/util/network-devp2p/Cargo.toml +++ b/util/network-devp2p/Cargo.toml @@ -19,7 +19,7 @@ libc = "0.2.7" parking_lot = "0.5" ansi_term = "0.10" rustc-hex = "1.0" -ethcore-io = { path = "../io" } +ethcore-io = { path = "../io", features = ["mio"] } ethcore-bytes = { path = "../bytes" } ethcore-crypto = { path = "../../ethcore/crypto" } ethcore-logger = { path ="../../logger" }