diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index fb735c973..dcfcec1e4 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -26,6 +26,7 @@ use views::*; use header::*; use service::*; use client::BlockStatus; +use util::panics::*; /// Block queue status #[derive(Debug)] @@ -59,6 +60,7 @@ impl BlockQueueInfo { /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { + panic_handler: Arc, engine: Arc>, more_to_verify: Arc, verification: Arc>, @@ -113,6 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); + let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -123,11 +126,21 @@ impl BlockQueue { let ready_signal = ready_signal.clone(); let empty = empty.clone(); let deleting = deleting.clone(); - verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)) - .expect("Error starting block verification thread")); + let panic_handler = panic_handler.clone(); + verifiers.push( + thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + panic_handler.catch_panic(move || { + BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + }).unwrap() + }) + .expect("Error starting block verification thread") + ); } BlockQueue { engine: engine, + panic_handler: panic_handler, ready_signal: ready_signal.clone(), more_to_verify: more_to_verify.clone(), verification: verification.clone(), @@ -150,7 +163,7 @@ impl BlockQueue { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { lock = wait.wait(lock).unwrap(); } - + if deleting.load(AtomicOrdering::Relaxed) { return; } @@ -324,6 +337,12 @@ impl BlockQueue { } } +impl MayPanic for BlockQueue { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl Drop for BlockQueue { fn drop(&mut self) { self.clear(); diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 3de5c097e..26535c46c 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -17,6 +17,7 @@ //! Blockchain database client. use util::*; +use util::panics::*; use rocksdb::{Options, DB, DBCompactionStyle}; use blockchain::{BlockChain, BlockProvider, CacheSize, TransactionId}; use views::BlockView; @@ -164,7 +165,8 @@ pub struct Client { state_db: Mutex, block_queue: RwLock, report: RwLock, - import_lock: Mutex<()> + import_lock: Mutex<()>, + panic_handler: Arc, } const HISTORY: u64 = 1000; @@ -205,19 +207,25 @@ impl Client { let mut state_path = path.to_path_buf(); state_path.push("state"); let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); - + let engine = Arc::new(try!(spec.to_engine())); let mut state_db = JournalDB::new_with_arc(db.clone()); if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } + + let block_queue = BlockQueue::new(engine.clone(), message_channel); + let panic_handler = PanicHandler::new_in_arc(); + panic_handler.forward_from(&block_queue); + Ok(Arc::new(Client { chain: chain, - engine: engine.clone(), + engine: engine, state_db: Mutex::new(state_db), - block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), + block_queue: RwLock::new(block_queue), report: RwLock::new(Default::default()), import_lock: Mutex::new(()), + panic_handler: panic_handler })) } @@ -355,12 +363,12 @@ impl BlockChainClient for Client { fn block_status(&self, hash: &H256) -> BlockStatus { if self.chain.read().unwrap().is_known(&hash) { - BlockStatus::InChain - } else { - self.block_queue.read().unwrap().block_status(hash) + BlockStatus::InChain + } else { + self.block_queue.read().unwrap().block_status(hash) } } - + fn block_total_difficulty(&self, hash: &H256) -> Option { self.chain.read().unwrap().block_details(hash).map(|d| d.total_difficulty) } @@ -438,3 +446,9 @@ impl BlockChainClient for Client { } } } + +impl MayPanic for Client { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 92f483507..8f95dd361 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -17,6 +17,7 @@ //! Creates and registers client and network services. use util::*; +use util::panics::*; use spec::Spec; use error::*; use std::env; @@ -27,7 +28,7 @@ use client::Client; pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlock(Bytes), //TODO: use Cow - /// A block is ready + /// A block is ready BlockVerified, } @@ -38,17 +39,22 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, + panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let panic_handler = PanicHandler::new_in_arc(); let mut net_service = try!(NetworkService::start(net_config)); + panic_handler.forward_from(&net_service); + info!("Starting {}", net_service.host_info()); info!("Configured for {} using {} engine", spec.name, spec.engine_name); let mut dir = env::home_dir().unwrap(); dir.push(".parity"); let client = try!(Client::new(spec, &dir, net_service.io().channel())); + panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -57,6 +63,7 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, + panic_handler: panic_handler, }) } @@ -81,6 +88,12 @@ impl ClientService { } } +impl MayPanic for ClientService { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + /// IO interface for the Client handler struct ClientIoHandler { client: Arc diff --git a/parity/main.rs b/parity/main.rs index f5a07208e..cc59aacb8 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -40,6 +40,7 @@ use rlog::{LogLevelFilter}; use env_logger::LogBuilder; use ctrlc::CtrlC; use util::*; +use util::panics::MayPanic; use ethcore::spec::*; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; @@ -92,7 +93,7 @@ fn setup_log(init: &str) { #[cfg(feature = "rpc")] fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { use rpc::v1::*; - + let mut server = rpc::HttpServer::new(1); server.add_delegate(Web3Client::new().to_delegate()); server.add_delegate(EthClient::new(client.clone()).to_delegate()); @@ -105,10 +106,17 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { fn setup_rpc_server(_client: Arc, _sync: Arc, _url: &str) { } -fn main() { - let args: Args = Args::docopt().decode().unwrap_or_else(|e| e.exit()); +struct Configuration { + args: Args +} +impl Configuration { + fn parse() -> Self { + Configuration { + args: Args::docopt().decode().unwrap_or_else(|e| e.exit()) + } + } - if args.flag_version { + fn print_version(&self) { println!("\ Parity version {} ({}-{}-{}) Copyright 2015, 2016 Ethcore (UK) Limited @@ -118,49 +126,100 @@ There is NO WARRANTY, to the extent permitted by law. By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ ", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()); + } + + fn get_spec(&self) -> Spec { + match self.args.flag_chain.as_ref() { + "frontier" | "mainnet" => ethereum::new_frontier(), + "morden" | "testnet" => ethereum::new_morden(), + "olympic" => ethereum::new_olympic(), + f => Spec::from_json_utf8(contents(f).expect("Couldn't read chain specification file. Sure it exists?").as_ref()), + } + } + + fn get_init_nodes(&self, spec: &Spec) -> Vec { + match self.args.arg_enode.len() { + 0 => spec.nodes().clone(), + _ => self.args.arg_enode.clone(), + } + } + + fn get_net_addresses(&self) -> (SocketAddr, SocketAddr) { + let listen_address; + let public_address; + + match self.args.flag_address { + None => { + listen_address = SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); + public_address = SocketAddr::from_str(self.args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); + } + Some(ref a) => { + public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); + listen_address = public_address.clone(); + } + }; + + (listen_address, public_address) + } +} + +fn wait_for_exit(client_service: &ClientService) { + let exit = Arc::new(Condvar::new()); + // Handle possible exits + let e = exit.clone(); + CtrlC::set_handler(move || { e.notify_all(); }); + let e = exit.clone(); + client_service.on_panic(move |_reason| { e.notify_all(); }); + // Wait for signal + let mutex = Mutex::new(()); + let _ = exit.wait(mutex.lock().unwrap()).unwrap(); +} + +fn main() { + let conf = Configuration::parse(); + if conf.args.flag_version { + conf.print_version(); return; } - setup_log(&args.flag_logging); + let spec = conf.get_spec(); + + // Setup logging + setup_log(&conf.args.flag_logging); + // Raise fdlimit unsafe { ::fdlimit::raise_fd_limit(); } - let spec = match args.flag_chain.as_ref() { - "frontier" | "mainnet" => ethereum::new_frontier(), - "morden" | "testnet" => ethereum::new_morden(), - "olympic" => ethereum::new_olympic(), - f => Spec::from_json_utf8(contents(f).expect("Couldn't read chain specification file. Sure it exists?").as_ref()), - }; - let init_nodes = match args.arg_enode.len() { - 0 => spec.nodes().clone(), - _ => args.arg_enode.clone(), - }; + // Configure network + let init_nodes = conf.get_init_nodes(&spec); + let (listen, public) = conf.get_net_addresses(); let mut net_settings = NetworkConfiguration::new(); net_settings.boot_nodes = init_nodes; - match args.flag_address { - None => { - net_settings.listen_address = SocketAddr::from_str(args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); - net_settings.public_address = SocketAddr::from_str(args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); - } - Some(ref a) => { - net_settings.public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); - net_settings.listen_address = net_settings.public_address.clone(); - } - } + net_settings.listen_address = listen; + net_settings.public_address = public; + + // Build client let mut service = ClientService::start(spec, net_settings).unwrap(); let client = service.client().clone(); - client.configure_cache(args.flag_cache_pref_size, args.flag_cache_max_size); + client.configure_cache(conf.args.flag_cache_pref_size, conf.args.flag_cache_max_size); + + // Sync let sync = EthSync::register(service.network(), client); - if args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), &args.flag_jsonrpc_url); + + // Setup rpc + if conf.args.flag_jsonrpc { + setup_rpc_server(service.client(), sync.clone(), &conf.args.flag_jsonrpc_url); } - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); + + // Register IO handler + let io_handler = Arc::new(ClientIoHandler { + client: service.client(), + info: Default::default(), + sync: sync + }); service.io().register_handler(io_handler).expect("Error registering IO handler"); - let exit = Arc::new(Condvar::new()); - let e = exit.clone(); - CtrlC::set_handler(move || { e.notify_all(); }); - let mutex = Mutex::new(()); - let _ = exit.wait(mutex.lock().unwrap()).unwrap(); + // Handle exit + wait_for_exit(&service); } struct Informant { @@ -226,7 +285,7 @@ struct ClientIoHandler { } impl IoHandler for ClientIoHandler { - fn initialize(&self, io: &IoContext) { + fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); } diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 4f16efd30..40cdbc368 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -31,16 +31,16 @@ //! //! impl IoHandler for MyHandler { //! fn initialize(&self, io: &IoContext) { -//! io.register_timer(0, 1000).unwrap(); -//! } +//! io.register_timer(0, 1000).unwrap(); +//! } //! -//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { -//! println!("Timeout {}", timer); -//! } +//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { +//! println!("Timeout {}", timer); +//! } //! -//! fn message(&self, _io: &IoContext, message: &MyMessage) { -//! println!("Message {}", message.data); -//! } +//! fn message(&self, _io: &IoContext, message: &MyMessage) { +//! println!("Message {}", message.data); +//! } //! } //! //! fn main () { @@ -70,7 +70,7 @@ impl From<::mio::NotifyError>> for IoError } } -/// Generic IO handler. +/// Generic IO handler. /// All the handler function are called from within IO event loop. /// `Message` type is used as notification data pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + 'static { @@ -82,7 +82,7 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + ' fn message(&self, _io: &IoContext, _message: &Message) {} /// Called when an IO stream gets closed fn stream_hup(&self, _io: &IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be read from + /// Called when an IO stream can be read from fn stream_readable(&self, _io: &IoContext, _stream: StreamToken) {} /// Called when an IO stream can be written to fn stream_writable(&self, _io: &IoContext, _stream: StreamToken) {} diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 71b2520ed..c5f4a6072 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -25,6 +25,7 @@ use io::{IoError, IoHandler}; use arrayvec::*; use crossbeam::sync::chase_lev; use io::worker::{Worker, Work, WorkType}; +use panics::*; /// Timer ID pub type TimerToken = usize; @@ -159,13 +160,21 @@ pub struct IoManager where Message: Send + Sync { impl IoManager where Message: Send + Sync + Clone + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), UtilError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); - let workers = (0..num_workers).map(|i| - Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); + let workers = (0..num_workers).map(|i| + Worker::new( + i, + stealer.clone(), + IoChannel::new(event_loop.channel()), + work_ready.clone(), + work_ready_mutex.clone(), + panic_handler.clone() + ) + ).collect(); let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), @@ -306,19 +315,32 @@ impl IoChannel where Message: Send + Clone { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + Clone + 'static { + panic_handler: Arc, thread: Option>, host_channel: Sender>, } +impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { + let panic_handler = PanicHandler::new_in_arc(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); + let panic = panic_handler.clone(); let thread = thread::spawn(move || { - IoManager::::start(&mut event_loop).unwrap(); //TODO: + let p = panic.clone(); + panic.catch_panic(move || { + IoManager::::start(p, &mut event_loop).unwrap(); + }).unwrap() }); Ok(IoService { + panic_handler: panic_handler, thread: Some(thread), host_channel: channel }) diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index fa8a0fa2c..1ba0318bc 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; +use panics::*; pub enum WorkType { Readable, @@ -47,12 +48,14 @@ pub struct Worker { impl Worker { /// Creates a new worker instance. - pub fn new(index: usize, - stealer: chase_lev::Stealer>, + pub fn new(index: usize, + stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, - wait_mutex: Arc>) -> Worker - where Message: Send + Sync + Clone + 'static { + wait_mutex: Arc>, + panic_handler: Arc + ) -> Worker + where Message: Send + Sync + Clone + 'static { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { thread: None, @@ -60,15 +63,19 @@ impl Worker { deleting: deleting.clone(), }; worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( - move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)) + move || { + panic_handler.catch_panic(move || { + Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) + }).unwrap() + }) .expect("Error creating worker thread")); worker } fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, - wait_mutex: Arc>, - deleting: Arc) + channel: IoChannel, wait: Arc, + wait_mutex: Arc>, + deleting: Arc) where Message: Send + Sync + Clone + 'static { while !deleting.load(AtomicOrdering::Relaxed) { { diff --git a/util/src/lib.rs b/util/src/lib.rs index 16a25f538..260ef4301 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -21,6 +21,8 @@ #![feature(plugin)] #![plugin(clippy)] #![allow(needless_range_loop, match_bool)] +#![feature(catch_panic)] + //! Ethcore-util library //! //! ### Rust version: @@ -54,7 +56,7 @@ //! cd parity //! cargo build --release //! ``` -//! +//! //! - OSX: //! //! ```bash @@ -129,6 +131,7 @@ pub mod semantic_version; pub mod io; pub mod network; pub mod log; +pub mod panics; pub use common::*; pub use misc::*; diff --git a/util/src/network/service.rs b/util/src/network/service.rs index d63836daf..60f0ec415 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -16,6 +16,7 @@ use std::sync::*; use error::*; +use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::{NetworkError}; use network::host::{Host, NetworkIoMessage, ProtocolId}; @@ -27,13 +28,17 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - stats: Arc + stats: Arc, + panic_handler: Arc } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start(config: NetworkConfiguration) -> Result, UtilError> { + let panic_handler = PanicHandler::new_in_arc(); let mut io_service = try!(IoService::>::start()); + panic_handler.forward_from(&io_service); + let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); @@ -43,6 +48,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat io_service: io_service, host_info: host_info, stats: stats, + panic_handler: panic_handler }) } @@ -72,3 +78,9 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } } + +impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/util/src/panics.rs b/util/src/panics.rs new file mode 100644 index 000000000..72718db58 --- /dev/null +++ b/util/src/panics.rs @@ -0,0 +1,183 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Panic utilities + +use std::thread; +use std::ops::DerefMut; +use std::any::Any; +use std::sync::{Arc, Mutex}; + +/// Thread-safe closure for handling possible panics +pub trait OnPanicListener: Send + Sync + 'static { + /// Invoke listener + fn call(&mut self, arg: &str); +} + +/// Forwards panics from child +pub trait ForwardPanic { + /// Attach `on_panic` listener to `child` and rethrow all panics + fn forward_from(&self, child: &S) where S : MayPanic; +} + +/// Trait indicating that the structure catches some of the panics (most probably from spawned threads) +/// and it's possbile to be notified when one of the threads panics. +pub trait MayPanic { + /// `closure` will be invoked whenever panic in thread is caught + fn on_panic(&self, closure: F) where F: OnPanicListener; +} + +/// Structure that allows to catch panics and notify listeners +pub struct PanicHandler { + listeners: Mutex>> +} + +impl PanicHandler { + /// Creates new `PanicHandler` wrapped in `Arc` + pub fn new_in_arc() -> Arc { + Arc::new(Self::new()) + } + + /// Creates new `PanicHandler` + pub fn new() -> PanicHandler { + PanicHandler { + listeners: Mutex::new(vec![]) + } + } + + /// Invoke closure and catch any possible panics. + /// In case of panic notifies all listeners about it. + #[allow(deprecated)] + // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) + pub fn catch_panic(&self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + let result = thread::catch_panic(g); + + if let Err(ref e) = result { + let res = convert_to_string(e); + if let Some(r) = res { + self.notify_all(r); + } + } + + result + } + + fn notify_all(&self, r: String) { + let mut listeners = self.listeners.lock().unwrap(); + for listener in listeners.deref_mut() { + listener.call(&r); + } + } +} + +impl MayPanic for PanicHandler { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.listeners.lock().unwrap().push(Box::new(closure)); + } +} + +impl ForwardPanic for Arc { + fn forward_from(&self, child: &S) where S : MayPanic { + let p = self.clone(); + child.on_panic(move |t| p.notify_all(t)); + } +} + +impl OnPanicListener for F + where F: FnMut(String) + Send + Sync + 'static { + fn call(&mut self, arg: &str) { + self(arg.to_owned()) + } +} + +fn convert_to_string(t: &Box) -> Option { + let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); + let as_string = t.downcast_ref::().cloned(); + + as_str.or(as_string) +} + +#[test] +fn should_notify_listeners_about_panic () { + use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); + + // when + p.catch_panic(|| panic!("Panic!")).unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} + +#[test] +fn should_notify_listeners_about_panic_when_string_is_dynamic () { + use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); + + // when + p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic: 1"); +} + +#[test] +fn should_notify_listeners_about_panic_in_other_thread () { + use std::thread; + use std::sync::RwLock; + + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let p = PanicHandler::new(); + p.on_panic(move |t| i.write().unwrap().push(t)); + + // when + let t = thread::spawn(move || + p.catch_panic(|| panic!("Panic!")).unwrap() + ); + t.join().unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} + +#[test] +fn should_forward_panics () { +use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let p = PanicHandler::new_in_arc(); + p.on_panic(move |t| i.write().unwrap().push(t)); + + let p2 = PanicHandler::new(); + p.forward_from(&p2); + + // when + p2.catch_panic(|| panic!("Panic!")).unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +}