Merge branch 'master' of github.com:ethcore/parity
This commit is contained in:
@@ -31,16 +31,16 @@
|
||||
//!
|
||||
//! impl IoHandler<MyMessage> for MyHandler {
|
||||
//! fn initialize(&self, io: &IoContext<MyMessage>) {
|
||||
//! io.register_timer(0, 1000).unwrap();
|
||||
//! }
|
||||
//! io.register_timer(0, 1000).unwrap();
|
||||
//! }
|
||||
//!
|
||||
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
|
||||
//! println!("Timeout {}", timer);
|
||||
//! }
|
||||
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
|
||||
//! println!("Timeout {}", timer);
|
||||
//! }
|
||||
//!
|
||||
//! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
|
||||
//! println!("Message {}", message.data);
|
||||
//! }
|
||||
//! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
|
||||
//! println!("Message {}", message.data);
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! fn main () {
|
||||
@@ -70,7 +70,7 @@ impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic IO handler.
|
||||
/// Generic IO handler.
|
||||
/// All the handler function are called from within IO event loop.
|
||||
/// `Message` type is used as notification data
|
||||
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + 'static {
|
||||
@@ -82,7 +82,7 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
|
||||
fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
|
||||
/// Called when an IO stream gets closed
|
||||
fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
|
||||
/// Called when an IO stream can be read from
|
||||
/// Called when an IO stream can be read from
|
||||
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
|
||||
/// Called when an IO stream can be written to
|
||||
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
|
||||
|
||||
@@ -25,6 +25,7 @@ use io::{IoError, IoHandler};
|
||||
use arrayvec::*;
|
||||
use crossbeam::sync::chase_lev;
|
||||
use io::worker::{Worker, Work, WorkType};
|
||||
use panics::*;
|
||||
|
||||
/// Timer ID
|
||||
pub type TimerToken = usize;
|
||||
@@ -159,13 +160,21 @@ pub struct IoManager<Message> where Message: Send + Sync {
|
||||
|
||||
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
|
||||
/// Creates a new instance and registers it with the event loop.
|
||||
pub fn start(event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
|
||||
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
|
||||
let (worker, stealer) = chase_lev::deque();
|
||||
let num_workers = 4;
|
||||
let work_ready_mutex = Arc::new(Mutex::new(()));
|
||||
let work_ready = Arc::new(Condvar::new());
|
||||
let workers = (0..num_workers).map(|i|
|
||||
Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect();
|
||||
let workers = (0..num_workers).map(|i|
|
||||
Worker::new(
|
||||
i,
|
||||
stealer.clone(),
|
||||
IoChannel::new(event_loop.channel()),
|
||||
work_ready.clone(),
|
||||
work_ready_mutex.clone(),
|
||||
panic_handler.clone()
|
||||
)
|
||||
).collect();
|
||||
|
||||
let mut io = IoManager {
|
||||
timers: Arc::new(RwLock::new(HashMap::new())),
|
||||
@@ -306,19 +315,32 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
|
||||
/// General IO Service. Starts an event loop and dispatches IO requests.
|
||||
/// 'Message' is a notification message type
|
||||
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
panic_handler: Arc<PanicHandler>,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
host_channel: Sender<IoMessage<Message>>,
|
||||
}
|
||||
|
||||
impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
|
||||
self.panic_handler.on_panic(closure);
|
||||
}
|
||||
}
|
||||
|
||||
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
/// Starts IO event loop
|
||||
pub fn start() -> Result<IoService<Message>, UtilError> {
|
||||
let panic_handler = PanicHandler::new_in_arc();
|
||||
let mut event_loop = EventLoop::new().unwrap();
|
||||
let channel = event_loop.channel();
|
||||
let panic = panic_handler.clone();
|
||||
let thread = thread::spawn(move || {
|
||||
IoManager::<Message>::start(&mut event_loop).unwrap(); //TODO:
|
||||
let p = panic.clone();
|
||||
panic.catch_panic(move || {
|
||||
IoManager::<Message>::start(p, &mut event_loop).unwrap();
|
||||
}).unwrap()
|
||||
});
|
||||
Ok(IoService {
|
||||
panic_handler: panic_handler,
|
||||
thread: Some(thread),
|
||||
host_channel: channel
|
||||
})
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||
use crossbeam::sync::chase_lev;
|
||||
use io::service::{HandlerId, IoChannel, IoContext};
|
||||
use io::{IoHandler};
|
||||
use panics::*;
|
||||
|
||||
pub enum WorkType<Message> {
|
||||
Readable,
|
||||
@@ -47,12 +48,14 @@ pub struct Worker {
|
||||
|
||||
impl Worker {
|
||||
/// Creates a new worker instance.
|
||||
pub fn new<Message>(index: usize,
|
||||
stealer: chase_lev::Stealer<Work<Message>>,
|
||||
pub fn new<Message>(index: usize,
|
||||
stealer: chase_lev::Stealer<Work<Message>>,
|
||||
channel: IoChannel<Message>,
|
||||
wait: Arc<Condvar>,
|
||||
wait_mutex: Arc<Mutex<()>>) -> Worker
|
||||
where Message: Send + Sync + Clone + 'static {
|
||||
wait_mutex: Arc<Mutex<()>>,
|
||||
panic_handler: Arc<PanicHandler>
|
||||
) -> Worker
|
||||
where Message: Send + Sync + Clone + 'static {
|
||||
let deleting = Arc::new(AtomicBool::new(false));
|
||||
let mut worker = Worker {
|
||||
thread: None,
|
||||
@@ -60,15 +63,19 @@ impl Worker {
|
||||
deleting: deleting.clone(),
|
||||
};
|
||||
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
|
||||
move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting))
|
||||
move || {
|
||||
panic_handler.catch_panic(move || {
|
||||
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
|
||||
}).unwrap()
|
||||
})
|
||||
.expect("Error creating worker thread"));
|
||||
worker
|
||||
}
|
||||
|
||||
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
|
||||
channel: IoChannel<Message>, wait: Arc<Condvar>,
|
||||
wait_mutex: Arc<Mutex<()>>,
|
||||
deleting: Arc<AtomicBool>)
|
||||
channel: IoChannel<Message>, wait: Arc<Condvar>,
|
||||
wait_mutex: Arc<Mutex<()>>,
|
||||
deleting: Arc<AtomicBool>)
|
||||
where Message: Send + Sync + Clone + 'static {
|
||||
while !deleting.load(AtomicOrdering::Relaxed) {
|
||||
{
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
#![feature(plugin)]
|
||||
#![plugin(clippy)]
|
||||
#![allow(needless_range_loop, match_bool)]
|
||||
#![feature(catch_panic)]
|
||||
|
||||
//! Ethcore-util library
|
||||
//!
|
||||
//! ### Rust version:
|
||||
@@ -54,7 +56,7 @@
|
||||
//! cd parity
|
||||
//! cargo build --release
|
||||
//! ```
|
||||
//!
|
||||
//!
|
||||
//! - OSX:
|
||||
//!
|
||||
//! ```bash
|
||||
@@ -129,6 +131,7 @@ pub mod semantic_version;
|
||||
pub mod io;
|
||||
pub mod network;
|
||||
pub mod log;
|
||||
pub mod panics;
|
||||
|
||||
pub use common::*;
|
||||
pub use misc::*;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::sync::*;
|
||||
use error::*;
|
||||
use panics::*;
|
||||
use network::{NetworkProtocolHandler, NetworkConfiguration};
|
||||
use network::error::{NetworkError};
|
||||
use network::host::{Host, NetworkIoMessage, ProtocolId};
|
||||
@@ -27,13 +28,17 @@ use io::*;
|
||||
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
io_service: IoService<NetworkIoMessage<Message>>,
|
||||
host_info: String,
|
||||
stats: Arc<NetworkStats>
|
||||
stats: Arc<NetworkStats>,
|
||||
panic_handler: Arc<PanicHandler>
|
||||
}
|
||||
|
||||
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
/// Starts IO event loop
|
||||
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
|
||||
let panic_handler = PanicHandler::new_in_arc();
|
||||
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
|
||||
panic_handler.forward_from(&io_service);
|
||||
|
||||
let host = Arc::new(Host::new(config));
|
||||
let stats = host.stats().clone();
|
||||
let host_info = host.client_version();
|
||||
@@ -43,6 +48,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
||||
io_service: io_service,
|
||||
host_info: host_info,
|
||||
stats: stats,
|
||||
panic_handler: panic_handler
|
||||
})
|
||||
}
|
||||
|
||||
@@ -72,3 +78,9 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {
|
||||
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
|
||||
self.panic_handler.on_panic(closure);
|
||||
}
|
||||
}
|
||||
|
||||
183
util/src/panics.rs
Normal file
183
util/src/panics.rs
Normal file
@@ -0,0 +1,183 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Panic utilities
|
||||
|
||||
use std::thread;
|
||||
use std::ops::DerefMut;
|
||||
use std::any::Any;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Thread-safe closure for handling possible panics
|
||||
pub trait OnPanicListener: Send + Sync + 'static {
|
||||
/// Invoke listener
|
||||
fn call(&mut self, arg: &str);
|
||||
}
|
||||
|
||||
/// Forwards panics from child
|
||||
pub trait ForwardPanic {
|
||||
/// Attach `on_panic` listener to `child` and rethrow all panics
|
||||
fn forward_from<S>(&self, child: &S) where S : MayPanic;
|
||||
}
|
||||
|
||||
/// Trait indicating that the structure catches some of the panics (most probably from spawned threads)
|
||||
/// and it's possbile to be notified when one of the threads panics.
|
||||
pub trait MayPanic {
|
||||
/// `closure` will be invoked whenever panic in thread is caught
|
||||
fn on_panic<F>(&self, closure: F) where F: OnPanicListener;
|
||||
}
|
||||
|
||||
/// Structure that allows to catch panics and notify listeners
|
||||
pub struct PanicHandler {
|
||||
listeners: Mutex<Vec<Box<OnPanicListener>>>
|
||||
}
|
||||
|
||||
impl PanicHandler {
|
||||
/// Creates new `PanicHandler` wrapped in `Arc`
|
||||
pub fn new_in_arc() -> Arc<PanicHandler> {
|
||||
Arc::new(Self::new())
|
||||
}
|
||||
|
||||
/// Creates new `PanicHandler`
|
||||
pub fn new() -> PanicHandler {
|
||||
PanicHandler {
|
||||
listeners: Mutex::new(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
/// Invoke closure and catch any possible panics.
|
||||
/// In case of panic notifies all listeners about it.
|
||||
#[allow(deprecated)]
|
||||
// TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex)
|
||||
pub fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
|
||||
let result = thread::catch_panic(g);
|
||||
|
||||
if let Err(ref e) = result {
|
||||
let res = convert_to_string(e);
|
||||
if let Some(r) = res {
|
||||
self.notify_all(r);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn notify_all(&self, r: String) {
|
||||
let mut listeners = self.listeners.lock().unwrap();
|
||||
for listener in listeners.deref_mut() {
|
||||
listener.call(&r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MayPanic for PanicHandler {
|
||||
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
|
||||
self.listeners.lock().unwrap().push(Box::new(closure));
|
||||
}
|
||||
}
|
||||
|
||||
impl ForwardPanic for Arc<PanicHandler> {
|
||||
fn forward_from<S>(&self, child: &S) where S : MayPanic {
|
||||
let p = self.clone();
|
||||
child.on_panic(move |t| p.notify_all(t));
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> OnPanicListener for F
|
||||
where F: FnMut(String) + Send + Sync + 'static {
|
||||
fn call(&mut self, arg: &str) {
|
||||
self(arg.to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_to_string(t: &Box<Any + Send>) -> Option<String> {
|
||||
let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned());
|
||||
let as_string = t.downcast_ref::<String>().cloned();
|
||||
|
||||
as_str.or(as_string)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_notify_listeners_about_panic () {
|
||||
use std::sync::RwLock;
|
||||
// given
|
||||
let invocations = Arc::new(RwLock::new(vec![]));
|
||||
let i = invocations.clone();
|
||||
let p = PanicHandler::new();
|
||||
p.on_panic(move |t| i.write().unwrap().push(t));
|
||||
|
||||
// when
|
||||
p.catch_panic(|| panic!("Panic!")).unwrap_err();
|
||||
|
||||
// then
|
||||
assert!(invocations.read().unwrap()[0] == "Panic!");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
|
||||
use std::sync::RwLock;
|
||||
// given
|
||||
let invocations = Arc::new(RwLock::new(vec![]));
|
||||
let i = invocations.clone();
|
||||
let p = PanicHandler::new();
|
||||
p.on_panic(move |t| i.write().unwrap().push(t));
|
||||
|
||||
// when
|
||||
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
|
||||
|
||||
// then
|
||||
assert!(invocations.read().unwrap()[0] == "Panic: 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_notify_listeners_about_panic_in_other_thread () {
|
||||
use std::thread;
|
||||
use std::sync::RwLock;
|
||||
|
||||
// given
|
||||
let invocations = Arc::new(RwLock::new(vec![]));
|
||||
let i = invocations.clone();
|
||||
let p = PanicHandler::new();
|
||||
p.on_panic(move |t| i.write().unwrap().push(t));
|
||||
|
||||
// when
|
||||
let t = thread::spawn(move ||
|
||||
p.catch_panic(|| panic!("Panic!")).unwrap()
|
||||
);
|
||||
t.join().unwrap_err();
|
||||
|
||||
// then
|
||||
assert!(invocations.read().unwrap()[0] == "Panic!");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_forward_panics () {
|
||||
use std::sync::RwLock;
|
||||
// given
|
||||
let invocations = Arc::new(RwLock::new(vec![]));
|
||||
let i = invocations.clone();
|
||||
let p = PanicHandler::new_in_arc();
|
||||
p.on_panic(move |t| i.write().unwrap().push(t));
|
||||
|
||||
let p2 = PanicHandler::new();
|
||||
p.forward_from(&p2);
|
||||
|
||||
// when
|
||||
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
|
||||
|
||||
// then
|
||||
assert!(invocations.read().unwrap()[0] == "Panic!");
|
||||
}
|
||||
Reference in New Issue
Block a user