Merge remote-tracking branch 'parity/master'
This commit is contained in:
@@ -68,6 +68,8 @@ mod panics;
|
||||
use mio::{EventLoop, Token};
|
||||
use std::fmt;
|
||||
|
||||
pub use worker::LOCAL_STACK_SIZE;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// IO Error
|
||||
pub enum IoError {
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::collections::HashMap;
|
||||
use mio::*;
|
||||
@@ -76,12 +76,12 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
|
||||
}
|
||||
|
||||
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
|
||||
pub struct IoContext<Message> where Message: Send + Clone + 'static {
|
||||
pub struct IoContext<Message> where Message: Send + Clone + Sync + 'static {
|
||||
channel: IoChannel<Message>,
|
||||
handler: HandlerId,
|
||||
}
|
||||
|
||||
impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
||||
impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
|
||||
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
|
||||
pub fn new(channel: IoChannel<Message>, handler: HandlerId) -> IoContext<Message> {
|
||||
IoContext {
|
||||
@@ -179,7 +179,7 @@ struct UserTimer {
|
||||
/// Root IO handler. Manages user handlers, messages and IO timers.
|
||||
pub struct IoManager<Message> where Message: Send + Sync {
|
||||
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
|
||||
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
|
||||
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
|
||||
workers: Vec<Worker>,
|
||||
worker_channel: chase_lev::Worker<Work<Message>>,
|
||||
work_ready: Arc<SCondvar>,
|
||||
@@ -187,7 +187,11 @@ pub struct IoManager<Message> where Message: Send + Sync {
|
||||
|
||||
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
||||
/// Creates a new instance and registers it with the event loop.
|
||||
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), IoError> {
|
||||
pub fn start(
|
||||
panic_handler: Arc<PanicHandler>,
|
||||
event_loop: &mut EventLoop<IoManager<Message>>,
|
||||
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>
|
||||
) -> Result<(), IoError> {
|
||||
let (worker, stealer) = chase_lev::deque();
|
||||
let num_workers = 4;
|
||||
let work_ready_mutex = Arc::new(SMutex::new(()));
|
||||
@@ -196,7 +200,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
||||
Worker::new(
|
||||
i,
|
||||
stealer.clone(),
|
||||
IoChannel::new(event_loop.channel()),
|
||||
IoChannel::new(event_loop.channel(), Arc::downgrade(&handlers)),
|
||||
work_ready.clone(),
|
||||
work_ready_mutex.clone(),
|
||||
panic_handler.clone(),
|
||||
@@ -205,7 +209,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
||||
|
||||
let mut io = IoManager {
|
||||
timers: Arc::new(RwLock::new(HashMap::new())),
|
||||
handlers: Slab::new(MAX_HANDLERS),
|
||||
handlers: handlers,
|
||||
worker_channel: worker,
|
||||
workers: workers,
|
||||
work_ready: work_ready,
|
||||
@@ -222,7 +226,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||
if let Some(handler) = self.handlers.get(handler_index) {
|
||||
if let Some(handler) = self.handlers.read().get(handler_index) {
|
||||
if events.is_hup() {
|
||||
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||
}
|
||||
@@ -241,9 +245,8 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
|
||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||
if let Some(handler) = self.handlers.get(handler_index) {
|
||||
let option = self.timers.read().get(&token.as_usize()).cloned();
|
||||
if let Some(timer) = option {
|
||||
if let Some(handler) = self.handlers.read().get(handler_index) {
|
||||
if let Some(timer) = self.timers.read().get(&token.as_usize()) {
|
||||
if timer.once {
|
||||
self.timers.write().remove(&token_id);
|
||||
event_loop.clear_timeout(timer.timeout);
|
||||
@@ -263,12 +266,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
event_loop.shutdown();
|
||||
},
|
||||
IoMessage::AddHandler { handler } => {
|
||||
let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
|
||||
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
|
||||
let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
|
||||
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id));
|
||||
},
|
||||
IoMessage::RemoveHandler { handler_id } => {
|
||||
// TODO: flush event loop
|
||||
self.handlers.remove(handler_id);
|
||||
self.handlers.write().remove(handler_id);
|
||||
// unregister timers
|
||||
let mut timers = self.timers.write();
|
||||
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
|
||||
@@ -289,12 +292,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
}
|
||||
},
|
||||
IoMessage::RegisterStream { handler_id, token } => {
|
||||
if let Some(handler) = self.handlers.get(handler_id) {
|
||||
if let Some(handler) = self.handlers.read().get(handler_id) {
|
||||
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||
}
|
||||
},
|
||||
IoMessage::DeregisterStream { handler_id, token } => {
|
||||
if let Some(handler) = self.handlers.get(handler_id) {
|
||||
if let Some(handler) = self.handlers.read().get(handler_id) {
|
||||
handler.deregister_stream(token, event_loop);
|
||||
// unregister a timer associated with the token (if any)
|
||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||
@@ -304,14 +307,14 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
}
|
||||
},
|
||||
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
||||
if let Some(handler) = self.handlers.get(handler_id) {
|
||||
if let Some(handler) = self.handlers.read().get(handler_id) {
|
||||
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||
}
|
||||
},
|
||||
IoMessage::UserMessage(data) => {
|
||||
//TODO: better way to iterate the slab
|
||||
for id in 0 .. MAX_HANDLERS {
|
||||
if let Some(h) = self.handlers.get(id) {
|
||||
if let Some(h) = self.handlers.read().get(id) {
|
||||
let handler = h.clone();
|
||||
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
|
||||
}
|
||||
@@ -325,19 +328,21 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
||||
/// Allows sending messages into the event loop. All the IO handlers will get the message
|
||||
/// in the `message` callback.
|
||||
pub struct IoChannel<Message> where Message: Send + Clone{
|
||||
channel: Option<Sender<IoMessage<Message>>>
|
||||
channel: Option<Sender<IoMessage<Message>>>,
|
||||
handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
|
||||
}
|
||||
|
||||
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone {
|
||||
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync + 'static {
|
||||
fn clone(&self) -> IoChannel<Message> {
|
||||
IoChannel {
|
||||
channel: self.channel.clone()
|
||||
channel: self.channel.clone(),
|
||||
handlers: self.handlers.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Message> IoChannel<Message> where Message: Send + Clone {
|
||||
/// Send a msessage through the channel
|
||||
impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
|
||||
/// Send a message through the channel
|
||||
pub fn send(&self, message: Message) -> Result<(), IoError> {
|
||||
if let Some(ref channel) = self.channel {
|
||||
try!(channel.send(IoMessage::UserMessage(message)));
|
||||
@@ -345,6 +350,19 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a message through the channel and handle it synchronously
|
||||
pub fn send_sync(&self, message: Message) -> Result<(), IoError> {
|
||||
if let Some(handlers) = self.handlers.upgrade() {
|
||||
for id in 0 .. MAX_HANDLERS {
|
||||
if let Some(h) = handlers.read().get(id) {
|
||||
let handler = h.clone();
|
||||
handler.message(&IoContext::new(self.clone(), id), &message);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send low level io message
|
||||
pub fn send_io(&self, message: IoMessage<Message>) -> Result<(), IoError> {
|
||||
if let Some(ref channel) = self.channel {
|
||||
@@ -354,11 +372,17 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
|
||||
}
|
||||
/// Create a new channel to connected to event loop.
|
||||
pub fn disconnected() -> IoChannel<Message> {
|
||||
IoChannel { channel: None }
|
||||
IoChannel {
|
||||
channel: None,
|
||||
handlers: Weak::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn new(channel: Sender<IoMessage<Message>>) -> IoChannel<Message> {
|
||||
IoChannel { channel: Some(channel) }
|
||||
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>) -> IoChannel<Message> {
|
||||
IoChannel {
|
||||
channel: Some(channel),
|
||||
handlers: handlers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -368,6 +392,7 @@ pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
panic_handler: Arc<PanicHandler>,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
host_channel: Sender<IoMessage<Message>>,
|
||||
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
|
||||
}
|
||||
|
||||
impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
@@ -385,16 +410,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
|
||||
let channel = event_loop.channel();
|
||||
let panic = panic_handler.clone();
|
||||
let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
|
||||
let h = handlers.clone();
|
||||
let thread = thread::spawn(move || {
|
||||
let p = panic.clone();
|
||||
panic.catch_panic(move || {
|
||||
IoManager::<Message>::start(p, &mut event_loop).unwrap();
|
||||
IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
|
||||
}).unwrap()
|
||||
});
|
||||
Ok(IoService {
|
||||
panic_handler: panic_handler,
|
||||
thread: Some(thread),
|
||||
host_channel: channel
|
||||
host_channel: channel,
|
||||
handlers: handlers,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -414,7 +442,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
|
||||
/// Create a new message channel
|
||||
pub fn channel(&self) -> IoChannel<Message> {
|
||||
IoChannel { channel: Some(self.host_channel.clone()) }
|
||||
IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,9 +22,19 @@ use crossbeam::sync::chase_lev;
|
||||
use service::{HandlerId, IoChannel, IoContext};
|
||||
use IoHandler;
|
||||
use panics::*;
|
||||
use std::cell::Cell;
|
||||
|
||||
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||
|
||||
const STACK_SIZE: usize = 16*1024*1024;
|
||||
|
||||
thread_local! {
|
||||
/// Stack size
|
||||
/// Should be modified if it is changed in Rust since it is no way
|
||||
/// to know or get it
|
||||
pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
|
||||
}
|
||||
|
||||
pub enum WorkType<Message> {
|
||||
Readable,
|
||||
Writable,
|
||||
@@ -66,8 +76,9 @@ impl Worker {
|
||||
deleting: deleting.clone(),
|
||||
wait_mutex: wait_mutex.clone(),
|
||||
};
|
||||
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
|
||||
worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn(
|
||||
move || {
|
||||
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
|
||||
panic_handler.catch_panic(move || {
|
||||
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
|
||||
}).unwrap()
|
||||
@@ -90,11 +101,11 @@ impl Worker {
|
||||
let _ = wait.wait(lock);
|
||||
}
|
||||
|
||||
if deleting.load(AtomicOrdering::Acquire) {
|
||||
return;
|
||||
}
|
||||
while let chase_lev::Steal::Data(work) = stealer.steal() {
|
||||
Worker::do_work(work, channel.clone());
|
||||
while !deleting.load(AtomicOrdering::Acquire) {
|
||||
match stealer.steal() {
|
||||
chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user