openethereum/util/src/io/service.rs

338 lines
11 KiB
Rust
Raw Normal View History

2016-01-21 16:48:37 +01:00
use std::sync::*;
2016-01-12 17:33:40 +01:00
use std::thread::{self, JoinHandle};
2016-01-21 16:48:37 +01:00
use std::collections::HashMap;
2016-01-12 17:33:40 +01:00
use mio::*;
use hash::*;
use rlp::*;
use error::*;
use io::{IoError, IoHandler};
2016-01-21 16:48:37 +01:00
use arrayvec::*;
use crossbeam::sync::chase_lev;
use io::worker::{Worker, Work, WorkType};
2016-01-12 17:33:40 +01:00
/// Timer ID. Chosen by the handler that registers the timer; must be smaller than
/// `TOKENS_PER_HANDLER`.
pub type TimerToken = usize;

/// Stream ID. Chosen by the handler that registers the stream; must be smaller than
/// `TOKENS_PER_HANDLER`.
pub type StreamToken = usize;

/// IO handler ID. This is the index of the handler in the order of registration.
pub type HandlerId = usize;

/// Maximum number of tokens a handler can use.
///
/// Event loop tokens are computed as `token + handler_id * TOKENS_PER_HANDLER`, so every
/// handler owns a contiguous range of this many tokens.
pub const TOKENS_PER_HANDLER: usize = 16384;
2016-01-12 17:33:40 +01:00
/// Messages used to communicate with the event loop from other threads.
2016-01-21 16:48:37 +01:00
#[derive(Clone)]
pub enum IoMessage<Message> where Message: Send + Clone + Sized {
2016-01-12 17:33:40 +01:00
/// Shutdown the event loop
Shutdown,
/// Register a new protocol handler.
AddHandler {
2016-01-21 16:48:37 +01:00
handler: Arc<IoHandler<Message>+Send>,
},
AddTimer {
handler_id: HandlerId,
token: TimerToken,
delay: u64,
},
RemoveTimer {
handler_id: HandlerId,
token: TimerToken,
},
RegisterStream {
handler_id: HandlerId,
token: StreamToken,
},
2016-01-22 18:13:59 +01:00
DeregisterStream {
handler_id: HandlerId,
token: StreamToken,
},
2016-01-21 16:48:37 +01:00
UpdateStreamRegistration {
handler_id: HandlerId,
token: StreamToken,
2016-01-12 17:33:40 +01:00
},
/// Broadcast a message across all protocol handlers.
2016-01-13 13:56:48 +01:00
UserMessage(Message)
2016-01-12 17:33:40 +01:00
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
2016-01-21 16:48:37 +01:00
pub struct IoContext<Message> where Message: Send + Clone + 'static {
channel: IoChannel<Message>,
handler: HandlerId,
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
2016-01-12 17:33:40 +01:00
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
2016-01-21 16:48:37 +01:00
pub fn new(channel: IoChannel<Message>, handler: HandlerId) -> IoContext<Message> {
2016-01-12 17:33:40 +01:00
IoContext {
2016-01-21 16:48:37 +01:00
handler: handler,
channel: channel,
2016-01-12 17:33:40 +01:00
}
}
2016-01-21 16:48:37 +01:00
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::AddTimer {
token: token,
2016-01-12 17:33:40 +01:00
delay: ms,
2016-01-21 16:48:37 +01:00
handler_id: self.handler,
}));
Ok(())
}
/// Delete a timer.
pub fn clear_timer(&self, token: TimerToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::RemoveTimer {
token: token,
handler_id: self.handler,
}));
Ok(())
}
2016-01-22 18:13:59 +01:00
2016-01-21 16:48:37 +01:00
/// Register a new IO stream.
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::RegisterStream {
token: token,
handler_id: self.handler,
}));
Ok(())
}
2016-01-22 18:13:59 +01:00
/// Deregister an IO stream.
pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::DeregisterStream {
token: token,
handler_id: self.handler,
}));
Ok(())
}
2016-01-21 16:48:37 +01:00
/// Reregister an IO stream.
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
token: token,
handler_id: self.handler,
}));
Ok(())
2016-01-12 17:33:40 +01:00
}
/// Broadcast a message to other IO clients
2016-01-21 16:48:37 +01:00
pub fn message(&self, message: Message) {
self.channel.send(message).expect("Error seding message");
2016-01-12 17:33:40 +01:00
}
2016-01-21 23:33:52 +01:00
/// Get message channel
pub fn channel(&self) -> IoChannel<Message> {
self.channel.clone()
}
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
#[derive(Clone)]
2016-01-12 17:33:40 +01:00
struct UserTimer {
delay: u64,
2016-01-21 16:48:37 +01:00
timeout: Timeout,
2016-01-12 17:33:40 +01:00
}
/// Root IO handler. Manages user handlers, messages and IO timers.
2016-01-21 16:48:37 +01:00
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Vec<Arc<IoHandler<Message>>>,
_workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
2016-01-12 17:33:40 +01:00
/// Creates a new instance and registers it with the event loop.
2016-01-13 13:56:48 +01:00
pub fn start(event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
2016-01-21 16:48:37 +01:00
let (worker, stealer) = chase_lev::deque();
let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(()));
2016-01-21 16:48:37 +01:00
let work_ready = Arc::new(Condvar::new());
let workers = (0..num_workers).map(|i|
Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect();
2016-01-12 17:33:40 +01:00
let mut io = IoManager {
2016-01-21 16:48:37 +01:00
timers: Arc::new(RwLock::new(HashMap::new())),
2016-01-12 17:33:40 +01:00
handlers: Vec::new(),
2016-01-21 16:48:37 +01:00
worker_channel: worker,
_workers: workers,
work_ready: work_ready,
2016-01-12 17:33:40 +01:00
};
try!(event_loop.run(&mut io));
Ok(())
}
}
2016-01-21 16:48:37 +01:00
impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync + 'static {
2016-01-12 17:33:40 +01:00
type Timeout = Token;
2016-01-13 13:56:48 +01:00
type Message = IoMessage<Message>;
2016-01-12 17:33:40 +01:00
2016-01-21 16:48:37 +01:00
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected stream token: {}", token.as_usize());
}
let handler = self.handlers[handler_index].clone();
2016-01-12 17:33:40 +01:00
if events.is_hup() {
2016-01-21 16:48:37 +01:00
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
else {
if events.is_readable() {
self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index });
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
if events.is_writable() {
self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index });
2016-01-12 17:33:40 +01:00
}
}
2016-01-21 16:48:37 +01:00
self.work_ready.notify_all();
2016-01-12 17:33:40 +01:00
}
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
2016-01-21 16:48:37 +01:00
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected timer token: {}", token.as_usize());
}
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
let handler = self.handlers[handler_index].clone();
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index });
self.work_ready.notify_all();
2016-01-12 17:33:40 +01:00
}
}
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
2016-01-21 16:48:37 +01:00
match msg {
2016-01-12 17:33:40 +01:00
IoMessage::Shutdown => event_loop.shutdown(),
2016-01-21 16:48:37 +01:00
IoMessage::AddHandler { handler } => {
let handler_id = {
self.handlers.push(handler.clone());
self.handlers.len() - 1
};
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
},
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
self.timers.write().unwrap().insert(timer_id, UserTimer { delay: delay, timeout: timeout });
},
IoMessage::RemoveTimer { handler_id, token } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
2016-01-12 17:33:40 +01:00
},
2016-01-21 16:48:37 +01:00
IoMessage::RegisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
},
2016-01-22 18:13:59 +01:00
IoMessage::DeregisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.deregister_stream(token, event_loop);
},
2016-01-21 16:48:37 +01:00
IoMessage::UpdateStreamRegistration { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
},
IoMessage::UserMessage(data) => {
for n in 0 .. self.handlers.len() {
let handler = self.handlers[n].clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: n });
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
self.work_ready.notify_all();
2016-01-12 17:33:40 +01:00
}
}
}
}
2016-01-13 23:13:57 +01:00
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
2016-01-21 16:48:37 +01:00
pub struct IoChannel<Message> where Message: Send + Clone{
2016-01-18 00:24:20 +01:00
channel: Option<Sender<IoMessage<Message>>>
2016-01-13 23:13:57 +01:00
}
2016-01-21 16:48:37 +01:00
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone {
fn clone(&self) -> IoChannel<Message> {
IoChannel {
channel: self.channel.clone()
}
}
}
impl<Message> IoChannel<Message> where Message: Send + Clone {
2016-01-18 00:24:20 +01:00
/// Send a msessage through the channel
2016-01-17 23:07:58 +01:00
pub fn send(&self, message: Message) -> Result<(), IoError> {
2016-01-18 00:24:20 +01:00
if let Some(ref channel) = self.channel {
try!(channel.send(IoMessage::UserMessage(message)));
}
2016-01-13 23:13:57 +01:00
Ok(())
}
2016-01-18 00:24:20 +01:00
/// Send low level io message
2016-01-21 16:48:37 +01:00
pub fn send_io(&self, message: IoMessage<Message>) -> Result<(), IoError> {
if let Some(ref channel) = self.channel {
try!(channel.send(message))
}
Ok(())
}
2016-01-18 00:24:20 +01:00
/// Create a new channel to connected to event loop.
pub fn disconnected() -> IoChannel<Message> {
IoChannel { channel: None }
}
2016-01-21 16:48:37 +01:00
fn new(channel: Sender<IoMessage<Message>>) -> IoChannel<Message> {
IoChannel { channel: Some(channel) }
}
2016-01-13 23:13:57 +01:00
}
2016-01-12 17:33:40 +01:00
/// General IO Service. Starts an event loop and dispatches IO requests.
2016-01-13 13:56:48 +01:00
/// 'Message' is a notification message type
2016-01-21 16:48:37 +01:00
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
2016-01-12 17:33:40 +01:00
thread: Option<JoinHandle<()>>,
2016-01-21 16:48:37 +01:00
host_channel: Sender<IoMessage<Message>>,
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
2016-01-12 17:33:40 +01:00
/// Starts IO event loop
2016-01-13 13:56:48 +01:00
pub fn start() -> Result<IoService<Message>, UtilError> {
2016-01-12 17:33:40 +01:00
let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel();
let thread = thread::spawn(move || {
2016-01-13 13:56:48 +01:00
IoManager::<Message>::start(&mut event_loop).unwrap(); //TODO:
2016-01-12 17:33:40 +01:00
});
Ok(IoService {
thread: Some(thread),
host_channel: channel
})
}
/// Regiter a IO hadnler with the event loop.
2016-01-21 16:48:37 +01:00
pub fn register_handler(&mut self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
2016-01-12 17:33:40 +01:00
try!(self.host_channel.send(IoMessage::AddHandler {
handler: handler,
}));
Ok(())
}
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
2016-01-13 13:56:48 +01:00
pub fn send_message(&mut self, message: Message) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::UserMessage(message)));
Ok(())
}
2016-01-13 23:13:57 +01:00
/// Create a new message channel
pub fn channel(&mut self) -> IoChannel<Message> {
2016-01-18 00:24:20 +01:00
IoChannel { channel: Some(self.host_channel.clone()) }
2016-01-13 23:13:57 +01:00
}
2016-01-12 17:33:40 +01:00
}
2016-01-21 16:48:37 +01:00
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
2016-01-12 17:33:40 +01:00
fn drop(&mut self) {
self.host_channel.send(IoMessage::Shutdown).unwrap();
2016-01-22 01:27:51 +01:00
self.thread.take().unwrap().join().ok();
2016-01-12 17:33:40 +01:00
}
}