From 982063e1ac8db74b263960d5cfaba0428064acfe Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 12 Jan 2016 17:33:40 +0100 Subject: [PATCH] Started IO service refactoring --- src/io/mod.rs | 68 +++++++++++++++ src/io/service.rs | 207 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 276 insertions(+) create mode 100644 src/io/mod.rs create mode 100644 src/io/service.rs diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 000000000..e8562e64a --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,68 @@ +/// General IO module. +/// +/// Example usage for craeting a network service and adding an IO handler: +/// +/// ```rust +/// extern crate ethcore_util as util; +/// use util::network::*; +/// +/// struct MyHandler; +/// +/// impl ProtocolHandler for MyHandler { +/// fn initialize(&mut self, io: &mut HandlerIo) { +/// io.register_timer(1000); +/// } +/// +/// fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { +/// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); +/// } +/// +/// fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// println!("Connected {}", peer); +/// } +/// +/// fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) { +/// println!("Disconnected {}", peer); +/// } +/// +/// fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { +/// println!("Timeout {}", timer); +/// } +/// +/// fn message(&mut self, io: &mut HandlerIo, message: &Message) { +/// println!("Message {}:{}", message.protocol, message.id); +/// } +/// } +/// +/// fn main () { +/// let mut service = NetworkService::start().expect("Error creating network service"); +/// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]); +/// +/// // Wait for quit condition +/// // ... +/// // Drop the service +/// } +/// ``` +extern crate mio; +mod service; + +#[derive(Debug)] +pub enum IoError { + Mio(::std::io::Error), +} + +impl From<::mio::NotifyError>> for IoError { + fn from(_err: ::mio::NotifyError>) -> IoError { + IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) + } +} + +pub type TimerToken = service::TimerToken; +pub type StreamToken = service::StreamToken; +pub type IoContext<'s, M> = service::IoContext<'s, M>; +pub type Message = service::UserMessage; +pub type IoService = service::IoService; +pub type IoHandler = service::IoHandler; + + + diff --git a/src/io/service.rs b/src/io/service.rs new file mode 100644 index 000000000..4ecc34723 --- /dev/null +++ b/src/io/service.rs @@ -0,0 +1,207 @@ +use std::thread::{self, JoinHandle}; +use mio::*; +use mio::util::{Slab}; +use hash::*; +use rlp::*; +use error::*; +use io::IoError; + +/// Generic IO handler. +/// All the handler function are called from within IO event loop. +pub trait IoHandler: Send where M: Send + 'static { + /// Initialize the hadler + fn initialize(&mut self, _io: &mut IoContext) {} + /// Timer function called after a timeout created with `HandlerIo::timeout`. + fn timeout(&mut self, _io: &mut IoContext, _timer: TimerToken) {} + /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. + fn message(&mut self, _io: &mut IoContext, _message: &M) {} + /// Called when an IO stream gets closed + fn stream_hup(&mut self, _io: &mut IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be read from + fn stream_readable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} + /// Called when an IO stream can be written to + fn stream_writable(&mut self, _io: &mut IoContext, _stream: StreamToken) {} +} + +pub type TimerToken = usize; +pub type StreamToken = usize; + +// Tokens +const MAX_USER_TIMERS: usize = 32; +const USER_TIMER: usize = 0; +const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; + +/// Messages used to communicate with the event loop from other threads. +pub enum IoMessage { + /// Shutdown the event loop + Shutdown, + /// Register a new protocol handler. + AddHandler { + handler: Box+Send>, + }, + /// Broadcast a message across all protocol handlers. + UserMessage(UserMessage), +} + +/// User +pub struct UserMessage { + pub data: M, +} + +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. +pub struct IoContext<'s, M> where M: Send + 'static { + timers: &'s mut Slab, + event_loop: &'s mut EventLoop>, +} + +impl<'s, M> IoContext<'s, M> where M: Send + 'static { + /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. + fn new(event_loop: &'s mut EventLoop>, timers: &'s mut Slab) -> IoContext<'s, M> { + IoContext { + event_loop: event_loop, + timers: timers, + } + } + + /// Register a new IO timer. Returns a new timer token. 'IoHandler::timeout' will be called with the token. + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.timers.insert(UserTimer { + delay: ms, + }) { + Ok(token) => { + self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); + Ok(token.as_usize()) + }, + _ => { panic!("Max timers reached") } + } + } + + /// Broadcast a message to other IO clients + pub fn message(&mut self, message: M) { + match self.event_loop.channel().send(IoMessage::UserMessage(UserMessage { + data: message + })) { + Ok(_) => {} + Err(e) => { panic!("Error sending io message {:?}", e); } + } + } +} + +struct UserTimer { + delay: u64, +} + +/// Root IO handler. Manages user handlers, messages and IO timers. +pub struct IoManager where M: Send { + timers: Slab, + handlers: Vec>>, +} + +impl IoManager where M: Send + 'static { + /// Creates a new instance and registers it with the event loop. + pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + let mut io = IoManager { + timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + handlers: Vec::new(), + }; + try!(event_loop.run(&mut io)); + Ok(()) + } +} + +impl Handler for IoManager where M: Send + 'static { + type Timeout = Token; + type Message = IoMessage; + + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + if events.is_hup() { + for h in self.handlers.iter_mut() { + h.stream_hup(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_readable() { + for h in self.handlers.iter_mut() { + h.stream_readable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + else if events.is_writable() { + for h in self.handlers.iter_mut() { + h.stream_writable(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + + fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { + match token.as_usize() { + USER_TIMER ... LAST_USER_TIMER => { + let delay = { + let timer = self.timers.get_mut(token).expect("Unknown user timer token"); + timer.delay + }; + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + } + _ => { // Just pass the event down. IoHandler is supposed to re-register it if required. + for h in self.handlers.iter_mut() { + h.timeout(&mut IoContext::new(event_loop, &mut self.timers), token.as_usize()); + } + } + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + match msg { + IoMessage::Shutdown => event_loop.shutdown(), + IoMessage::AddHandler { + handler, + } => { + self.handlers.push(handler); + }, + IoMessage::UserMessage(message) => { + for h in self.handlers.iter_mut() { + h.message(&mut IoContext::new(event_loop, &mut self.timers), &message.data); + } + } + } + } +} + +/// General IO Service. Starts an event loop and dispatches IO requests. +/// 'M' is a notification message type +pub struct IoService where M: Send + 'static { + thread: Option>, + host_channel: Sender> +} + +impl IoService where M: Send + 'static { + /// Starts IO event loop + pub fn start() -> Result, UtilError> { + let mut event_loop = EventLoop::new().unwrap(); + let channel = event_loop.channel(); + let thread = thread::spawn(move || { + IoManager::::start(&mut event_loop).unwrap(); //TODO: + }); + Ok(IoService { + thread: Some(thread), + host_channel: channel + }) + } + + /// Regiter a IO hadnler with the event loop. + pub fn register_handler(&mut self, handler: Box+Send>) -> Result<(), IoError> { + try!(self.host_channel.send(IoMessage::AddHandler { + handler: handler, + })); + Ok(()) + } +} + +impl Drop for IoService where M: Send { + fn drop(&mut self) { + self.host_channel.send(IoMessage::Shutdown).unwrap(); + self.thread.take().unwrap().join().unwrap(); + } +} + diff --git a/src/lib.rs b/src/lib.rs index b856431cd..acd96ee5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,7 @@ pub mod nibbleslice; pub mod heapsizeof; pub mod squeeze; pub mod semantic_version; +pub mod io; pub mod network; pub use common::*;