diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 90f4338db..dcfcec1e4 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -115,7 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); - let panic_handler = PanicHandler::new_arc(); + let panic_handler = PanicHandler::new_in_arc(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 92946b5ae..560be8bfd 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -211,9 +211,8 @@ impl Client { } let block_queue = BlockQueue::new(engine.clone(), message_channel); - let panic_handler = PanicHandler::new_arc(); - let panic = panic_handler.clone(); - block_queue.on_panic(move |t| panic.notify_all(t)); + let panic_handler = PanicHandler::new_in_arc(); + panic_handler.forward_from(&block_queue); Ok(Arc::new(Client { chain: chain, diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 92f483507..8f95dd361 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -17,6 +17,7 @@ //! Creates and registers client and network services. use util::*; +use util::panics::*; use spec::Spec; use error::*; use std::env; @@ -27,7 +28,7 @@ use client::Client; pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlock(Bytes), //TODO: use Cow - /// A block is ready + /// A block is ready BlockVerified, } @@ -38,17 +39,22 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, + panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let panic_handler = PanicHandler::new_in_arc(); let mut net_service = try!(NetworkService::start(net_config)); + panic_handler.forward_from(&net_service); + info!("Starting {}", net_service.host_info()); info!("Configured for {} using {} engine", spec.name, spec.engine_name); let mut dir = env::home_dir().unwrap(); dir.push(".parity"); let client = try!(Client::new(spec, &dir, net_service.io().channel())); + panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -57,6 +63,7 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, + panic_handler: panic_handler, }) } @@ -81,6 +88,12 @@ impl ClientService { } } +impl MayPanic for ClientService { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + /// IO interface for the Client handler struct ClientIoHandler { client: Arc diff --git a/parity/main.rs b/parity/main.rs index 6d341a29f..cc59aacb8 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -163,13 +163,13 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ } } -fn wait_for_exit(client: Arc) { +fn wait_for_exit(client_service: &ClientService) { let exit = Arc::new(Condvar::new()); // Handle possible exits let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); let e = exit.clone(); - client.on_panic(move |_reason| { e.notify_all(); }); + client_service.on_panic(move |_reason| { e.notify_all(); }); // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); @@ -219,7 +219,7 @@ fn main() { service.io().register_handler(io_handler).expect("Error registering IO handler"); // Handle exit - wait_for_exit(service.client()); + wait_for_exit(&service); } struct Informant { diff --git a/util/src/io/service.rs b/util/src/io/service.rs index c740a79c2..c5f4a6072 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -160,13 +160,21 @@ pub struct IoManager where Message: Send + Sync { impl IoManager where Message: Send + Sync + Clone + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(event_loop: &mut EventLoop>) -> Result<(), UtilError> { + pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), UtilError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); let workers = (0..num_workers).map(|i| - Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); + Worker::new( + i, + stealer.clone(), + IoChannel::new(event_loop.channel()), + work_ready.clone(), + work_ready_mutex.clone(), + panic_handler.clone() + ) + ).collect(); let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), @@ -321,13 +329,14 @@ impl MayPanic for IoService where Message: Send + Sync + Clone impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { - let panic_handler = PanicHandler::new_arc(); + let panic_handler = PanicHandler::new_in_arc(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); let panic = panic_handler.clone(); let thread = thread::spawn(move || { + let p = panic.clone(); panic.catch_panic(move || { - IoManager::::start(&mut event_loop).unwrap(); + IoManager::::start(p, &mut event_loop).unwrap(); }).unwrap() }); Ok(IoService { diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index 6300dda2e..1ba0318bc 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -44,7 +44,6 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, - panic_handler: Arc, } impl Worker { @@ -53,17 +52,16 @@ impl Worker { stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, - wait_mutex: Arc>) -> Worker - where Message: Send + Sync + Clone + 'static { - let panic_handler = PanicHandler::new_arc(); + wait_mutex: Arc>, + panic_handler: Arc + ) -> Worker + where Message: Send + Sync + Clone + 'static { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { - panic_handler: panic_handler.clone(), thread: None, wait: wait.clone(), deleting: deleting.clone(), }; - let panic_handler = panic_handler.clone(); worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( move || { panic_handler.catch_panic(move || { @@ -114,12 +112,6 @@ impl Worker { } } -impl MayPanic for Worker { - fn on_panic(&self, closure: F) where F: OnPanicListener { - self.panic_handler.on_panic(closure); - } -} - impl Drop for Worker { fn drop(&mut self) { self.deleting.store(true, AtomicOrdering::Relaxed); diff --git a/util/src/network/service.rs b/util/src/network/service.rs index d63836daf..60f0ec415 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -16,6 +16,7 @@ use std::sync::*; use error::*; +use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::{NetworkError}; use network::host::{Host, NetworkIoMessage, ProtocolId}; @@ -27,13 +28,17 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - stats: Arc + stats: Arc, + panic_handler: Arc } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start(config: NetworkConfiguration) -> Result, UtilError> { + let panic_handler = PanicHandler::new_in_arc(); let mut io_service = try!(IoService::>::start()); + panic_handler.forward_from(&io_service); + let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); @@ -43,6 +48,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat io_service: io_service, host_info: host_info, stats: stats, + panic_handler: panic_handler }) } @@ -72,3 +78,9 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } } + +impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/util/src/panics.rs b/util/src/panics.rs index 44bae9308..72718db58 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -27,12 +27,17 @@ pub trait OnPanicListener: Send + Sync + 'static { fn call(&mut self, arg: &str); } +/// Forwards panics from child +pub trait ForwardPanic { + /// Attach `on_panic` listener to `child` and rethrow all panics + fn forward_from(&self, child: &S) where S : MayPanic; +} + /// Trait indicating that the structure catches some of the panics (most probably from spawned threads) /// and it's possbile to be notified when one of the threads panics. pub trait MayPanic { /// `closure` will be invoked whenever panic in thread is caught - fn on_panic(&self, closure: F) - where F: OnPanicListener; + fn on_panic(&self, closure: F) where F: OnPanicListener; } /// Structure that allows to catch panics and notify listeners @@ -42,7 +47,7 @@ pub struct PanicHandler { impl PanicHandler { /// Creates new `PanicHandler` wrapped in `Arc` - pub fn new_arc() -> Arc { + pub fn new_in_arc() -> Arc { Arc::new(Self::new()) } @@ -70,8 +75,7 @@ impl PanicHandler { result } - /// Notify listeners about panic - pub fn notify_all(&self, r: String) { + fn notify_all(&self, r: String) { let mut listeners = self.listeners.lock().unwrap(); for listener in listeners.deref_mut() { listener.call(&r); @@ -80,12 +84,18 @@ impl PanicHandler { } impl MayPanic for PanicHandler { - fn on_panic(&self, closure: F) - where F: OnPanicListener { + fn on_panic(&self, closure: F) where F: OnPanicListener { self.listeners.lock().unwrap().push(Box::new(closure)); } } +impl ForwardPanic for Arc { + fn forward_from(&self, child: &S) where S : MayPanic { + let p = self.clone(); + child.on_panic(move |t| p.notify_all(t)); + } +} + impl OnPanicListener for F where F: FnMut(String) + Send + Sync + 'static { fn call(&mut self, arg: &str) { @@ -159,11 +169,11 @@ use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let p = PanicHandler::new(); + let p = PanicHandler::new_in_arc(); p.on_panic(move |t| i.write().unwrap().push(t)); let p2 = PanicHandler::new(); - p2.on_panic(move |t| p.notify_all(t)); + p.forward_from(&p2); // when p2.catch_panic(|| panic!("Panic!")).unwrap_err();