Forwarding panics from threads

This commit is contained in:
Tomusdrw 2016-02-10 16:35:52 +01:00
parent 7925642b1b
commit 96dda7b73a
8 changed files with 69 additions and 34 deletions

View File

@ -115,7 +115,7 @@ impl BlockQueue {
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let empty = Arc::new(Condvar::new()); let empty = Arc::new(Condvar::new());
let panic_handler = PanicHandler::new_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;

View File

@ -211,9 +211,8 @@ impl Client {
} }
let block_queue = BlockQueue::new(engine.clone(), message_channel); let block_queue = BlockQueue::new(engine.clone(), message_channel);
let panic_handler = PanicHandler::new_arc(); let panic_handler = PanicHandler::new_in_arc();
let panic = panic_handler.clone(); panic_handler.forward_from(&block_queue);
block_queue.on_panic(move |t| panic.notify_all(t));
Ok(Arc::new(Client { Ok(Arc::new(Client {
chain: chain, chain: chain,

View File

@ -17,6 +17,7 @@
//! Creates and registers client and network services. //! Creates and registers client and network services.
use util::*; use util::*;
use util::panics::*;
use spec::Spec; use spec::Spec;
use error::*; use error::*;
use std::env; use std::env;
@ -38,17 +39,22 @@ pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
pub struct ClientService { pub struct ClientService {
net_service: NetworkService<SyncMessage>, net_service: NetworkService<SyncMessage>,
client: Arc<Client>, client: Arc<Client>,
panic_handler: Arc<PanicHandler>
} }
impl ClientService { impl ClientService {
/// Start the service in a separate thread. /// Start the service in a separate thread.
pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> { pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config)); let mut net_service = try!(NetworkService::start(net_config));
panic_handler.forward_from(&net_service);
info!("Starting {}", net_service.host_info()); info!("Starting {}", net_service.host_info());
info!("Configured for {} using {} engine", spec.name, spec.engine_name); info!("Configured for {} using {} engine", spec.name, spec.engine_name);
let mut dir = env::home_dir().unwrap(); let mut dir = env::home_dir().unwrap();
dir.push(".parity"); dir.push(".parity");
let client = try!(Client::new(spec, &dir, net_service.io().channel())); let client = try!(Client::new(spec, &dir, net_service.io().channel()));
panic_handler.forward_from(client.deref());
let client_io = Arc::new(ClientIoHandler { let client_io = Arc::new(ClientIoHandler {
client: client.clone() client: client.clone()
}); });
@ -57,6 +63,7 @@ impl ClientService {
Ok(ClientService { Ok(ClientService {
net_service: net_service, net_service: net_service,
client: client, client: client,
panic_handler: panic_handler,
}) })
} }
@ -81,6 +88,12 @@ impl ClientService {
} }
} }
impl MayPanic for ClientService {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}
/// IO interface for the Client handler /// IO interface for the Client handler
struct ClientIoHandler { struct ClientIoHandler {
client: Arc<Client> client: Arc<Client>

View File

@ -163,13 +163,13 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\
} }
} }
fn wait_for_exit(client: Arc<Client>) { fn wait_for_exit(client_service: &ClientService) {
let exit = Arc::new(Condvar::new()); let exit = Arc::new(Condvar::new());
// Handle possible exits // Handle possible exits
let e = exit.clone(); let e = exit.clone();
CtrlC::set_handler(move || { e.notify_all(); }); CtrlC::set_handler(move || { e.notify_all(); });
let e = exit.clone(); let e = exit.clone();
client.on_panic(move |_reason| { e.notify_all(); }); client_service.on_panic(move |_reason| { e.notify_all(); });
// Wait for signal // Wait for signal
let mutex = Mutex::new(()); let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap(); let _ = exit.wait(mutex.lock().unwrap()).unwrap();
@ -219,7 +219,7 @@ fn main() {
service.io().register_handler(io_handler).expect("Error registering IO handler"); service.io().register_handler(io_handler).expect("Error registering IO handler");
// Handle exit // Handle exit
wait_for_exit(service.client()); wait_for_exit(&service);
} }
struct Informant { struct Informant {

View File

@ -160,13 +160,21 @@ pub struct IoManager<Message> where Message: Send + Sync {
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
/// Creates a new instance and registers it with the event loop. /// Creates a new instance and registers it with the event loop.
pub fn start(event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> { pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
let (worker, stealer) = chase_lev::deque(); let (worker, stealer) = chase_lev::deque();
let num_workers = 4; let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new()); let work_ready = Arc::new(Condvar::new());
let workers = (0..num_workers).map(|i| let workers = (0..num_workers).map(|i|
Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); Worker::new(
i,
stealer.clone(),
IoChannel::new(event_loop.channel()),
work_ready.clone(),
work_ready_mutex.clone(),
panic_handler.clone()
)
).collect();
let mut io = IoManager { let mut io = IoManager {
timers: Arc::new(RwLock::new(HashMap::new())), timers: Arc::new(RwLock::new(HashMap::new())),
@ -321,13 +329,14 @@ impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> { pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = PanicHandler::new_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel(); let channel = event_loop.channel();
let panic = panic_handler.clone(); let panic = panic_handler.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let p = panic.clone();
panic.catch_panic(move || { panic.catch_panic(move || {
IoManager::<Message>::start(&mut event_loop).unwrap(); IoManager::<Message>::start(p, &mut event_loop).unwrap();
}).unwrap() }).unwrap()
}); });
Ok(IoService { Ok(IoService {

View File

@ -44,7 +44,6 @@ pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
panic_handler: Arc<PanicHandler>,
} }
impl Worker { impl Worker {
@ -53,17 +52,16 @@ impl Worker {
stealer: chase_lev::Stealer<Work<Message>>, stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, channel: IoChannel<Message>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>) -> Worker wait_mutex: Arc<Mutex<()>>,
panic_handler: Arc<PanicHandler>
) -> Worker
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
let panic_handler = PanicHandler::new_arc();
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker { let mut worker = Worker {
panic_handler: panic_handler.clone(),
thread: None, thread: None,
wait: wait.clone(), wait: wait.clone(),
deleting: deleting.clone(), deleting: deleting.clone(),
}; };
let panic_handler = panic_handler.clone();
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || { move || {
panic_handler.catch_panic(move || { panic_handler.catch_panic(move || {
@ -114,12 +112,6 @@ impl Worker {
} }
} }
impl MayPanic for Worker {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}
impl Drop for Worker { impl Drop for Worker {
fn drop(&mut self) { fn drop(&mut self) {
self.deleting.store(true, AtomicOrdering::Relaxed); self.deleting.store(true, AtomicOrdering::Relaxed);

View File

@ -16,6 +16,7 @@
use std::sync::*; use std::sync::*;
use error::*; use error::*;
use panics::*;
use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::{NetworkError}; use network::error::{NetworkError};
use network::host::{Host, NetworkIoMessage, ProtocolId}; use network::host::{Host, NetworkIoMessage, ProtocolId};
@ -27,13 +28,17 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static { pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>, io_service: IoService<NetworkIoMessage<Message>>,
host_info: String, host_info: String,
stats: Arc<NetworkStats> stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
} }
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> { pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start()); let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);
let host = Arc::new(Host::new(config)); let host = Arc::new(Host::new(config));
let stats = host.stats().clone(); let stats = host.stats().clone();
let host_info = host.client_version(); let host_info = host.client_version();
@ -43,6 +48,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
io_service: io_service, io_service: io_service,
host_info: host_info, host_info: host_info,
stats: stats, stats: stats,
panic_handler: panic_handler
}) })
} }
@ -72,3 +78,9 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
} }
} }
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}

View File

@ -27,12 +27,17 @@ pub trait OnPanicListener: Send + Sync + 'static {
fn call(&mut self, arg: &str); fn call(&mut self, arg: &str);
} }
/// Forwards panics from child
pub trait ForwardPanic {
/// Attach `on_panic` listener to `child` and rethrow all panics
fn forward_from<S>(&self, child: &S) where S : MayPanic;
}
/// Trait indicating that the structure catches some of the panics (most probably from spawned threads) /// Trait indicating that the structure catches some of the panics (most probably from spawned threads)
/// and it's possbile to be notified when one of the threads panics. /// and it's possbile to be notified when one of the threads panics.
pub trait MayPanic { pub trait MayPanic {
/// `closure` will be invoked whenever panic in thread is caught /// `closure` will be invoked whenever panic in thread is caught
fn on_panic<F>(&self, closure: F) fn on_panic<F>(&self, closure: F) where F: OnPanicListener;
where F: OnPanicListener;
} }
/// Structure that allows to catch panics and notify listeners /// Structure that allows to catch panics and notify listeners
@ -42,7 +47,7 @@ pub struct PanicHandler {
impl PanicHandler { impl PanicHandler {
/// Creates new `PanicHandler` wrapped in `Arc` /// Creates new `PanicHandler` wrapped in `Arc`
pub fn new_arc() -> Arc<PanicHandler> { pub fn new_in_arc() -> Arc<PanicHandler> {
Arc::new(Self::new()) Arc::new(Self::new())
} }
@ -70,8 +75,7 @@ impl PanicHandler {
result result
} }
/// Notify listeners about panic fn notify_all(&self, r: String) {
pub fn notify_all(&self, r: String) {
let mut listeners = self.listeners.lock().unwrap(); let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() { for listener in listeners.deref_mut() {
listener.call(&r); listener.call(&r);
@ -80,12 +84,18 @@ impl PanicHandler {
} }
impl MayPanic for PanicHandler { impl MayPanic for PanicHandler {
fn on_panic<F>(&self, closure: F) fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
where F: OnPanicListener {
self.listeners.lock().unwrap().push(Box::new(closure)); self.listeners.lock().unwrap().push(Box::new(closure));
} }
} }
impl ForwardPanic for Arc<PanicHandler> {
fn forward_from<S>(&self, child: &S) where S : MayPanic {
let p = self.clone();
child.on_panic(move |t| p.notify_all(t));
}
}
impl<F> OnPanicListener for F impl<F> OnPanicListener for F
where F: FnMut(String) + Send + Sync + 'static { where F: FnMut(String) + Send + Sync + 'static {
fn call(&mut self, arg: &str) { fn call(&mut self, arg: &str) {
@ -159,11 +169,11 @@ use std::sync::RwLock;
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let p = PanicHandler::new(); let p = PanicHandler::new_in_arc();
p.on_panic(move |t| i.write().unwrap().push(t)); p.on_panic(move |t| i.write().unwrap().push(t));
let p2 = PanicHandler::new(); let p2 = PanicHandler::new();
p2.on_panic(move |t| p.notify_all(t)); p.forward_from(&p2);
// when // when
p2.catch_panic(|| panic!("Panic!")).unwrap_err(); p2.catch_panic(|| panic!("Panic!")).unwrap_err();