Implementing PanicHandlers for all places when new thread is spawned. Handling Client panics

This commit is contained in:
Tomusdrw
2016-02-10 12:50:27 +01:00
parent 0757ac1493
commit 2a498fc3eb
8 changed files with 299 additions and 105 deletions

View File

@@ -31,16 +31,16 @@
//!
//! impl IoHandler<MyMessage> for MyHandler {
//! fn initialize(&self, io: &IoContext<MyMessage>) {
//! io.register_timer(0, 1000).unwrap();
//! }
//! io.register_timer(0, 1000).unwrap();
//! }
//!
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
//! println!("Timeout {}", timer);
//! }
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
//! println!("Timeout {}", timer);
//! }
//!
//! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
//! println!("Message {}", message.data);
//! }
//! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
//! println!("Message {}", message.data);
//! }
//! }
//!
//! fn main () {
@@ -70,7 +70,7 @@ impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError
}
}
/// Generic IO handler.
/// Generic IO handler.
/// All the handler function are called from within IO event loop.
/// `Message` type is used as notification data
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + 'static {
@@ -82,7 +82,7 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
/// Called when an IO stream gets closed
fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be read from
/// Called when an IO stream can be read from
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be written to
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

View File

@@ -25,6 +25,7 @@ use io::{IoError, IoHandler};
use arrayvec::*;
use crossbeam::sync::chase_lev;
use io::worker::{Worker, Work, WorkType};
use panics::*;
/// Timer ID
pub type TimerToken = usize;
@@ -164,7 +165,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new());
let workers = (0..num_workers).map(|i|
let workers = (0..num_workers).map(|i|
Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect();
let mut io = IoManager {
@@ -306,19 +307,32 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: SafeStringPanicHandler,
thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>,
}
impl<Message> MayPanic<String> for IoService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> {
self.panic_handler.on_panic(closure);
}
}
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = StringPanicHandler::new_thread_safe();
let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel();
let panic = panic_handler.clone();
let thread = thread::spawn(move || {
IoManager::<Message>::start(&mut event_loop).unwrap(); //TODO:
let mut panic = panic.lock().unwrap();
panic.catch_panic(move || {
IoManager::<Message>::start(&mut event_loop).unwrap();
}).unwrap()
});
Ok(IoService {
panic_handler: panic_handler,
thread: Some(thread),
host_channel: channel
})

View File

@@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler};
use panics::*;
pub enum WorkType<Message> {
Readable,
@@ -43,32 +44,41 @@ pub struct Worker {
thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>,
deleting: Arc<AtomicBool>,
panic_handler: SafeStringPanicHandler,
}
impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>) -> Worker
wait_mutex: Arc<Mutex<()>>) -> Worker
where Message: Send + Sync + Clone + 'static {
let panic_handler = StringPanicHandler::new_thread_safe();
let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker {
panic_handler: panic_handler.clone(),
thread: None,
wait: wait.clone(),
deleting: deleting.clone(),
};
let panic_handler = panic_handler.clone();
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting))
move || {
let mut panic = panic_handler.lock().unwrap();
panic.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()
})
.expect("Error creating worker thread"));
worker
}
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
channel: IoChannel<Message>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static {
while !deleting.load(AtomicOrdering::Relaxed) {
{
@@ -105,6 +115,12 @@ impl Worker {
}
}
impl MayPanic<String> for Worker {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> {
self.panic_handler.on_panic(closure);
}
}
impl Drop for Worker {
fn drop(&mut self) {
self.deleting.store(true, AtomicOrdering::Relaxed);

View File

@@ -21,7 +21,8 @@
#![feature(plugin)]
#![plugin(clippy)]
#![allow(needless_range_loop, match_bool)]
#![feature(std_panic, recover)]
#![feature(catch_panic)]
//! Ethcore-util library
//!
//! ### Rust version:

View File

@@ -17,10 +17,9 @@
//! Panic utilities
use std::thread;
use std::panic;
use std::sync::Mutex;
use std::any::Any;
use std::ops::DerefMut;
use std::any::Any;
use std::sync::{Arc, Mutex};
pub trait OnPanicListener<T>: Send + Sync + 'static {
fn call(&mut self, arg: &T);
@@ -33,26 +32,37 @@ impl<F, T> OnPanicListener<T> for F
}
}
pub trait ArgsConverter<T> {
pub trait ArgsConverter<T> : Send + Sync {
fn convert(&self, t: &Box<Any + Send>) -> Option<T>;
}
pub trait MayPanic<T> {
fn on_panic<F>(&mut self, closure: F)
fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<T>;
}
pub trait PanicHandler<T, C: ArgsConverter<T>> : MayPanic<T>{
fn new(converter: C) -> Self;
fn with_converter(converter: C) -> Self;
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R>
where G: FnOnce() -> R + panic::RecoverSafe;
where G: FnOnce() -> R + Send + 'static;
fn notify_all(&mut self, &T);
}
pub type SafeStringPanicHandler = Arc<Mutex<StringPanicHandler>>;
impl MayPanic<String> for SafeStringPanicHandler {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> {
self.lock().unwrap().on_panic(closure);
}
}
pub struct StringConverter;
impl ArgsConverter<String> for StringConverter {
fn convert(&self, t: &Box<Any + Send>) -> Option<String> {
t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned())
let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned());
let as_string = t.downcast_ref::<String>().cloned();
as_str.or(as_string)
}
}
@@ -62,86 +72,152 @@ pub struct BasePanicHandler<T, C>
listeners: Mutex<Vec<Box<OnPanicListener<T>>>>
}
impl<T, C> BasePanicHandler<T, C>
where C: ArgsConverter<T>, T: 'static {
fn notify_all(&mut self, res: Option<T>) {
if let None = res {
return;
}
let r = res.unwrap();
let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() {
listener.call(&r);
}
}
}
impl<T, C> PanicHandler<T, C> for BasePanicHandler<T, C>
where C: ArgsConverter<T>, T: 'static {
fn new(converter: C) -> Self {
fn with_converter(converter: C) -> Self {
BasePanicHandler {
converter: converter,
listeners: Mutex::new(vec![])
}
}
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R>
where G: FnOnce() -> R + panic::RecoverSafe {
let result = panic::recover(g);
#[allow(deprecated)]
// TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex)
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
let result = thread::catch_panic(g);
println!("After calling function");
if let Err(ref e) = result {
let res = self.converter.convert(e);
println!("Got error. Notifying");
self.notify_all(res);
if let Err(ref e) = result {
let res = self.converter.convert(e);
if let Some(r) = res {
self.notify_all(&r);
}
result
}
result
}
fn notify_all(&mut self, r: &T) {
let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() {
listener.call(r);
}
}
}
impl<T, C> MayPanic<T> for BasePanicHandler<T, C>
where C: ArgsConverter<T>, T: 'static {
fn on_panic<F>(&mut self, closure: F)
fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<T> {
self.listeners.lock().unwrap().push(Box::new(closure));
}
}
pub struct StringPanicHandler {
handler: BasePanicHandler<String, StringConverter>
}
impl StringPanicHandler {
pub fn new_thread_safe() -> SafeStringPanicHandler {
Arc::new(Mutex::new(Self::new()))
}
pub fn new () -> Self {
Self::with_converter(StringConverter)
}
}
impl PanicHandler<String, StringConverter> for StringPanicHandler {
fn with_converter(converter: StringConverter) -> Self {
StringPanicHandler {
handler: BasePanicHandler::with_converter(converter)
}
}
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
self.handler.catch_panic(g)
}
fn notify_all(&mut self, r: &String) {
self.handler.notify_all(r);
}
}
impl MayPanic<String> for StringPanicHandler {
fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<String> {
self.handler.on_panic(closure)
}
}
#[test]
fn should_notify_listeners_about_panic () {
use std::sync::{Arc, RwLock};
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let mut p = BasePanicHandler::new(StringConverter);
let mut p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when
p.catch_panic(|| panic!("Panic!"));
p.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
}
#[test]
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let mut p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic: 1");
}
#[test]
fn should_notify_listeners_about_panic_in_other_thread () {
use std::thread;
use std::sync::{Arc, RwLock};
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let mut p = BasePanicHandler::new(StringConverter);
let mut p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when
let t = thread::spawn(move ||
p.catch_panic(|| panic!("Panic!"))
p.catch_panic(|| panic!("Panic!")).unwrap()
);
t.join();
t.join().unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
}
#[test]
fn should_forward_panics () {
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let mut p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
let mut p2 = StringPanicHandler::new();
p2.on_panic(move |t: &String| p.notify_all(t));
// when
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");