Make mio optional in ethcore-io (#8537)

* Make mio optional in ethcore-io

* Add some annotations, plus a check for features

* Increase timer for test
This commit is contained in:
Pierre Krieger
2018-05-10 12:34:36 +02:00
committed by Afri Schoedon
parent 6e2e08628a
commit 1b8f299df2
9 changed files with 510 additions and 60 deletions

View File

@@ -54,30 +54,59 @@
//! // Drop the service
//! }
//! ```
//!
//! # Mio vs non-mio
//!
//! This library has two modes: mio and not mio. The `mio` feature can be activated or deactivated
//! when compiling or depending on the library.
//!
//! Without mio, only timers and message-passing are available. With mio, you can also use
//! low-level sockets provided by mio.
//!
//! The non-mio mode exists because the `mio` library doesn't compile on platforms such as
//! emscripten.
//TODO: use Poll from mio
#![allow(deprecated)]
#[cfg(feature = "mio")]
extern crate mio;
#[macro_use]
extern crate log as rlog;
extern crate slab;
extern crate crossbeam;
extern crate parking_lot;
extern crate num_cpus;
extern crate timer;
extern crate fnv;
extern crate time;
mod service;
#[cfg(feature = "mio")]
mod service_mio;
#[cfg(not(feature = "mio"))]
mod service_non_mio;
#[cfg(feature = "mio")]
mod worker;
use std::cell::Cell;
use std::{fmt, error};
#[cfg(feature = "mio")]
use mio::deprecated::{EventLoop, NotifyError};
#[cfg(feature = "mio")]
use mio::Token;
pub use worker::LOCAL_STACK_SIZE;
thread_local! {
/// Stack size
/// Should be modified if it is changed in Rust since it is no way
/// to know or get it
pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
}
#[derive(Debug)]
/// IO Error
pub enum IoError {
/// Low level error from mio crate
#[cfg(feature = "mio")]
Mio(::std::io::Error),
/// Error concerning the Rust standard library's IO subsystem.
StdIo(::std::io::Error),
@@ -88,6 +117,7 @@ impl fmt::Display for IoError {
// just defer to the std implementation for now.
// we can refine the formatting when more variants are added.
match *self {
#[cfg(feature = "mio")]
IoError::Mio(ref std_err) => std_err.fmt(f),
IoError::StdIo(ref std_err) => std_err.fmt(f),
}
@@ -106,8 +136,9 @@ impl From<::std::io::Error> for IoError {
}
}
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send {
fn from(_err: NotifyError<service::IoMessage<Message>>) -> IoError {
#[cfg(feature = "mio")]
impl<Message> From<NotifyError<service_mio::IoMessage<Message>>> for IoError where Message: Send {
fn from(_err: NotifyError<service_mio::IoMessage<Message>>) -> IoError {
IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
}
}
@@ -123,58 +154,120 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
/// Called when a broadcasted message is received. The message can only be sent from a different IO handler.
fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
/// Called when an IO stream gets closed
#[cfg(feature = "mio")]
fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be read from
#[cfg(feature = "mio")]
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be written to
#[cfg(feature = "mio")]
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Register a new stream with the event loop
#[cfg(feature = "mio")]
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// Re-register a stream with the event loop
#[cfg(feature = "mio")]
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// Deregister a stream. Called whenstream is removed from event loop
#[cfg(feature = "mio")]
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
}
pub use service::TimerToken;
pub use service::StreamToken;
pub use service::IoContext;
pub use service::IoService;
pub use service::IoChannel;
pub use service::IoManager;
pub use service::TOKENS_PER_HANDLER;
#[cfg(feature = "mio")]
pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER};
#[cfg(not(feature = "mio"))]
pub use service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic;
use std::thread;
use std::time::Duration;
use super::*;
struct MyHandler;
#[test]
fn send_message_to_handler() {
struct MyHandler(atomic::AtomicBool);
#[derive(Clone)]
struct MyMessage {
data: u32
}
impl IoHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &IoContext<MyMessage>) {
io.register_timer(0, Duration::from_secs(1)).unwrap();
#[derive(Clone)]
struct MyMessage {
data: u32
}
fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
println!("Timeout {}", timer);
impl IoHandler<MyMessage> for MyHandler {
fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
assert_eq!(message.data, 5);
self.0.store(true, atomic::Ordering::SeqCst);
}
}
fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
println!("Message {}", message.data);
}
let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(handler.clone()).unwrap();
service.send_message(MyMessage { data: 5 }).unwrap();
thread::sleep(Duration::from_secs(5));
assert!(handler.0.load(atomic::Ordering::SeqCst));
}
#[test]
fn test_service_register_handler () {
fn timeout_working() {
struct MyHandler(atomic::AtomicBool);
#[derive(Clone)]
struct MyMessage {
data: u32
}
impl IoHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &IoContext<MyMessage>) {
io.register_timer_once(1234, Duration::from_millis(500)).unwrap();
}
fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
assert_eq!(timer, 1234);
assert!(!self.0.swap(true, atomic::Ordering::SeqCst));
}
}
let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(Arc::new(MyHandler)).unwrap();
service.register_handler(handler.clone()).unwrap();
thread::sleep(Duration::from_secs(2));
assert!(handler.0.load(atomic::Ordering::SeqCst));
}
#[test]
fn multi_timeout_working() {
struct MyHandler(atomic::AtomicUsize);
#[derive(Clone)]
struct MyMessage {
data: u32
}
impl IoHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &IoContext<MyMessage>) {
io.register_timer(1234, Duration::from_millis(500)).unwrap();
}
fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
assert_eq!(timer, 1234);
self.0.fetch_add(1, atomic::Ordering::SeqCst);
}
}
let handler = Arc::new(MyHandler(atomic::AtomicUsize::new(0)));
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(handler.clone()).unwrap();
thread::sleep(Duration::from_secs(2));
assert!(handler.0.load(atomic::Ordering::SeqCst) >= 2);
}
}

