diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index fb735c973..59de4403b 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -26,6 +26,7 @@ use views::*; use header::*; use service::*; use client::BlockStatus; +use util::panics::*; /// Block queue status #[derive(Debug)] @@ -59,6 +60,7 @@ impl BlockQueueInfo { /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { + panic_handler: SafeStringPanicHandler, engine: Arc>, more_to_verify: Arc, verification: Arc>, @@ -113,6 +115,7 @@ impl BlockQueue { let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let deleting = Arc::new(AtomicBool::new(false)); let empty = Arc::new(Condvar::new()); + let panic_handler = StringPanicHandler::new_thread_safe(); let mut verifiers: Vec> = Vec::new(); let thread_count = max(::num_cpus::get(), 3) - 2; @@ -123,11 +126,22 @@ impl BlockQueue { let ready_signal = ready_signal.clone(); let empty = empty.clone(); let deleting = deleting.clone(); - verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)) - .expect("Error starting block verification thread")); + let panic_handler = panic_handler.clone(); + verifiers.push( + thread::Builder::new() + .name(format!("Verifier #{}", i)) + .spawn(move || { + let mut panic = panic_handler.lock().unwrap(); + panic.catch_panic(move || { + BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) + }).unwrap() + }) + .expect("Error starting block verification thread") + ); } BlockQueue { engine: engine, + panic_handler: panic_handler, ready_signal: ready_signal.clone(), more_to_verify: more_to_verify.clone(), verification: verification.clone(), @@ -150,7 +164,7 @@ impl BlockQueue { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { lock = wait.wait(lock).unwrap(); } - + if deleting.load(AtomicOrdering::Relaxed) { return; } @@ -324,6 +338,12 @@ impl BlockQueue { } } +impl MayPanic for BlockQueue { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl Drop for BlockQueue { fn drop(&mut self) { self.clear(); diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 46b53e7b9..d657dce6b 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -17,6 +17,7 @@ //! Blockchain database client. use util::*; +use util::panics::*; use rocksdb::{Options, DB, DBCompactionStyle}; use blockchain::{BlockChain, BlockProvider, CacheSize}; use views::BlockView; @@ -157,7 +158,8 @@ pub struct Client { state_db: Mutex, block_queue: RwLock, report: RwLock, - import_lock: Mutex<()> + import_lock: Mutex<()>, + panic_handler: SafeStringPanicHandler, } const HISTORY: u64 = 1000; @@ -198,19 +200,26 @@ impl Client { let mut state_path = path.to_path_buf(); state_path.push("state"); let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); - + let engine = Arc::new(try!(spec.to_engine())); let mut state_db = JournalDB::new_with_arc(db.clone()); if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } + + let block_queue = BlockQueue::new(engine.clone(), message_channel); + let panic_handler = StringPanicHandler::new_thread_safe(); + let panic = panic_handler.clone(); + block_queue.on_panic(move |t: &String| panic.lock().unwrap().notify_all(t)); + Ok(Arc::new(Client { chain: chain, - engine: engine.clone(), + engine: engine, state_db: Mutex::new(state_db), - block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), + block_queue: RwLock::new(block_queue), report: RwLock::new(Default::default()), import_lock: Mutex::new(()), + panic_handler: panic_handler })) } @@ -348,12 +357,12 @@ impl BlockChainClient for Client { fn block_status(&self, hash: &H256) -> BlockStatus { if self.chain.read().unwrap().is_known(&hash) { - BlockStatus::InChain - } else { - self.block_queue.read().unwrap().block_status(hash) + BlockStatus::InChain + } else { + self.block_queue.read().unwrap().block_status(hash) } } - + fn block_total_difficulty(&self, hash: &H256) -> Option { self.chain.read().unwrap().block_details(hash).map(|d| d.total_difficulty) } @@ -423,3 +432,9 @@ impl BlockChainClient for Client { } } } + +impl MayPanic for Client { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} diff --git a/parity/main.rs b/parity/main.rs index d423caa64..033621362 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -39,9 +39,11 @@ use rlog::{LogLevelFilter}; use env_logger::LogBuilder; use ctrlc::CtrlC; use util::*; +use util::panics::MayPanic; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; +use ethcore::spec; use ethcore::blockchain::CacheSize; use ethsync::EthSync; @@ -84,7 +86,7 @@ fn setup_log(init: &str) { #[cfg(feature = "rpc")] fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { use rpc::v1::*; - + let mut server = rpc::HttpServer::new(1); server.add_delegate(Web3Client::new().to_delegate()); server.add_delegate(EthClient::new(client.clone()).to_delegate()); @@ -97,46 +99,96 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str) { fn setup_rpc_server(_client: Arc, _sync: Arc, _url: &str) { } -fn main() { - let args: Args = Args::docopt().decode().unwrap_or_else(|e| e.exit()); - - setup_log(&args.flag_logging); - unsafe { ::fdlimit::raise_fd_limit(); } - - let spec = ethereum::new_frontier(); - let init_nodes = match args.arg_enode.len() { - 0 => spec.nodes().clone(), - _ => args.arg_enode.clone(), - }; - let mut net_settings = NetworkConfiguration::new(); - net_settings.boot_nodes = init_nodes; - match args.flag_address { - None => { - net_settings.listen_address = SocketAddr::from_str(args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); - net_settings.public_address = SocketAddr::from_str(args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); - } - Some(ref a) => { - net_settings.public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); - net_settings.listen_address = net_settings.public_address.clone(); +struct Configuration { + args: Args +} +impl Configuration { + fn parse() -> Self { + Configuration { + args: Args::docopt().decode().unwrap_or_else(|e| e.exit()) } } - let mut service = ClientService::start(spec, net_settings).unwrap(); - let client = service.client().clone(); - client.configure_cache(args.flag_cache_pref_size, args.flag_cache_max_size); - let sync = EthSync::register(service.network(), client); - if args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), &args.flag_jsonrpc_url); - } - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); - service.io().register_handler(io_handler).expect("Error registering IO handler"); + fn get_init_nodes(&self, spec: &spec::Spec) -> Vec { + match self.args.arg_enode.len() { + 0 => spec.nodes().clone(), + _ => self.args.arg_enode.clone(), + } + } + + fn get_net_addresses(&self) -> (SocketAddr, SocketAddr) { + let listen_address; + let public_address; + + match self.args.flag_address { + None => { + listen_address = SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"); + public_address = SocketAddr::from_str(self.args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); + } + Some(ref a) => { + public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"); + listen_address = public_address.clone(); + } + }; + + (listen_address, public_address) + } +} + +fn wait_for_exit(client: Arc) { let exit = Arc::new(Condvar::new()); + // Handle possible exits let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); + let e = exit.clone(); + client.on_panic(move |_t: &String| { e.notify_all(); }); + // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); } +fn main() { + let conf = Configuration::parse(); + let spec = ethereum::new_frontier(); + + // Setup logging + setup_log(&conf.args.flag_logging); + // Raise fdlimit + unsafe { ::fdlimit::raise_fd_limit(); } + + // Configure network + let init_nodes = conf.get_init_nodes(&spec); + let (listen, public) = conf.get_net_addresses(); + let mut net_settings = NetworkConfiguration::new(); + net_settings.boot_nodes = init_nodes; + net_settings.listen_address = listen; + net_settings.public_address = public; + + // Build client + let mut service = ClientService::start(spec, net_settings).unwrap(); + let client = service.client().clone(); + client.configure_cache(conf.args.flag_cache_pref_size, conf.args.flag_cache_max_size); + + // Sync + let sync = EthSync::register(service.network(), client); + + // Setup rpc + if conf.args.flag_jsonrpc { + setup_rpc_server(service.client(), sync.clone(), &conf.args.flag_jsonrpc_url); + } + + // Register IO handler + let io_handler = Arc::new(ClientIoHandler { + client: service.client(), + info: Default::default(), + sync: sync + }); + service.io().register_handler(io_handler).expect("Error registering IO handler"); + + // Handle exit + wait_for_exit(service.client()); +} + struct Informant { chain_info: RwLock>, cache_info: RwLock>, @@ -200,7 +252,7 @@ struct ClientIoHandler { } impl IoHandler for ClientIoHandler { - fn initialize(&self, io: &IoContext) { + fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); } diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 4f16efd30..40cdbc368 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -31,16 +31,16 @@ //! //! impl IoHandler for MyHandler { //! fn initialize(&self, io: &IoContext) { -//! io.register_timer(0, 1000).unwrap(); -//! } +//! io.register_timer(0, 1000).unwrap(); +//! } //! -//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { -//! println!("Timeout {}", timer); -//! } +//! fn timeout(&self, _io: &IoContext, timer: TimerToken) { +//! println!("Timeout {}", timer); +//! } //! -//! fn message(&self, _io: &IoContext, message: &MyMessage) { -//! println!("Message {}", message.data); -//! } +//! fn message(&self, _io: &IoContext, message: &MyMessage) { +//! println!("Message {}", message.data); +//! } //! } //! //! fn main () { @@ -70,7 +70,7 @@ impl From<::mio::NotifyError>> for IoError } } -/// Generic IO handler. +/// Generic IO handler. /// All the handler function are called from within IO event loop. /// `Message` type is used as notification data pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + 'static { @@ -82,7 +82,7 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + ' fn message(&self, _io: &IoContext, _message: &Message) {} /// Called when an IO stream gets closed fn stream_hup(&self, _io: &IoContext, _stream: StreamToken) {} - /// Called when an IO stream can be read from + /// Called when an IO stream can be read from fn stream_readable(&self, _io: &IoContext, _stream: StreamToken) {} /// Called when an IO stream can be written to fn stream_writable(&self, _io: &IoContext, _stream: StreamToken) {} diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 71b2520ed..1f4eeea09 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -25,6 +25,7 @@ use io::{IoError, IoHandler}; use arrayvec::*; use crossbeam::sync::chase_lev; use io::worker::{Worker, Work, WorkType}; +use panics::*; /// Timer ID pub type TimerToken = usize; @@ -164,7 +165,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let num_workers = 4; let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready = Arc::new(Condvar::new()); - let workers = (0..num_workers).map(|i| + let workers = (0..num_workers).map(|i| Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); let mut io = IoManager { @@ -306,19 +307,32 @@ impl IoChannel where Message: Send + Clone { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type pub struct IoService where Message: Send + Sync + Clone + 'static { + panic_handler: SafeStringPanicHandler, thread: Option>, host_channel: Sender>, } +impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl IoService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop pub fn start() -> Result, UtilError> { + let panic_handler = StringPanicHandler::new_thread_safe(); let mut event_loop = EventLoop::new().unwrap(); let channel = event_loop.channel(); + let panic = panic_handler.clone(); let thread = thread::spawn(move || { - IoManager::::start(&mut event_loop).unwrap(); //TODO: + let mut panic = panic.lock().unwrap(); + panic.catch_panic(move || { + IoManager::::start(&mut event_loop).unwrap(); + }).unwrap() }); Ok(IoService { + panic_handler: panic_handler, thread: Some(thread), host_channel: channel }) diff --git a/util/src/io/worker.rs b/util/src/io/worker.rs index fa8a0fa2c..84979140b 100644 --- a/util/src/io/worker.rs +++ b/util/src/io/worker.rs @@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; use io::service::{HandlerId, IoChannel, IoContext}; use io::{IoHandler}; +use panics::*; pub enum WorkType { Readable, @@ -43,32 +44,41 @@ pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, + panic_handler: SafeStringPanicHandler, } impl Worker { /// Creates a new worker instance. - pub fn new(index: usize, - stealer: chase_lev::Stealer>, + pub fn new(index: usize, + stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, - wait_mutex: Arc>) -> Worker + wait_mutex: Arc>) -> Worker where Message: Send + Sync + Clone + 'static { + let panic_handler = StringPanicHandler::new_thread_safe(); let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { + panic_handler: panic_handler.clone(), thread: None, wait: wait.clone(), deleting: deleting.clone(), }; + let panic_handler = panic_handler.clone(); worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( - move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)) + move || { + let mut panic = panic_handler.lock().unwrap(); + panic.catch_panic(move || { + Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) + }).unwrap() + }) .expect("Error creating worker thread")); worker } fn work_loop(stealer: chase_lev::Stealer>, - channel: IoChannel, wait: Arc, - wait_mutex: Arc>, - deleting: Arc) + channel: IoChannel, wait: Arc, + wait_mutex: Arc>, + deleting: Arc) where Message: Send + Sync + Clone + 'static { while !deleting.load(AtomicOrdering::Relaxed) { { @@ -105,6 +115,12 @@ impl Worker { } } +impl MayPanic for Worker { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.panic_handler.on_panic(closure); + } +} + impl Drop for Worker { fn drop(&mut self) { self.deleting.store(true, AtomicOrdering::Relaxed); diff --git a/util/src/lib.rs b/util/src/lib.rs index b48352582..260ef4301 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -21,7 +21,8 @@ #![feature(plugin)] #![plugin(clippy)] #![allow(needless_range_loop, match_bool)] -#![feature(std_panic, recover)] +#![feature(catch_panic)] + //! Ethcore-util library //! //! ### Rust version: diff --git a/util/src/panics.rs b/util/src/panics.rs index 4e1365636..dee5f3076 100644 --- a/util/src/panics.rs +++ b/util/src/panics.rs @@ -17,10 +17,9 @@ //! Panic utilities use std::thread; -use std::panic; -use std::sync::Mutex; -use std::any::Any; use std::ops::DerefMut; +use std::any::Any; +use std::sync::{Arc, Mutex}; pub trait OnPanicListener: Send + Sync + 'static { fn call(&mut self, arg: &T); @@ -33,26 +32,37 @@ impl OnPanicListener for F } } -pub trait ArgsConverter { +pub trait ArgsConverter : Send + Sync { fn convert(&self, t: &Box) -> Option; } pub trait MayPanic { - fn on_panic(&mut self, closure: F) + fn on_panic(&self, closure: F) where F: OnPanicListener; } pub trait PanicHandler> : MayPanic{ - fn new(converter: C) -> Self; + fn with_converter(converter: C) -> Self; fn catch_panic(&mut self, g: G) -> thread::Result - where G: FnOnce() -> R + panic::RecoverSafe; + where G: FnOnce() -> R + Send + 'static; + fn notify_all(&mut self, &T); } +pub type SafeStringPanicHandler = Arc>; + +impl MayPanic for SafeStringPanicHandler { + fn on_panic(&self, closure: F) where F: OnPanicListener { + self.lock().unwrap().on_panic(closure); + } +} pub struct StringConverter; impl ArgsConverter for StringConverter { fn convert(&self, t: &Box) -> Option { - t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()) + let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); + let as_string = t.downcast_ref::().cloned(); + + as_str.or(as_string) } } @@ -62,86 +72,152 @@ pub struct BasePanicHandler listeners: Mutex>>> } -impl BasePanicHandler - where C: ArgsConverter, T: 'static { - fn notify_all(&mut self, res: Option) { - if let None = res { - return; - } - let r = res.unwrap(); - let mut listeners = self.listeners.lock().unwrap(); - for listener in listeners.deref_mut() { - listener.call(&r); - } - } -} - impl PanicHandler for BasePanicHandler where C: ArgsConverter, T: 'static { - fn new(converter: C) -> Self { + fn with_converter(converter: C) -> Self { BasePanicHandler { converter: converter, listeners: Mutex::new(vec![]) } } - fn catch_panic(&mut self, g: G) -> thread::Result - where G: FnOnce() -> R + panic::RecoverSafe { - let result = panic::recover(g); + #[allow(deprecated)] + // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) + fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + let result = thread::catch_panic(g); - println!("After calling function"); - if let Err(ref e) = result { - let res = self.converter.convert(e); - println!("Got error. Notifying"); - self.notify_all(res); + if let Err(ref e) = result { + let res = self.converter.convert(e); + if let Some(r) = res { + self.notify_all(&r); } - - result } + + result + } + + fn notify_all(&mut self, r: &T) { + let mut listeners = self.listeners.lock().unwrap(); + for listener in listeners.deref_mut() { + listener.call(r); + } + } } impl MayPanic for BasePanicHandler where C: ArgsConverter, T: 'static { - fn on_panic(&mut self, closure: F) + fn on_panic(&self, closure: F) where F: OnPanicListener { self.listeners.lock().unwrap().push(Box::new(closure)); } } +pub struct StringPanicHandler { + handler: BasePanicHandler +} + +impl StringPanicHandler { + pub fn new_thread_safe() -> SafeStringPanicHandler { + Arc::new(Mutex::new(Self::new())) + } + + pub fn new () -> Self { + Self::with_converter(StringConverter) + } +} + +impl PanicHandler for StringPanicHandler { + + fn with_converter(converter: StringConverter) -> Self { + StringPanicHandler { + handler: BasePanicHandler::with_converter(converter) + } + } + + fn catch_panic(&mut self, g: G) -> thread::Result where G: FnOnce() -> R + Send + 'static { + self.handler.catch_panic(g) + } + + fn notify_all(&mut self, r: &String) { + self.handler.notify_all(r); + } +} + +impl MayPanic for StringPanicHandler { + fn on_panic(&self, closure: F) + where F: OnPanicListener { + self.handler.on_panic(closure) + } +} + #[test] fn should_notify_listeners_about_panic () { - use std::sync::{Arc, RwLock}; - + use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = BasePanicHandler::new(StringConverter); + let mut p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when - p.catch_panic(|| panic!("Panic!")); + p.catch_panic(|| panic!("Panic!")).unwrap_err(); // then assert!(invocations.read().unwrap()[0] == "Panic!"); } +#[test] +fn should_notify_listeners_about_panic_when_string_is_dynamic () { + use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = StringPanicHandler::new(); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + // when + p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic: 1"); +} + #[test] fn should_notify_listeners_about_panic_in_other_thread () { use std::thread; - use std::sync::{Arc, RwLock}; + use std::sync::RwLock; // given let invocations = Arc::new(RwLock::new(vec![])); let i = invocations.clone(); - let mut p = BasePanicHandler::new(StringConverter); + let mut p = StringPanicHandler::new(); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); // when let t = thread::spawn(move || - p.catch_panic(|| panic!("Panic!")) + p.catch_panic(|| panic!("Panic!")).unwrap() ); - t.join(); + t.join().unwrap_err(); + + // then + assert!(invocations.read().unwrap()[0] == "Panic!"); +} + +#[test] +fn should_forward_panics () { +use std::sync::RwLock; + // given + let invocations = Arc::new(RwLock::new(vec![])); + let i = invocations.clone(); + let mut p = StringPanicHandler::new(); + p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); + + let mut p2 = StringPanicHandler::new(); + p2.on_panic(move |t: &String| p.notify_all(t)); + + // when + p2.catch_panic(|| panic!("Panic!")).unwrap_err(); // then assert!(invocations.read().unwrap()[0] == "Panic!");