From 0757ac1493c10f10b10296c364002324f33af79d Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Tue, 9 Feb 2016 16:47:21 +0100 Subject: [PATCH 1/6] PanicHandler - work in progress --- util/src/lib.rs | 4 +- util/src/panics.rs | 148 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 util/src/panics.rs diff --git a/util/src/lib.rs b/util/src/lib.rs index 16a25f538..b48352582 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -21,6 +21,7 @@ #![feature(plugin)] #![plugin(clippy)] #![allow(needless_range_loop, match_bool)] +#![feature(std_panic, recover)] //! Ethcore-util library //! //! ### Rust version: @@ -54,7 +55,7 @@ //! cd parity //! cargo build --release //! ``` -//! +//! //! - OSX: //! //! ```bash @@ -129,6 +130,7 @@ pub mod semantic_version; pub mod io; pub mod network; pub mod log; +pub mod panics; pub use common::*; pub use misc::*; diff --git a/util/src/panics.rs b/util/src/panics.rs new file mode 100644 index 000000000..4e1365636 --- /dev/null +++ b/util/src/panics.rs @@ -0,0 +1,148 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Panic utilities + +use std::thread; +use std::panic; +use std::sync::Mutex; +use std::any::Any; +use std::ops::DerefMut; + +pub trait OnPanicListener: Send + Sync + 'static { + fn call(&mut self, arg: &T); +} + +impl OnPanicListener for F + where F: FnMut(&T) + Send + Sync + 'static { + fn call(&mut self, arg: &T) { + self(arg) + } +} + +pub trait ArgsConverter { + fn convert(&self, t: &Box) -> Option; +} + +pub trait MayPanic { + fn on_panic(&mut self, closure: F) + where F: OnPanicListener; +} + +pub trait PanicHandler> : MayPanic{ + fn new(converter: C) -> Self; + fn catch_panic(&mut self, g: G) -> thread::Result + where G: FnOnce() -> R + panic::RecoverSafe; +} + + +pub struct StringConverter; +impl ArgsConverter for StringConverter { + fn convert(&self, t: &Box) -> Option { + t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()) + } +} + +pub struct BasePanicHandler + where C: ArgsConverter, T: 'static { + converter: C, + listeners: Mutex>>> +} + +impl BasePanicHandler + where C: ArgsConverter, T: 'static { + fn notify_all(&mut self, res: Option) { + if let None = res { + return; + } + let r = res.unwrap(); + let mut listeners = self.listeners.lock().unwrap(); + for listener in listeners.deref_mut() { + listener.call(&r); + } + } +} + +impl PanicHandler for BasePanicHandler + where C: ArgsConverter, T: 'static { + + fn new(converter: C) -> Self { + BasePanicHandler { + converter: converter, + listeners: Mutex::new(vec![]) + } + } + + fn catch_panic(&mut self, g: G) -> thread::Result + where G: FnOnce() -> R + panic::RecoverSafe { + let result = panic::recover(g); + + println!("After calling function"); + if let Err(ref e) = result { + let res = self.converter.convert(e); + println!("Got error. Notifying"); + self.notify_all(res); + } + + result + } +} + +impl MayPanic for BasePanicHandler + where C: ArgsConverter, T: 'static { + fn on_panic(&mut self, closure: F) + where F: OnPanicListener { + self.listeners.lock().unwrap().push(Box::new(closure)); + } +} + +#[test] +fn should_notify_listeners_about_panic () { + use std::sync::{Arc, RwLock}; + + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = BasePanicHandler::new(StringConverter); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + // when + p.catch_panic(|| panic!("Panic!")); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} + +#[test] +fn should_notify_listeners_about_panic_in_other_thread () { + use std::thread; + use std::sync::{Arc, RwLock}; + + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = BasePanicHandler::new(StringConverter); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + // when + let t = thread::spawn(move || + p.catch_panic(|| panic!("Panic!")) + ); + t.join(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} From 2a498fc3ebecc359747341e1b537732fe30377ac Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Wed, 10 Feb 2016 12:50:27 +0100 Subject: [PATCH 2/6] Implementing PanicHandlers for all places when new thread is spawned. Handling Client panics --- ethcore/src/block_queue.rs | 26 +++++- ethcore/src/client.rs | 31 +++++-- parity/main.rs | 116 +++++++++++++++++++-------- util/src/io/mod.rs | 20 ++--- util/src/io/service.rs | 18 ++++- util/src/io/worker.rs | 30 +++++-- util/src/lib.rs | 3 +- util/src/panics.rs | 160 +++++++++++++++++++++++++++---------- 8 files changed, 299 insertions(+), 105 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index fb735c973..59de4403b 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -26,6 +26,7 @@ use views::*; use header::*; use service::*; use client::BlockStatus; +use util::panics::*; /// Block queue status #[derive(Debug)] @@ -59,6 +60,7 @@ impl BlockQueueInfo { /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { + panic_handler: SafeStringPanicHandler, engine: Arc>, more_to_verify: Arc, verification: Arc>, @@ -113,6 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); + let panic_handler = StringPanicHandler::new_thread_safe(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -123,11 +126,22 @@ impl BlockQueue { let ready_signal = ready_signal.clone(); let empty = empty.clone(); let deleting = deleting.clone(); - verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)) - .expect("Error starting block verification thread")); + let panic_handler = panic_handler.clone(); + verifiers.push( + thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + let mut panic = panic_handler.lock().unwrap(); + panic.catch_panic(move || { + BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + }).unwrap() + }) + .expect("Error starting block verification thread") + ); } BlockQueue { engine: engine, + panic_handler: panic_handler, ready_signal: ready_signal.clone(), more_to_verify: more_to_verify.clone(), verification: verification.clone(), @@ -150,7 +164,7 @@ impl BlockQueue { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { lock = wait.wait(lock).unwrap(); } - + if deleting.load(AtomicOrdering::Relaxed) { return; } @@ -324,6 +338,12 @@ impl BlockQueue { } } +impl MayPanic for BlockQueue { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl Drop for BlockQueue { fn drop(&mut self) { self.clear(); diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 46b53e7b9..d657dce6b 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -17,6 +17,7 @@ //! Blockchain database client. use util::*; +use util::panics::*; use rocksdb::{Options, DB, DBCompactionStyle}; use blockchain::{BlockChain, BlockProvider, CacheSize}; use views::BlockView; @@ -157,7 +158,8 @@ pub struct Client { state_db: Mutex, block_queue: RwLock, report: RwLock, - import_lock: Mutex<()> + import_lock: Mutex<()>, + panic_handler: SafeStringPanicHandler, } const HISTORY: u64 = 1000; @@ -198,19 +200,26 @@ impl Client { let mut state_path = path.to_path_buf(); state_path.push("state"); let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); - + let engine = Arc::new(try!(spec.to_engine())); let mut state_db = JournalDB::new_with_arc(db.clone()); if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } + + let block_queue = BlockQueue::new(engine.clone(), message_channel); + let panic_handler = StringPanicHandler::new_thread_safe(); + let panic = panic_handler.clone(); + block_queue.on_panic(move |t: &String| panic.lock().unwrap().notify_all(t)); + Ok(Arc::new(Client { chain: chain, - engine: engine.clone(), + engine: engine, state_db: Mutex::new(state_db), - block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), + block_queue: RwLock::new(block_queue), report: RwLock::new(Default::default()), import_lock: Mutex::new(()), + panic_handler: panic_handler })) } @@ -348,12 +357,12 @@ impl BlockChainClient for Client { fn block_status(&self, hash: &H256) -> BlockStatus { if self.chain.read().unwrap().is_known(&hash) { - BlockStatus::InChain - } else { - self.block_queue.read().unwrap().block_status(hash) + BlockStatus::InChain + } else { + self.block_queue.read().unwrap().block_status(hash) } } - + fn block_total_difficulty(&self, hash: &H256) -> Option { self.chain.read().unwrap().block_details(hash).map(|d| d.total_difficulty) } @@ -423,3 +432,9 @@ impl BlockChainClient for Client { } } } + +impl MayPanic for Client { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/parity/main.rs b/parity/main.rs index d423caa64..033621362 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -39,9 +39,11 @@ use rlog::{LogLevelFilter}; use env_logger::LogBuilder; use ctrlc::CtrlC; use util::*; +use util::panics::MayPanic; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; +use ethcore::spec; use ethcore::blockchain::CacheSize; use ethsync::EthSync; @@ -84,7 +86,7 @@ fn setup_log(init: &str) { #[cfg(feature = "rpc")] fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { use rpc::v1::*; - + let mut server = rpc::HttpServer::new(1); server.add_delegate(Web3Client::new().to_delegate()); server.add_delegate(EthClient::new(client.clone()).to_delegate()); @@ -97,46 +99,96 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { fn setup_rpc_server(_client: Arc, _sync: Arc, _url: &str) { } -fn main() { - let args: Args = Args::docopt().decode().unwrap_or_else(|e| e.exit()); - - setup_log(&args.flag_logging); - unsafe { ::fdlimit::raise_fd_limit(); } - - let spec = ethereum::new_frontier(); - let init_nodes = match args.arg_enode.len() { - 0 => spec.nodes().clone(), - _ => args.arg_enode.clone(), - }; - let mut net_settings = NetworkConfiguration::new(); - net_settings.boot_nodes = init_nodes; - match args.flag_address { - None => { - net_settings.listen_address = SocketAddr::from_str(args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); - net_settings.public_address = SocketAddr::from_str(args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); - } - Some(ref a) => { - net_settings.public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); - net_settings.listen_address = net_settings.public_address.clone(); +struct Configuration { + args: Args +} +impl Configuration { + fn parse() -> Self { + Configuration { + args: Args::docopt().decode().unwrap_or_else(|e| e.exit()) } } - let mut service = ClientService::start(spec, net_settings).unwrap(); - let client = service.client().clone(); - client.configure_cache(args.flag_cache_pref_size, args.flag_cache_max_size); - let sync = EthSync::register(service.network(), client); - if args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), &args.flag_jsonrpc_url); - } - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); - service.io().register_handler(io_handler).expect("Error registering IO handler"); + fn get_init_nodes(&self, spec: &spec::Spec) -> Vec { + match self.args.arg_enode.len() { + 0 => spec.nodes().clone(), + _ => self.args.arg_enode.clone(), + } + } + + fn get_net_addresses(&self) -> (SocketAddr, SocketAddr) { + let listen_address; + let public_address; + + match self.args.flag_address { + None => { + listen_address = SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); + public_address = SocketAddr::from_str(self.args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); + } + Some(ref a) => { + public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); + listen_address = public_address.clone(); + } + }; + + (listen_address, public_address) + } +} + +fn wait_for_exit(client: Arc) { let exit = Arc::new(Condvar::new()); + // Handle possible exits let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); + let e = exit.clone(); + client.on_panic(move |_t: &String| { e.notify_all(); }); + // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); } +fn main() { + let conf = Configuration::parse(); + let spec = ethereum::new_frontier(); + + // Setup logging + setup_log(&conf.args.flag_logging); + // Raise fdlimit + unsafe { ::fdlimit::raise_fd_limit(); } + + // Configure network + let init_nodes = conf.get_init_nodes(&spec); + let (listen, public) = conf.get_net_addresses(); + let mut net_settings = NetworkConfiguration::new(); + net_settings.boot_nodes = init_nodes; + net_settings.listen_address = listen; + net_settings.public_address = public; + + // Build client + let mut service = ClientService::start(spec, net_settings).unwrap(); + let client = service.client().clone(); + client.configure_cache(conf.args.flag_cache_pref_size, conf.args.flag_cache_max_size); + + // Sync + let sync = EthSync::register(service.network(), client); + + // Setup rpc + if conf.args.flag_jsonrpc { + setup_rpc_server(service.client(), sync.clone(), &conf.args.flag_jsonrpc_url); + } + + // Register IO handler + let io_handler = Arc::new(ClientIoHandler { + client: service.client(), + info: Default::default(), + sync: sync + }); + service.io().register_handler(io_handler).expect("Error registering IO handler"); + + // Handle exit + wait_for_exit(service.client()); +} + struct Informant { chain_info: RwLock>, cache_info: RwLock>, @@ -200,7 +252,7 @@ struct ClientIoHandler { } impl IoHandler for ClientIoHandler { - fn initialize(&self, io: &IoContext) { + fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); } diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 4f16efd30..40cdbc368 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -31,16 +31,16 @@ //! //! impl IoHandler for MyHandler { //! fn initialize(&self, io: &IoContext) { -//! io.register_timer(0, 1000).unwrap(); -//! } +//! io.register_timer(0, 1000).unwrap(); +//! } //! -//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { -//! println!("Timeout {}", timer); -//! } +//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { +//! println!("Timeout {}", timer); +//! } //! -//! fn message(&self, _io: &IoContext, message: &MyMessage) { -//! println!("Message {}", message.data); -//! } +//! fn message(&self, _io: &IoContext, message: &MyMessage) { +//! println!("Message {}", message.data); +//! } //! } //! //! fn main () { @@ -70,7 +70,7 @@ impl From<::mio::NotifyError>> for IoError } } -/// Generic IO handler. +/// Generic IO handler. /// All the handler function are called from within IO event loop. /// `Message` type is used as notification data pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + 'static { @@ -82,7 +82,7 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + ' fn message(&self, _io: &IoContext, _message: &Message) {} /// Called when an IO stream gets closed fn stream_hup(&self, _io: &IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be read from + /// Called when an IO stream can be read from fn stream_readable(&self, _io: &IoContext, _stream: StreamToken) {} /// Called when an IO stream can be written to fn stream_writable(&self, _io: &IoContext, _stream: StreamToken) {} diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 71b2520ed..1f4eeea09 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -25,6 +25,7 @@ use io::{IoError, IoHandler}; use arrayvec::*; use crossbeam::sync::chase_lev; use io::worker::{Worker, Work, WorkType}; +use panics::*; /// Timer ID pub type TimerToken = usize; @@ -164,7 +165,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); - let workers = (0..num_workers).map(|i| + let workers = (0..num_workers).map(|i| Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); let mut io = IoManager { @@ -306,19 +307,32 @@ impl IoChannel where Message: Send + Clone { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + Clone + 'static { + panic_handler: SafeStringPanicHandler, thread: Option>, host_channel: Sender>, } +impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { + let panic_handler = StringPanicHandler::new_thread_safe(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); + let panic = panic_handler.clone(); let thread = thread::spawn(move || { - IoManager::::start(&mut event_loop).unwrap(); //TODO: + let mut panic = panic.lock().unwrap(); + panic.catch_panic(move || { + IoManager::::start(&mut event_loop).unwrap(); + }).unwrap() }); Ok(IoService { + panic_handler: panic_handler, thread: Some(thread), host_channel: channel }) diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index fa8a0fa2c..84979140b 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; +use panics::*; pub enum WorkType { Readable, @@ -43,32 +44,41 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, + panic_handler: SafeStringPanicHandler, } impl Worker { /// Creates a new worker instance. - pub fn new(index: usize, - stealer: chase_lev::Stealer>, + pub fn new(index: usize, + stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, - wait_mutex: Arc>) -> Worker + wait_mutex: Arc>) -> Worker where Message: Send + Sync + Clone + 'static { + let panic_handler = StringPanicHandler::new_thread_safe(); let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { + panic_handler: panic_handler.clone(), thread: None, wait: wait.clone(), deleting: deleting.clone(), }; + let panic_handler = panic_handler.clone(); worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( - move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)) + move || { + let mut panic = panic_handler.lock().unwrap(); + panic.catch_panic(move || { + Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) + }).unwrap() + }) .expect("Error creating worker thread")); worker } fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, - wait_mutex: Arc>, - deleting: Arc) + channel: IoChannel, wait: Arc, + wait_mutex: Arc>, + deleting: Arc) where Message: Send + Sync + Clone + 'static { while !deleting.load(AtomicOrdering::Relaxed) { { @@ -105,6 +115,12 @@ impl Worker { } } +impl MayPanic for Worker { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl Drop for Worker { fn drop(&mut self) { self.deleting.store(true, AtomicOrdering::Relaxed); diff --git a/util/src/lib.rs b/util/src/lib.rs index b48352582..260ef4301 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -21,7 +21,8 @@ #![feature(plugin)] #![plugin(clippy)] #![allow(needless_range_loop, match_bool)] -#![feature(std_panic, recover)] +#![feature(catch_panic)] + //! Ethcore-util library //! //! ### Rust version: diff --git a/util/src/panics.rs b/util/src/panics.rs index 4e1365636..dee5f3076 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -17,10 +17,9 @@ //! Panic utilities use std::thread; -use std::panic; -use std::sync::Mutex; -use std::any::Any; use std::ops::DerefMut; +use std::any::Any; +use std::sync::{Arc, Mutex}; pub trait OnPanicListener: Send + Sync + 'static { fn call(&mut self, arg: &T); @@ -33,26 +32,37 @@ impl OnPanicListener for F } } -pub trait ArgsConverter { +pub trait ArgsConverter : Send + Sync { fn convert(&self, t: &Box) -> Option; } pub trait MayPanic { - fn on_panic(&mut self, closure: F) + fn on_panic(&self, closure: F) where F: OnPanicListener; } pub trait PanicHandler> : MayPanic{ - fn new(converter: C) -> Self; + fn with_converter(converter: C) -> Self; fn catch_panic(&mut self, g: G) -> thread::Result - where G: FnOnce() -> R + panic::RecoverSafe; + where G: FnOnce() -> R + Send + 'static; + fn notify_all(&mut self, &T); } +pub type SafeStringPanicHandler = Arc>; + +impl MayPanic for SafeStringPanicHandler { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.lock().unwrap().on_panic(closure); + } +} pub struct StringConverter; impl ArgsConverter for StringConverter { fn convert(&self, t: &Box) -> Option { - t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()) + let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); + let as_string = t.downcast_ref::().cloned(); + + as_str.or(as_string) } } @@ -62,86 +72,152 @@ pub struct BasePanicHandler listeners: Mutex>>> } -impl BasePanicHandler - where C: ArgsConverter, T: 'static { - fn notify_all(&mut self, res: Option) { - if let None = res { - return; - } - let r = res.unwrap(); - let mut listeners = self.listeners.lock().unwrap(); - for listener in listeners.deref_mut() { - listener.call(&r); - } - } -} - impl PanicHandler for BasePanicHandler where C: ArgsConverter, T: 'static { - fn new(converter: C) -> Self { + fn with_converter(converter: C) -> Self { BasePanicHandler { converter: converter, listeners: Mutex::new(vec![]) } } - fn catch_panic(&mut self, g: G) -> thread::Result - where G: FnOnce() -> R + panic::RecoverSafe { - let result = panic::recover(g); + #[allow(deprecated)] + // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) + fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + let result = thread::catch_panic(g); - println!("After calling function"); - if let Err(ref e) = result { - let res = self.converter.convert(e); - println!("Got error. Notifying"); - self.notify_all(res); + if let Err(ref e) = result { + let res = self.converter.convert(e); + if let Some(r) = res { + self.notify_all(&r); } - - result } + + result + } + + fn notify_all(&mut self, r: &T) { + let mut listeners = self.listeners.lock().unwrap(); + for listener in listeners.deref_mut() { + listener.call(r); + } + } } impl MayPanic for BasePanicHandler where C: ArgsConverter, T: 'static { - fn on_panic(&mut self, closure: F) + fn on_panic(&self, closure: F) where F: OnPanicListener { self.listeners.lock().unwrap().push(Box::new(closure)); } } +pub struct StringPanicHandler { + handler: BasePanicHandler +} + +impl StringPanicHandler { + pub fn new_thread_safe() -> SafeStringPanicHandler { + Arc::new(Mutex::new(Self::new())) + } + + pub fn new () -> Self { + Self::with_converter(StringConverter) + } +} + +impl PanicHandler for StringPanicHandler { + + fn with_converter(converter: StringConverter) -> Self { + StringPanicHandler { + handler: BasePanicHandler::with_converter(converter) + } + } + + fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + self.handler.catch_panic(g) + } + + fn notify_all(&mut self, r: &String) { + self.handler.notify_all(r); + } +} + +impl MayPanic for StringPanicHandler { + fn on_panic(&self, closure: F) + where F: OnPanicListener { + self.handler.on_panic(closure) + } +} + #[test] fn should_notify_listeners_about_panic () { - use std::sync::{Arc, RwLock}; - + use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = BasePanicHandler::new(StringConverter); + let mut p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when - p.catch_panic(|| panic!("Panic!")); + p.catch_panic(|| panic!("Panic!")).unwrap_err(); // then assert!(invocations.read().unwrap()[0] == "Panic!"); } +#[test] +fn should_notify_listeners_about_panic_when_string_is_dynamic () { + use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = StringPanicHandler::new(); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + // when + p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic: 1"); +} + #[test] fn should_notify_listeners_about_panic_in_other_thread () { use std::thread; - use std::sync::{Arc, RwLock}; + use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = BasePanicHandler::new(StringConverter); + let mut p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when let t = thread::spawn(move || - p.catch_panic(|| panic!("Panic!")) + p.catch_panic(|| panic!("Panic!")).unwrap() ); - t.join(); + t.join().unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} + +#[test] +fn should_forward_panics () { +use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = StringPanicHandler::new(); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + let mut p2 = StringPanicHandler::new(); + p2.on_panic(move |t: &String| p.notify_all(t)); + + // when + p2.catch_panic(|| panic!("Panic!")).unwrap_err(); // then assert!(invocations.read().unwrap()[0] == "Panic!"); From 31bcc541d043302e682bf4bb31ae38044f5a195c Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Wed, 10 Feb 2016 14:16:42 +0100 Subject: [PATCH 3/6] Fixing parity build --- parity/main.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index dd0a336be..50b1557a5 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -45,7 +45,6 @@ use ethcore::spec::*; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; -use ethcore::spec; use ethcore::blockchain::CacheSize; use ethsync::EthSync; use target_info::Target; @@ -117,7 +116,7 @@ impl Configuration { } } - fn print_version() { + fn print_version(&self) { println!("\ Parity version {} ({}-{}-{}) Copyright 2015, 2016 Ethcore (UK) Limited @@ -129,8 +128,8 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ ", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()); } - fn get_spec() -> Spec { - match args.flag_chain.as_ref() { + fn get_spec(&self) -> Spec { + match self.args.flag_chain.as_ref() { "frontier" | "mainnet" => ethereum::new_frontier(), "morden" | "testnet" => ethereum::new_morden(), "olympic" => ethereum::new_olympic(), From 0d121dd51a84d854d65056e7441a9fb15d607dd5 Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Wed, 10 Feb 2016 14:49:31 +0100 Subject: [PATCH 4/6] Removing unecessary locks causing dead-locks --- ethcore/src/block_queue.rs | 7 +++---- ethcore/src/client.rs | 6 +++--- util/src/io/service.rs | 5 ++--- util/src/io/worker.rs | 7 +++---- util/src/panics.rs | 34 +++++++++++++--------------------- 5 files changed, 24 insertions(+), 35 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 59de4403b..389435a61 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -60,7 +60,7 @@ impl BlockQueueInfo { /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { - panic_handler: SafeStringPanicHandler, + panic_handler: Arc, engine: Arc>, more_to_verify: Arc, verification: Arc>, @@ -115,7 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); - let panic_handler = StringPanicHandler::new_thread_safe(); + let panic_handler = StringPanicHandler::new_arc(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -131,8 +131,7 @@ impl BlockQueue { thread::Builder::new() .name(format!("Verifier #{}", i)) .spawn(move || { - let mut panic = panic_handler.lock().unwrap(); - panic.catch_panic(move || { + panic_handler.catch_panic(move || { BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) }).unwrap() }) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 3596d56f9..8a3b18d5c 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -162,7 +162,7 @@ pub struct Client { block_queue: RwLock, report: RwLock, import_lock: Mutex<()>, - panic_handler: SafeStringPanicHandler, + panic_handler: Arc, } const HISTORY: u64 = 1000; @@ -211,9 +211,9 @@ impl Client { } let block_queue = BlockQueue::new(engine.clone(), message_channel); - let panic_handler = StringPanicHandler::new_thread_safe(); + let panic_handler = StringPanicHandler::new_arc(); let panic = panic_handler.clone(); - block_queue.on_panic(move |t: &String| panic.lock().unwrap().notify_all(t)); + block_queue.on_panic(move |t: &String| panic.notify_all(t)); Ok(Arc::new(Client { chain: chain, diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 1f4eeea09..f65619a66 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -307,7 +307,7 @@ impl IoChannel where Message: Send + Clone { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + Clone + 'static { - panic_handler: SafeStringPanicHandler, + panic_handler: Arc, thread: Option>, host_channel: Sender>, } @@ -321,12 +321,11 @@ impl MayPanic for IoService where Message: Send + Sync impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { - let panic_handler = StringPanicHandler::new_thread_safe(); + let panic_handler = StringPanicHandler::new_arc(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let panic = panic_handler.clone(); let thread = thread::spawn(move || { - let mut panic = panic.lock().unwrap(); panic.catch_panic(move || { IoManager::::start(&mut event_loop).unwrap(); }).unwrap() diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 84979140b..33bb76bd7 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -44,7 +44,7 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, - panic_handler: SafeStringPanicHandler, + panic_handler: Arc, } impl Worker { @@ -55,7 +55,7 @@ impl Worker { wait: Arc, wait_mutex: Arc>) -> Worker where Message: Send + Sync + Clone + 'static { - let panic_handler = StringPanicHandler::new_thread_safe(); + let panic_handler = StringPanicHandler::new_arc(); let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { panic_handler: panic_handler.clone(), @@ -66,8 +66,7 @@ impl Worker { let panic_handler = panic_handler.clone(); worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( move || { - let mut panic = panic_handler.lock().unwrap(); - panic.catch_panic(move || { + panic_handler.catch_panic(move || { Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) }).unwrap() }) diff --git a/util/src/panics.rs b/util/src/panics.rs index dee5f3076..b618903b2 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -43,17 +43,9 @@ pub trait MayPanic { pub trait PanicHandler> : MayPanic{ fn with_converter(converter: C) -> Self; - fn catch_panic(&mut self, g: G) -> thread::Result + fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static; - fn notify_all(&mut self, &T); -} - -pub type SafeStringPanicHandler = Arc>; - -impl MayPanic for SafeStringPanicHandler { - fn on_panic(&self, closure: F) where F: OnPanicListener { - self.lock().unwrap().on_panic(closure); - } + fn notify_all(&self, &T); } pub struct StringConverter; @@ -84,7 +76,7 @@ impl PanicHandler for BasePanicHandler #[allow(deprecated)] // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) - fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { let result = thread::catch_panic(g); if let Err(ref e) = result { @@ -97,7 +89,7 @@ impl PanicHandler for BasePanicHandler result } - fn notify_all(&mut self, r: &T) { + fn notify_all(&self, r: &T) { let mut listeners = self.listeners.lock().unwrap(); for listener in listeners.deref_mut() { listener.call(r); @@ -118,8 +110,8 @@ pub struct StringPanicHandler { } impl StringPanicHandler { - pub fn new_thread_safe() -> SafeStringPanicHandler { - Arc::new(Mutex::new(Self::new())) + pub fn new_arc() -> Arc { + Arc::new(Self::new()) } pub fn new () -> Self { @@ -135,11 +127,11 @@ impl PanicHandler for StringPanicHandler { } } - fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { self.handler.catch_panic(g) } - fn notify_all(&mut self, r: &String) { + fn notify_all(&self, r: &String) { self.handler.notify_all(r); } } @@ -157,7 +149,7 @@ fn should_notify_listeners_about_panic () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = StringPanicHandler::new(); + let p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when @@ -173,7 +165,7 @@ fn should_notify_listeners_about_panic_when_string_is_dynamic () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = StringPanicHandler::new(); + let p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when @@ -191,7 +183,7 @@ fn should_notify_listeners_about_panic_in_other_thread () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = StringPanicHandler::new(); + let p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when @@ -210,10 +202,10 @@ use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = StringPanicHandler::new(); + let p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); - let mut p2 = StringPanicHandler::new(); + let p2 = StringPanicHandler::new(); p2.on_panic(move |t: &String| p.notify_all(t)); // when From 7925642b1be684124449288b732fdba217f40e64 Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Wed, 10 Feb 2016 15:28:43 +0100 Subject: [PATCH 5/6] Removing overengineered stuff --- ethcore/src/block_queue.rs | 8 +-- ethcore/src/client.rs | 10 +-- parity/main.rs | 2 +- util/src/io/service.rs | 8 +-- util/src/io/worker.rs | 8 +-- util/src/panics.rs | 137 +++++++++++++------------------------ 6 files changed, 65 insertions(+), 108 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 389435a61..90f4338db 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -60,7 +60,7 @@ impl BlockQueueInfo { /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { - panic_handler: Arc, + panic_handler: Arc, engine: Arc>, more_to_verify: Arc, verification: Arc>, @@ -115,7 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); - let panic_handler = StringPanicHandler::new_arc(); + let panic_handler = PanicHandler::new_arc(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -337,8 +337,8 @@ impl BlockQueue { } } -impl MayPanic for BlockQueue { - fn on_panic(&self, closure: F) where F: OnPanicListener { +impl MayPanic for BlockQueue { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } } diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 8a3b18d5c..92946b5ae 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -162,7 +162,7 @@ pub struct Client { block_queue: RwLock, report: RwLock, import_lock: Mutex<()>, - panic_handler: Arc, + panic_handler: Arc, } const HISTORY: u64 = 1000; @@ -211,9 +211,9 @@ impl Client { } let block_queue = BlockQueue::new(engine.clone(), message_channel); - let panic_handler = StringPanicHandler::new_arc(); + let panic_handler = PanicHandler::new_arc(); let panic = panic_handler.clone(); - block_queue.on_panic(move |t: &String| panic.notify_all(t)); + block_queue.on_panic(move |t| panic.notify_all(t)); Ok(Arc::new(Client { chain: chain, @@ -440,8 +440,8 @@ impl BlockChainClient for Client { } } -impl MayPanic for Client { - fn on_panic(&self, closure: F) where F: OnPanicListener { +impl MayPanic for Client { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } } diff --git a/parity/main.rs b/parity/main.rs index 50b1557a5..6d341a29f 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -169,7 +169,7 @@ fn wait_for_exit(client: Arc) { let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); let e = exit.clone(); - client.on_panic(move |_t: &String| { e.notify_all(); }); + client.on_panic(move |_reason| { e.notify_all(); }); // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); diff --git a/util/src/io/service.rs b/util/src/io/service.rs index f65619a66..c740a79c2 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -307,13 +307,13 @@ impl IoChannel where Message: Send + Clone { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + Clone + 'static { - panic_handler: Arc, + panic_handler: Arc, thread: Option>, host_channel: Sender>, } -impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { - fn on_panic(&self, closure: F) where F: OnPanicListener { +impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } } @@ -321,7 +321,7 @@ impl MayPanic for IoService where Message: Send + Sync impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { - let panic_handler = StringPanicHandler::new_arc(); + let panic_handler = PanicHandler::new_arc(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let panic = panic_handler.clone(); diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 33bb76bd7..6300dda2e 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -44,7 +44,7 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, - panic_handler: Arc, + panic_handler: Arc, } impl Worker { @@ -55,7 +55,7 @@ impl Worker { wait: Arc, wait_mutex: Arc>) -> Worker where Message: Send + Sync + Clone + 'static { - let panic_handler = StringPanicHandler::new_arc(); + let panic_handler = PanicHandler::new_arc(); let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { panic_handler: panic_handler.clone(), @@ -114,8 +114,8 @@ impl Worker { } } -impl MayPanic for Worker { - fn on_panic(&self, closure: F) where F: OnPanicListener { +impl MayPanic for Worker { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } } diff --git a/util/src/panics.rs b/util/src/panics.rs index b618903b2..44bae9308 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -21,126 +21,83 @@ use std::ops::DerefMut; use std::any::Any; use std::sync::{Arc, Mutex}; -pub trait OnPanicListener: Send + Sync + 'static { - fn call(&mut self, arg: &T); +/// Thread-safe closure for handling possible panics +pub trait OnPanicListener: Send + Sync + 'static { + /// Invoke listener + fn call(&mut self, arg: &str); } -impl OnPanicListener for F - where F: FnMut(&T) + Send + Sync + 'static { - fn call(&mut self, arg: &T) { - self(arg) - } -} - -pub trait ArgsConverter : Send + Sync { - fn convert(&self, t: &Box) -> Option; -} - -pub trait MayPanic { +/// Trait indicating that the structure catches some of the panics (most probably from spawned threads) +/// and it's possbile to be notified when one of the threads panics. +pub trait MayPanic { + /// `closure` will be invoked whenever panic in thread is caught fn on_panic(&self, closure: F) - where F: OnPanicListener; + where F: OnPanicListener; } -pub trait PanicHandler> : MayPanic{ - fn with_converter(converter: C) -> Self; - fn catch_panic(&self, g: G) -> thread::Result - where G: FnOnce() -> R + Send + 'static; - fn notify_all(&self, &T); +/// Structure that allows to catch panics and notify listeners +pub struct PanicHandler { + listeners: Mutex>> } -pub struct StringConverter; -impl ArgsConverter for StringConverter { - fn convert(&self, t: &Box) -> Option { - let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); - let as_string = t.downcast_ref::().cloned(); - - as_str.or(as_string) +impl PanicHandler { + /// Creates new `PanicHandler` wrapped in `Arc` + pub fn new_arc() -> Arc { + Arc::new(Self::new()) } -} -pub struct BasePanicHandler - where C: ArgsConverter, T: 'static { - converter: C, - listeners: Mutex>>> -} - -impl PanicHandler for BasePanicHandler - where C: ArgsConverter, T: 'static { - - fn with_converter(converter: C) -> Self { - BasePanicHandler { - converter: converter, + /// Creates new `PanicHandler` + pub fn new() -> PanicHandler { + PanicHandler { listeners: Mutex::new(vec![]) } } + /// Invoke closure and catch any possible panics. + /// In case of panic notifies all listeners about it. #[allow(deprecated)] // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) - fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + pub fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { let result = thread::catch_panic(g); if let Err(ref e) = result { - let res = self.converter.convert(e); + let res = convert_to_string(e); if let Some(r) = res { - self.notify_all(&r); + self.notify_all(r); } } result } - fn notify_all(&self, r: &T) { + /// Notify listeners about panic + pub fn notify_all(&self, r: String) { let mut listeners = self.listeners.lock().unwrap(); for listener in listeners.deref_mut() { - listener.call(r); + listener.call(&r); } } } -impl MayPanic for BasePanicHandler - where C: ArgsConverter, T: 'static { +impl MayPanic for PanicHandler { fn on_panic(&self, closure: F) - where F: OnPanicListener { + where F: OnPanicListener { self.listeners.lock().unwrap().push(Box::new(closure)); } } -pub struct StringPanicHandler { - handler: BasePanicHandler -} - -impl StringPanicHandler { - pub fn new_arc() -> Arc { - Arc::new(Self::new()) - } - - pub fn new () -> Self { - Self::with_converter(StringConverter) +impl OnPanicListener for F + where F: FnMut(String) + Send + Sync + 'static { + fn call(&mut self, arg: &str) { + self(arg.to_owned()) } } -impl PanicHandler for StringPanicHandler { +fn convert_to_string(t: &Box) -> Option { + let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); + let as_string = t.downcast_ref::().cloned(); - fn with_converter(converter: StringConverter) -> Self { - StringPanicHandler { - handler: BasePanicHandler::with_converter(converter) - } - } - - fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { - self.handler.catch_panic(g) - } - - fn notify_all(&self, r: &String) { - self.handler.notify_all(r); - } -} - -impl MayPanic for StringPanicHandler { - fn on_panic(&self, closure: F) - where F: OnPanicListener { - self.handler.on_panic(closure) - } + as_str.or(as_string) } #[test] @@ -149,8 +106,8 @@ fn should_notify_listeners_about_panic () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = StringPanicHandler::new(); - p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); // when p.catch_panic(|| panic!("Panic!")).unwrap_err(); @@ -165,8 +122,8 @@ fn should_notify_listeners_about_panic_when_string_is_dynamic () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = StringPanicHandler::new(); - p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); // when p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); @@ -183,8 +140,8 @@ fn should_notify_listeners_about_panic_in_other_thread () { // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = StringPanicHandler::new(); - p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); // when let t = thread::spawn(move || @@ -202,11 +159,11 @@ use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = StringPanicHandler::new(); - p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); - let p2 = StringPanicHandler::new(); - p2.on_panic(move |t: &String| p.notify_all(t)); + let p2 = PanicHandler::new(); + p2.on_panic(move |t| p.notify_all(t)); // when p2.catch_panic(|| panic!("Panic!")).unwrap_err(); From 96dda7b73a3ca6a34b7dcd63edeb7073dedaf375 Mon Sep 17 00:00:00 2001 From: Tomusdrw Date: Wed, 10 Feb 2016 16:35:52 +0100 Subject: [PATCH 6/6] Forwarding panics from threads --- ethcore/src/block_queue.rs | 2 +- ethcore/src/client.rs | 5 ++--- ethcore/src/service.rs | 15 ++++++++++++++- parity/main.rs | 6 +++--- util/src/io/service.rs | 17 +++++++++++++---- util/src/io/worker.rs | 16 ++++------------ util/src/network/service.rs | 14 +++++++++++++- util/src/panics.rs | 28 +++++++++++++++++++--------- 8 files changed, 69 insertions(+), 34 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 90f4338db..dcfcec1e4 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -115,7 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); - let panic_handler = PanicHandler::new_arc(); + let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 92946b5ae..560be8bfd 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -211,9 +211,8 @@ impl Client { } let block_queue = BlockQueue::new(engine.clone(), message_channel); - let panic_handler = PanicHandler::new_arc(); - let panic = panic_handler.clone(); - block_queue.on_panic(move |t| panic.notify_all(t)); + let panic_handler = PanicHandler::new_in_arc(); + panic_handler.forward_from(&block_queue); Ok(Arc::new(Client { chain: chain, diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 92f483507..8f95dd361 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -17,6 +17,7 @@ //! Creates and registers client and network services. use util::*; +use util::panics::*; use spec::Spec; use error::*; use std::env; @@ -27,7 +28,7 @@ use client::Client; pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlock(Bytes), //TODO: use Cow - /// A block is ready + /// A block is ready BlockVerified, } @@ -38,17 +39,22 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, + panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let panic_handler = PanicHandler::new_in_arc(); let mut net_service = try!(NetworkService::start(net_config)); + panic_handler.forward_from(&net_service); + info!("Starting {}", net_service.host_info()); info!("Configured for {} using {} engine", spec.name, spec.engine_name); let mut dir = env::home_dir().unwrap(); dir.push(".parity"); let client = try!(Client::new(spec, &dir, net_service.io().channel())); + panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -57,6 +63,7 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, + panic_handler: panic_handler, }) } @@ -81,6 +88,12 @@ impl ClientService { } } +impl MayPanic for ClientService { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + /// IO interface for the Client handler struct ClientIoHandler { client: Arc diff --git a/parity/main.rs b/parity/main.rs index 6d341a29f..cc59aacb8 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -163,13 +163,13 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ } } -fn wait_for_exit(client: Arc) { +fn wait_for_exit(client_service: &ClientService) { let exit = Arc::new(Condvar::new()); // Handle possible exits let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); let e = exit.clone(); - client.on_panic(move |_reason| { e.notify_all(); }); + client_service.on_panic(move |_reason| { e.notify_all(); }); // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); @@ -219,7 +219,7 @@ fn main() { service.io().register_handler(io_handler).expect("Error registering IO handler"); // Handle exit - wait_for_exit(service.client()); + wait_for_exit(&service); } struct Informant { diff --git a/util/src/io/service.rs b/util/src/io/service.rs index c740a79c2..c5f4a6072 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -160,13 +160,21 @@ pub struct IoManager where Message: Send + Sync { impl IoManager where Message: Send + Sync + Clone + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), UtilError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); let workers = (0..num_workers).map(|i| - Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); + Worker::new( + i, + stealer.clone(), + IoChannel::new(event_loop.channel()), + work_ready.clone(), + work_ready_mutex.clone(), + panic_handler.clone() + ) + ).collect(); let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), @@ -321,13 +329,14 @@ impl MayPanic for IoService where Message: Send + Sync + Clone impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { - let panic_handler = PanicHandler::new_arc(); + let panic_handler = PanicHandler::new_in_arc(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let panic = panic_handler.clone(); let thread = thread::spawn(move || { + let p = panic.clone(); panic.catch_panic(move || { - IoManager::::start(&mut event_loop).unwrap(); + IoManager::::start(p, &mut event_loop).unwrap(); }).unwrap() }); Ok(IoService { diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 6300dda2e..1ba0318bc 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -44,7 +44,6 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, - panic_handler: Arc, } impl Worker { @@ -53,17 +52,16 @@ impl Worker { stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, - wait_mutex: Arc>) -> Worker - where Message: Send + Sync + Clone + 'static { - let panic_handler = PanicHandler::new_arc(); + wait_mutex: Arc>, + panic_handler: Arc + ) -> Worker + where Message: Send + Sync + Clone + 'static { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { - panic_handler: panic_handler.clone(), thread: None, wait: wait.clone(), deleting: deleting.clone(), }; - let panic_handler = panic_handler.clone(); worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( move || { panic_handler.catch_panic(move || { @@ -114,12 +112,6 @@ impl Worker { } } -impl MayPanic for Worker { - fn on_panic(&self, closure: F) where F: OnPanicListener { - self.panic_handler.on_panic(closure); - } -} - impl Drop for Worker { fn drop(&mut self) { self.deleting.store(true, AtomicOrdering::Relaxed); diff --git a/util/src/network/service.rs b/util/src/network/service.rs index d63836daf..60f0ec415 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -16,6 +16,7 @@ use std::sync::*; use error::*; +use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::{NetworkError}; use network::host::{Host, NetworkIoMessage, ProtocolId}; @@ -27,13 +28,17 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - stats: Arc + stats: Arc, + panic_handler: Arc } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start(config: NetworkConfiguration) -> Result, UtilError> { + let panic_handler = PanicHandler::new_in_arc(); let mut io_service = try!(IoService::>::start()); + panic_handler.forward_from(&io_service); + let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); @@ -43,6 +48,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat io_service: io_service, host_info: host_info, stats: stats, + panic_handler: panic_handler }) } @@ -72,3 +78,9 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } } + +impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/util/src/panics.rs b/util/src/panics.rs index 44bae9308..72718db58 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -27,12 +27,17 @@ pub trait OnPanicListener: Send + Sync + 'static { fn call(&mut self, arg: &str); } +/// Forwards panics from child +pub trait ForwardPanic { + /// Attach `on_panic` listener to `child` and rethrow all panics + fn forward_from(&self, child: &S) where S : MayPanic; +} + /// Trait indicating that the structure catches some of the panics (most probably from spawned threads) /// and it's possbile to be notified when one of the threads panics. pub trait MayPanic { /// `closure` will be invoked whenever panic in thread is caught - fn on_panic(&self, closure: F) - where F: OnPanicListener; + fn on_panic(&self, closure: F) where F: OnPanicListener; } /// Structure that allows to catch panics and notify listeners @@ -42,7 +47,7 @@ pub struct PanicHandler { impl PanicHandler { /// Creates new `PanicHandler` wrapped in `Arc` - pub fn new_arc() -> Arc { + pub fn new_in_arc() -> Arc { Arc::new(Self::new()) } @@ -70,8 +75,7 @@ impl PanicHandler { result } - /// Notify listeners about panic - pub fn notify_all(&self, r: String) { + fn notify_all(&self, r: String) { let mut listeners = self.listeners.lock().unwrap(); for listener in listeners.deref_mut() { listener.call(&r); @@ -80,12 +84,18 @@ impl PanicHandler { } impl MayPanic for PanicHandler { - fn on_panic(&self, closure: F) - where F: OnPanicListener { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.listeners.lock().unwrap().push(Box::new(closure)); } } +impl ForwardPanic for Arc { + fn forward_from(&self, child: &S) where S : MayPanic { + let p = self.clone(); + child.on_panic(move |t| p.notify_all(t)); + } +} + impl OnPanicListener for F where F: FnMut(String) + Send + Sync + 'static { fn call(&mut self, arg: &str) { @@ -159,11 +169,11 @@ use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = PanicHandler::new(); + let p = PanicHandler::new_in_arc(); p.on_panic(move |t| i.write().unwrap().push(t)); let p2 = PanicHandler::new(); - p2.on_panic(move |t| p.notify_all(t)); + p.forward_from(&p2); // when p2.catch_panic(|| panic!("Panic!")).unwrap_err();