View File

@@ -181,7 +181,7 @@ struct UserTimer {
/// Root IO handler. Manages user handlers, messages and IO timers.
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<SCondvar>,
@@ -191,7 +191,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
/// Creates a new instance and registers it with the event loop.
pub fn start(
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>
) -> Result<(), IoError> {
let (worker, stealer) = chase_lev::deque();
let num_workers = 4;
@@ -267,7 +267,8 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Sync + 'stati
event_loop.shutdown();
},
IoMessage::AddHandler { handler } => {
let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
let handler_id = self.handlers.write().insert(handler.clone());
assert!(handler_id <= MAX_HANDLERS, "Too many handlers registered");
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id));
},
IoMessage::RemoveHandler { handler_id } => {
@@ -332,7 +333,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Sync + 'stati
}
enum Handlers<Message> where Message: Send {
SharedCollection(Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>),
SharedCollection(Weak<RwLock<Slab<Arc<IoHandler<Message>>>>>),
Single(Weak<IoHandler<Message>>),
}
@@ -417,7 +418,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
handlers: Handlers::Single(handler),
}
}
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>) -> IoChannel<Message> {
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>>>>) -> IoChannel<Message> {
IoChannel {
channel: Some(channel),
handlers: Handlers::SharedCollection(handlers),
@@ -430,7 +431,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Mutex<Option<JoinHandle<()>>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
}
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
@@ -440,7 +441,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
config.messages_per_tick(1024);
let mut event_loop = config.build().expect("Error creating event loop");
let channel = event_loop.channel();
let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
let handlers = Arc::new(RwLock::new(Slab::with_capacity(MAX_HANDLERS)));
let h = handlers.clone();
let thread = thread::spawn(move || {
IoManager::<Message>::start(&mut event_loop, h).expect("Error starting IO service");
@@ -491,4 +492,3 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync {
self.stop()
}
}

View File

@@ -0,0 +1,334 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, Weak};
use std::thread;
use crossbeam::sync::chase_lev;
use slab::Slab;
use fnv::FnvHashMap;
use {IoError, IoHandler};
use parking_lot::{RwLock, Mutex};
use num_cpus;
use std::time::Duration;
use timer::{Timer, Guard as TimerGuard};
use time::Duration as TimeDuration;
/// Timer ID
pub type TimerToken = usize;
/// IO Handler ID
pub type HandlerId = usize;
/// Maximum number of tokens a handler can use
pub const TOKENS_PER_HANDLER: usize = 16384;
const MAX_HANDLERS: usize = 8;
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct IoContext<Message> where Message: Send + Sync + 'static {
handler: HandlerId,
shared: Arc<Shared<Message>>,
}
impl<Message> IoContext<Message> where Message: Send + Sync + 'static {
/// Register a new recurring IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> {
let channel = self.channel();
let msg = WorkTask::TimerTrigger {
handler_id: self.handler,
token: token,
};
let delay = TimeDuration::from_std(delay)
.map_err(|e| ::std::io::Error::new(::std::io::ErrorKind::Other, e))?;
let guard = self.shared.timer.lock().schedule_repeating(delay, move || {
channel.send_raw(msg.clone());
});
self.shared.timers.lock().insert(token, guard);
Ok(())
}
/// Register a new IO timer once. 'IoHandler::timeout' will be called with the token.
pub fn register_timer_once(&self, token: TimerToken, delay: Duration) -> Result<(), IoError> {
let channel = self.channel();
let msg = WorkTask::TimerTrigger {
handler_id: self.handler,
token: token,
};
let delay = TimeDuration::from_std(delay)
.map_err(|e| ::std::io::Error::new(::std::io::ErrorKind::Other, e))?;
let guard = self.shared.timer.lock().schedule_with_delay(delay, move || {
channel.send_raw(msg.clone());
});
self.shared.timers.lock().insert(token, guard);
Ok(())
}
/// Delete a timer.
pub fn clear_timer(&self, token: TimerToken) -> Result<(), IoError> {
self.shared.timers.lock().remove(&token);
Ok(())
}
/// Broadcast a message to other IO clients
pub fn message(&self, message: Message) -> Result<(), IoError> {
if let Some(ref channel) = *self.shared.channel.lock() {
channel.push(WorkTask::UserMessage(Arc::new(message)));
}
for thread in self.shared.threads.read().iter() {
thread.unpark();
}
Ok(())
}
/// Get message channel
pub fn channel(&self) -> IoChannel<Message> {
IoChannel { shared: Arc::downgrade(&self.shared) }
}
/// Unregister current IO handler.
pub fn unregister_handler(&self) -> Result<(), IoError> {
self.shared.handlers.write().remove(self.handler);
Ok(())
}
}
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
pub struct IoChannel<Message> where Message: Send + Sync + 'static {
shared: Weak<Shared<Message>>,
}
impl<Message> Clone for IoChannel<Message> where Message: Send + Sync + 'static {
fn clone(&self) -> IoChannel<Message> {
IoChannel {
shared: self.shared.clone(),
}
}
}
impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
/// Send a message through the channel
pub fn send(&self, message: Message) -> Result<(), IoError> {
if let Some(shared) = self.shared.upgrade() {
match *shared.channel.lock() {
Some(ref channel) => channel.push(WorkTask::UserMessage(Arc::new(message))),
None => self.send_sync(message)?
};
for thread in shared.threads.read().iter() {
thread.unpark();
}
}
Ok(())
}
/// Send a message through the channel and handle it synchronously
pub fn send_sync(&self, message: Message) -> Result<(), IoError> {
if let Some(shared) = self.shared.upgrade() {
for id in 0 .. MAX_HANDLERS {
if let Some(h) = shared.handlers.read().get(id) {
let handler = h.clone();
let ctxt = IoContext { handler: id, shared: shared.clone() };
handler.message(&ctxt, &message);
}
}
}
Ok(())
}
// Send low level io message
fn send_raw(&self, message: WorkTask<Message>) {
if let Some(shared) = self.shared.upgrade() {
if let Some(ref channel) = *shared.channel.lock() {
channel.push(message);
}
for thread in shared.threads.read().iter() {
thread.unpark();
}
}
}
/// Create a new channel disconnected from an event loop.
pub fn disconnected() -> IoChannel<Message> {
IoChannel {
shared: Weak::default(),
}
}
}
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread_joins: Mutex<Vec<thread::JoinHandle<()>>>,
shared: Arc<Shared<Message>>,
}
// Struct shared throughout the whole implementation.
struct Shared<Message> where Message: Send + Sync + 'static {
// All the I/O handlers that have been registered.
handlers: RwLock<Slab<Arc<IoHandler<Message>>>>,
// All the background threads, so that we can unpark them.
threads: RwLock<Vec<thread::Thread>>,
// Used to create timeouts.
timer: Mutex<Timer>,
// List of created timers. We need to keep them in a data struct so that we can cancel them if
// necessary.
timers: Mutex<FnvHashMap<TimerToken, TimerGuard>>,
// Channel used to send work to the worker threads.
channel: Mutex<Option<chase_lev::Worker<WorkTask<Message>>>>,
}
// Messages used to communicate with the event loop from other threads.
enum WorkTask<Message> where Message: Send + Sized {
Shutdown,
TimerTrigger {
handler_id: HandlerId,
token: TimerToken,
},
UserMessage(Arc<Message>)
}
impl<Message> Clone for WorkTask<Message> where Message: Send + Sized {
fn clone(&self) -> WorkTask<Message> {
match *self {
WorkTask::Shutdown => WorkTask::Shutdown,
WorkTask::TimerTrigger { handler_id, token } => WorkTask::TimerTrigger { handler_id, token },
WorkTask::UserMessage(ref msg) => WorkTask::UserMessage(msg.clone()),
}
}
}
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
let (tx, rx) = chase_lev::deque();
let shared = Arc::new(Shared {
handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)),
threads: RwLock::new(Vec::new()),
timer: Mutex::new(Timer::new()),
timers: Mutex::new(FnvHashMap::default()),
channel: Mutex::new(Some(tx)),
});
let thread_joins = (0 .. num_cpus::get()).map(|_| {
let rx = rx.clone();
let shared = shared.clone();
thread::spawn(move || {
do_work(&shared, rx)
})
}).collect::<Vec<_>>();
*shared.threads.write() = thread_joins.iter().map(|t| t.thread().clone()).collect();
Ok(IoService {
thread_joins: Mutex::new(thread_joins),
shared,
})
}
/// Stops the IO service.
pub fn stop(&self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.shared.handlers.write().clear();
let channel = self.shared.channel.lock().take();
let mut thread_joins = self.thread_joins.lock();
if let Some(channel) = channel {
for _ in 0 .. thread_joins.len() {
channel.push(WorkTask::Shutdown);
}
}
for thread in thread_joins.drain(..) {
thread.thread().unpark();
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service worker thread: {:?}", e);
});
}
trace!(target: "shutdown", "[IoService] Closed.");
}
/// Register an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
let id = self.shared.handlers.write().insert(handler.clone());
assert!(id <= MAX_HANDLERS, "Too many handlers registered");
let ctxt = IoContext { handler: id, shared: self.shared.clone() };
handler.initialize(&ctxt);
Ok(())
}
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
if let Some(ref channel) = *self.shared.channel.lock() {
channel.push(WorkTask::UserMessage(Arc::new(message)));
}
for thread in self.shared.threads.read().iter() {
thread.unpark();
}
Ok(())
}
/// Create a new message channel
#[inline]
pub fn channel(&self) -> IoChannel<Message> {
IoChannel {
shared: Arc::downgrade(&self.shared)
}
}
}
impl<Message> Drop for IoService<Message> where Message: Send + Sync {
fn drop(&mut self) {
self.stop()
}
}
fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: chase_lev::Stealer<WorkTask<Message>>)
where Message: Send + Sync + 'static
{
loop {
match rx.steal() {
chase_lev::Steal::Abort => continue,
chase_lev::Steal::Empty => thread::park(),
chase_lev::Steal::Data(WorkTask::Shutdown) => break,
chase_lev::Steal::Data(WorkTask::UserMessage(message)) => {
for id in 0 .. MAX_HANDLERS {
if let Some(handler) = shared.handlers.read().get(id) {
let ctxt = IoContext { handler: id, shared: shared.clone() };
handler.message(&ctxt, &message);
}
}
},
chase_lev::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
if let Some(handler) = shared.handlers.read().get(handler_id) {
let ctxt = IoContext { handler: handler_id, shared: shared.clone() };
handler.timeout(&ctxt, token);
}
},
}
}
}

View File

@@ -18,21 +18,14 @@ use std::sync::Arc;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use service::{HandlerId, IoChannel, IoContext};
use service_mio::{HandlerId, IoChannel, IoContext};
use IoHandler;
use std::cell::Cell;
use LOCAL_STACK_SIZE;
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
const STACK_SIZE: usize = 16*1024*1024;
thread_local! {
/// Stack size
/// Should be modified if it is changed in Rust since it is no way
/// to know or get it
pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
}
pub enum WorkType<Message> {
Readable,
Writable,