Merge pull request #396 from ethcore/close_gently

Catching panics.
This commit is contained in:
Nikolay Volf 2016-02-10 18:55:53 +03:00
commit 66a370af9b
10 changed files with 403 additions and 71 deletions

View File

@ -26,6 +26,7 @@ use views::*;
use header::*; use header::*;
use service::*; use service::*;
use client::BlockStatus; use client::BlockStatus;
use util::panics::*;
/// Block queue status /// Block queue status
#[derive(Debug)] #[derive(Debug)]
@ -59,6 +60,7 @@ impl BlockQueueInfo {
/// A queue of blocks. Sits between network or other I/O and the BlockChain. /// A queue of blocks. Sits between network or other I/O and the BlockChain.
/// Sorts them ready for blockchain insertion. /// Sorts them ready for blockchain insertion.
pub struct BlockQueue { pub struct BlockQueue {
panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>, engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>, more_to_verify: Arc<Condvar>,
verification: Arc<Mutex<Verification>>, verification: Arc<Mutex<Verification>>,
@ -113,6 +115,7 @@ impl BlockQueue {
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let empty = Arc::new(Condvar::new()); let empty = Arc::new(Condvar::new());
let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
@ -123,11 +126,21 @@ impl BlockQueue {
let ready_signal = ready_signal.clone(); let ready_signal = ready_signal.clone();
let empty = empty.clone(); let empty = empty.clone();
let deleting = deleting.clone(); let deleting = deleting.clone();
verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)) let panic_handler = panic_handler.clone();
.expect("Error starting block verification thread")); verifiers.push(
thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)
}).unwrap()
})
.expect("Error starting block verification thread")
);
} }
BlockQueue { BlockQueue {
engine: engine, engine: engine,
panic_handler: panic_handler,
ready_signal: ready_signal.clone(), ready_signal: ready_signal.clone(),
more_to_verify: more_to_verify.clone(), more_to_verify: more_to_verify.clone(),
verification: verification.clone(), verification: verification.clone(),
@ -150,7 +163,7 @@ impl BlockQueue {
while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) {
lock = wait.wait(lock).unwrap(); lock = wait.wait(lock).unwrap();
} }
if deleting.load(AtomicOrdering::Relaxed) { if deleting.load(AtomicOrdering::Relaxed) {
return; return;
} }
@ -324,6 +337,12 @@ impl BlockQueue {
} }
} }
impl MayPanic for BlockQueue {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}
impl Drop for BlockQueue { impl Drop for BlockQueue {
fn drop(&mut self) { fn drop(&mut self) {
self.clear(); self.clear();

View File

@ -17,6 +17,7 @@
//! Blockchain database client. //! Blockchain database client.
use util::*; use util::*;
use util::panics::*;
use rocksdb::{Options, DB, DBCompactionStyle}; use rocksdb::{Options, DB, DBCompactionStyle};
use blockchain::{BlockChain, BlockProvider, CacheSize, TransactionId}; use blockchain::{BlockChain, BlockProvider, CacheSize, TransactionId};
use views::BlockView; use views::BlockView;
@ -164,7 +165,8 @@ pub struct Client {
state_db: Mutex<JournalDB>, state_db: Mutex<JournalDB>,
block_queue: RwLock<BlockQueue>, block_queue: RwLock<BlockQueue>,
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
import_lock: Mutex<()> import_lock: Mutex<()>,
panic_handler: Arc<PanicHandler>,
} }
const HISTORY: u64 = 1000; const HISTORY: u64 = 1000;
@ -205,19 +207,25 @@ impl Client {
let mut state_path = path.to_path_buf(); let mut state_path = path.to_path_buf();
state_path.push("state"); state_path.push("state");
let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());
let engine = Arc::new(try!(spec.to_engine())); let engine = Arc::new(try!(spec.to_engine()));
let mut state_db = JournalDB::new_with_arc(db.clone()); let mut state_db = JournalDB::new_with_arc(db.clone());
if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) {
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
} }
let block_queue = BlockQueue::new(engine.clone(), message_channel);
let panic_handler = PanicHandler::new_in_arc();
panic_handler.forward_from(&block_queue);
Ok(Arc::new(Client { Ok(Arc::new(Client {
chain: chain, chain: chain,
engine: engine.clone(), engine: engine,
state_db: Mutex::new(state_db), state_db: Mutex::new(state_db),
block_queue: RwLock::new(BlockQueue::new(engine, message_channel)), block_queue: RwLock::new(block_queue),
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
import_lock: Mutex::new(()), import_lock: Mutex::new(()),
panic_handler: panic_handler
})) }))
} }
@ -355,12 +363,12 @@ impl BlockChainClient for Client {
fn block_status(&self, hash: &H256) -> BlockStatus { fn block_status(&self, hash: &H256) -> BlockStatus {
if self.chain.read().unwrap().is_known(&hash) { if self.chain.read().unwrap().is_known(&hash) {
BlockStatus::InChain BlockStatus::InChain
} else { } else {
self.block_queue.read().unwrap().block_status(hash) self.block_queue.read().unwrap().block_status(hash)
} }
} }
fn block_total_difficulty(&self, hash: &H256) -> Option<U256> { fn block_total_difficulty(&self, hash: &H256) -> Option<U256> {
self.chain.read().unwrap().block_details(hash).map(|d| d.total_difficulty) self.chain.read().unwrap().block_details(hash).map(|d| d.total_difficulty)
} }
@ -438,3 +446,9 @@ impl BlockChainClient for Client {
} }
} }
} }
impl MayPanic for Client {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}

View File

@ -17,6 +17,7 @@
//! Creates and registers client and network services. //! Creates and registers client and network services.
use util::*; use util::*;
use util::panics::*;
use spec::Spec; use spec::Spec;
use error::*; use error::*;
use std::env; use std::env;
@ -27,7 +28,7 @@ use client::Client;
pub enum SyncMessage { pub enum SyncMessage {
/// New block has been imported into the blockchain /// New block has been imported into the blockchain
NewChainBlock(Bytes), //TODO: use Cow NewChainBlock(Bytes), //TODO: use Cow
/// A block is ready /// A block is ready
BlockVerified, BlockVerified,
} }
@ -38,17 +39,22 @@ pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
pub struct ClientService { pub struct ClientService {
net_service: NetworkService<SyncMessage>, net_service: NetworkService<SyncMessage>,
client: Arc<Client>, client: Arc<Client>,
panic_handler: Arc<PanicHandler>
} }
impl ClientService { impl ClientService {
/// Start the service in a separate thread. /// Start the service in a separate thread.
pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> { pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config)); let mut net_service = try!(NetworkService::start(net_config));
panic_handler.forward_from(&net_service);
info!("Starting {}", net_service.host_info()); info!("Starting {}", net_service.host_info());
info!("Configured for {} using {} engine", spec.name, spec.engine_name); info!("Configured for {} using {} engine", spec.name, spec.engine_name);
let mut dir = env::home_dir().unwrap(); let mut dir = env::home_dir().unwrap();
dir.push(".parity"); dir.push(".parity");
let client = try!(Client::new(spec, &dir, net_service.io().channel())); let client = try!(Client::new(spec, &dir, net_service.io().channel()));
panic_handler.forward_from(client.deref());
let client_io = Arc::new(ClientIoHandler { let client_io = Arc::new(ClientIoHandler {
client: client.clone() client: client.clone()
}); });
@ -57,6 +63,7 @@ impl ClientService {
Ok(ClientService { Ok(ClientService {
net_service: net_service, net_service: net_service,
client: client, client: client,
panic_handler: panic_handler,
}) })
} }
@ -81,6 +88,12 @@ impl ClientService {
} }
} }
impl MayPanic for ClientService {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}
/// IO interface for the Client handler /// IO interface for the Client handler
struct ClientIoHandler { struct ClientIoHandler {
client: Arc<Client> client: Arc<Client>

View File

@ -40,6 +40,7 @@ use rlog::{LogLevelFilter};
use env_logger::LogBuilder; use env_logger::LogBuilder;
use ctrlc::CtrlC; use ctrlc::CtrlC;
use util::*; use util::*;
use util::panics::MayPanic;
use ethcore::spec::*; use ethcore::spec::*;
use ethcore::client::*; use ethcore::client::*;
use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::service::{ClientService, NetSyncMessage};
@ -92,7 +93,7 @@ fn setup_log(init: &str) {
#[cfg(feature = "rpc")] #[cfg(feature = "rpc")]
fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, url: &str) { fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, url: &str) {
use rpc::v1::*; use rpc::v1::*;
let mut server = rpc::HttpServer::new(1); let mut server = rpc::HttpServer::new(1);
server.add_delegate(Web3Client::new().to_delegate()); server.add_delegate(Web3Client::new().to_delegate());
server.add_delegate(EthClient::new(client.clone()).to_delegate()); server.add_delegate(EthClient::new(client.clone()).to_delegate());
@ -105,10 +106,17 @@ fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, url: &str) {
fn setup_rpc_server(_client: Arc<Client>, _sync: Arc<EthSync>, _url: &str) { fn setup_rpc_server(_client: Arc<Client>, _sync: Arc<EthSync>, _url: &str) {
} }
fn main() { struct Configuration {
let args: Args = Args::docopt().decode().unwrap_or_else(|e| e.exit()); args: Args
}
impl Configuration {
fn parse() -> Self {
Configuration {
args: Args::docopt().decode().unwrap_or_else(|e| e.exit())
}
}
if args.flag_version { fn print_version(&self) {
println!("\ println!("\
Parity version {} ({}-{}-{}) Parity version {} ({}-{}-{})
Copyright 2015, 2016 Ethcore (UK) Limited Copyright 2015, 2016 Ethcore (UK) Limited
@ -118,49 +126,100 @@ There is NO WARRANTY, to the extent permitted by law.
By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\
", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()); ", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os());
}
fn get_spec(&self) -> Spec {
match self.args.flag_chain.as_ref() {
"frontier" | "mainnet" => ethereum::new_frontier(),
"morden" | "testnet" => ethereum::new_morden(),
"olympic" => ethereum::new_olympic(),
f => Spec::from_json_utf8(contents(f).expect("Couldn't read chain specification file. Sure it exists?").as_ref()),
}
}
fn get_init_nodes(&self, spec: &Spec) -> Vec<String> {
match self.args.arg_enode.len() {
0 => spec.nodes().clone(),
_ => self.args.arg_enode.clone(),
}
}
fn get_net_addresses(&self) -> (SocketAddr, SocketAddr) {
let listen_address;
let public_address;
match self.args.flag_address {
None => {
listen_address = SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address");
public_address = SocketAddr::from_str(self.args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address");
}
Some(ref a) => {
public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address");
listen_address = public_address.clone();
}
};
(listen_address, public_address)
}
}
fn wait_for_exit(client_service: &ClientService) {
let exit = Arc::new(Condvar::new());
// Handle possible exits
let e = exit.clone();
CtrlC::set_handler(move || { e.notify_all(); });
let e = exit.clone();
client_service.on_panic(move |_reason| { e.notify_all(); });
// Wait for signal
let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap();
}
fn main() {
let conf = Configuration::parse();
if conf.args.flag_version {
conf.print_version();
return; return;
} }
setup_log(&args.flag_logging); let spec = conf.get_spec();
// Setup logging
setup_log(&conf.args.flag_logging);
// Raise fdlimit
unsafe { ::fdlimit::raise_fd_limit(); } unsafe { ::fdlimit::raise_fd_limit(); }
let spec = match args.flag_chain.as_ref() { // Configure network
"frontier" | "mainnet" => ethereum::new_frontier(), let init_nodes = conf.get_init_nodes(&spec);
"morden" | "testnet" => ethereum::new_morden(), let (listen, public) = conf.get_net_addresses();
"olympic" => ethereum::new_olympic(),
f => Spec::from_json_utf8(contents(f).expect("Couldn't read chain specification file. Sure it exists?").as_ref()),
};
let init_nodes = match args.arg_enode.len() {
0 => spec.nodes().clone(),
_ => args.arg_enode.clone(),
};
let mut net_settings = NetworkConfiguration::new(); let mut net_settings = NetworkConfiguration::new();
net_settings.boot_nodes = init_nodes; net_settings.boot_nodes = init_nodes;
match args.flag_address { net_settings.listen_address = listen;
None => { net_settings.public_address = public;
net_settings.listen_address = SocketAddr::from_str(args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address");
net_settings.public_address = SocketAddr::from_str(args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address"); // Build client
}
Some(ref a) => {
net_settings.public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address");
net_settings.listen_address = net_settings.public_address.clone();
}
}
let mut service = ClientService::start(spec, net_settings).unwrap(); let mut service = ClientService::start(spec, net_settings).unwrap();
let client = service.client().clone(); let client = service.client().clone();
client.configure_cache(args.flag_cache_pref_size, args.flag_cache_max_size); client.configure_cache(conf.args.flag_cache_pref_size, conf.args.flag_cache_max_size);
// Sync
let sync = EthSync::register(service.network(), client); let sync = EthSync::register(service.network(), client);
if args.flag_jsonrpc {
setup_rpc_server(service.client(), sync.clone(), &args.flag_jsonrpc_url); // Setup rpc
if conf.args.flag_jsonrpc {
setup_rpc_server(service.client(), sync.clone(), &conf.args.flag_jsonrpc_url);
} }
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync });
// Register IO handler
let io_handler = Arc::new(ClientIoHandler {
client: service.client(),
info: Default::default(),
sync: sync
});
service.io().register_handler(io_handler).expect("Error registering IO handler"); service.io().register_handler(io_handler).expect("Error registering IO handler");
let exit = Arc::new(Condvar::new()); // Handle exit
let e = exit.clone(); wait_for_exit(&service);
CtrlC::set_handler(move || { e.notify_all(); });
let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap();
} }
struct Informant { struct Informant {
@ -226,7 +285,7 @@ struct ClientIoHandler {
} }
impl IoHandler<NetSyncMessage> for ClientIoHandler { impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<NetSyncMessage>) { fn initialize(&self, io: &IoContext<NetSyncMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
} }

View File

@ -31,16 +31,16 @@
//! //!
//! impl IoHandler<MyMessage> for MyHandler { //! impl IoHandler<MyMessage> for MyHandler {
//! fn initialize(&self, io: &IoContext<MyMessage>) { //! fn initialize(&self, io: &IoContext<MyMessage>) {
//! io.register_timer(0, 1000).unwrap(); //! io.register_timer(0, 1000).unwrap();
//! } //! }
//! //!
//! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) { //! fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
//! println!("Timeout {}", timer); //! println!("Timeout {}", timer);
//! } //! }
//! //!
//! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) { //! fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
//! println!("Message {}", message.data); //! println!("Message {}", message.data);
//! } //! }
//! } //! }
//! //!
//! fn main () { //! fn main () {
@ -70,7 +70,7 @@ impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError
} }
} }
/// Generic IO handler. /// Generic IO handler.
/// All the handler function are called from within IO event loop. /// All the handler function are called from within IO event loop.
/// `Message` type is used as notification data /// `Message` type is used as notification data
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + 'static { pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + 'static {
@ -82,7 +82,7 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
fn message(&self, _io: &IoContext<Message>, _message: &Message) {} fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
/// Called when an IO stream gets closed /// Called when an IO stream gets closed
fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {} fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be read from /// Called when an IO stream can be read from
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {} fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// Called when an IO stream can be written to /// Called when an IO stream can be written to
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {} fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

View File

@ -25,6 +25,7 @@ use io::{IoError, IoHandler};
use arrayvec::*; use arrayvec::*;
use crossbeam::sync::chase_lev; use crossbeam::sync::chase_lev;
use io::worker::{Worker, Work, WorkType}; use io::worker::{Worker, Work, WorkType};
use panics::*;
/// Timer ID /// Timer ID
pub type TimerToken = usize; pub type TimerToken = usize;
@ -159,13 +160,21 @@ pub struct IoManager<Message> where Message: Send + Sync {
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
/// Creates a new instance and registers it with the event loop. /// Creates a new instance and registers it with the event loop.
pub fn start(event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> { pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), UtilError> {
let (worker, stealer) = chase_lev::deque(); let (worker, stealer) = chase_lev::deque();
let num_workers = 4; let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(())); let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new()); let work_ready = Arc::new(Condvar::new());
let workers = (0..num_workers).map(|i| let workers = (0..num_workers).map(|i|
Worker::new(i, stealer.clone(), IoChannel::new(event_loop.channel()), work_ready.clone(), work_ready_mutex.clone())).collect(); Worker::new(
i,
stealer.clone(),
IoChannel::new(event_loop.channel()),
work_ready.clone(),
work_ready_mutex.clone(),
panic_handler.clone()
)
).collect();
let mut io = IoManager { let mut io = IoManager {
timers: Arc::new(RwLock::new(HashMap::new())), timers: Arc::new(RwLock::new(HashMap::new())),
@ -306,19 +315,32 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
/// General IO Service. Starts an event loop and dispatches IO requests. /// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type /// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static { pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: Arc<PanicHandler>,
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>, host_channel: Sender<IoMessage<Message>>,
} }
impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> { pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel(); let channel = event_loop.channel();
let panic = panic_handler.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
IoManager::<Message>::start(&mut event_loop).unwrap(); //TODO: let p = panic.clone();
panic.catch_panic(move || {
IoManager::<Message>::start(p, &mut event_loop).unwrap();
}).unwrap()
}); });
Ok(IoService { Ok(IoService {
panic_handler: panic_handler,
thread: Some(thread), thread: Some(thread),
host_channel: channel host_channel: channel
}) })

View File

@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev; use crossbeam::sync::chase_lev;
use io::service::{HandlerId, IoChannel, IoContext}; use io::service::{HandlerId, IoChannel, IoContext};
use io::{IoHandler}; use io::{IoHandler};
use panics::*;
pub enum WorkType<Message> { pub enum WorkType<Message> {
Readable, Readable,
@ -47,12 +48,14 @@ pub struct Worker {
impl Worker { impl Worker {
/// Creates a new worker instance. /// Creates a new worker instance.
pub fn new<Message>(index: usize, pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>, stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, channel: IoChannel<Message>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>) -> Worker wait_mutex: Arc<Mutex<()>>,
where Message: Send + Sync + Clone + 'static { panic_handler: Arc<PanicHandler>
) -> Worker
where Message: Send + Sync + Clone + 'static {
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker { let mut worker = Worker {
thread: None, thread: None,
@ -60,15 +63,19 @@ impl Worker {
deleting: deleting.clone(), deleting: deleting.clone(),
}; };
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)) move || {
panic_handler.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()
})
.expect("Error creating worker thread")); .expect("Error creating worker thread"));
worker worker
} }
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>, fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<Condvar>, channel: IoChannel<Message>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>, wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>) deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
while !deleting.load(AtomicOrdering::Relaxed) { while !deleting.load(AtomicOrdering::Relaxed) {
{ {

View File

@ -21,6 +21,8 @@
#![feature(plugin)] #![feature(plugin)]
#![plugin(clippy)] #![plugin(clippy)]
#![allow(needless_range_loop, match_bool)] #![allow(needless_range_loop, match_bool)]
#![feature(catch_panic)]
//! Ethcore-util library //! Ethcore-util library
//! //!
//! ### Rust version: //! ### Rust version:
@ -54,7 +56,7 @@
//! cd parity //! cd parity
//! cargo build --release //! cargo build --release
//! ``` //! ```
//! //!
//! - OSX: //! - OSX:
//! //!
//! ```bash //! ```bash
@ -129,6 +131,7 @@ pub mod semantic_version;
pub mod io; pub mod io;
pub mod network; pub mod network;
pub mod log; pub mod log;
pub mod panics;
pub use common::*; pub use common::*;
pub use misc::*; pub use misc::*;

View File

@ -16,6 +16,7 @@
use std::sync::*; use std::sync::*;
use error::*; use error::*;
use panics::*;
use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::{NetworkError}; use network::error::{NetworkError};
use network::host::{Host, NetworkIoMessage, ProtocolId}; use network::host::{Host, NetworkIoMessage, ProtocolId};
@ -27,13 +28,17 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static { pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>, io_service: IoService<NetworkIoMessage<Message>>,
host_info: String, host_info: String,
stats: Arc<NetworkStats> stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
} }
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> { pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start()); let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);
let host = Arc::new(Host::new(config)); let host = Arc::new(Host::new(config));
let stats = host.stats().clone(); let stats = host.stats().clone();
let host_info = host.client_version(); let host_info = host.client_version();
@ -43,6 +48,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
io_service: io_service, io_service: io_service,
host_info: host_info, host_info: host_info,
stats: stats, stats: stats,
panic_handler: panic_handler
}) })
} }
@ -72,3 +78,9 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
} }
} }
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure);
}
}

183
util/src/panics.rs Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Panic utilities
use std::thread;
use std::ops::DerefMut;
use std::any::Any;
use std::sync::{Arc, Mutex};
/// Thread-safe closure for handling possible panics
pub trait OnPanicListener: Send + Sync + 'static {
/// Invoke listener
fn call(&mut self, arg: &str);
}
/// Forwards panics from child
pub trait ForwardPanic {
/// Attach `on_panic` listener to `child` and rethrow all panics
fn forward_from<S>(&self, child: &S) where S : MayPanic;
}
/// Trait indicating that the structure catches some of the panics (most probably from spawned threads)
/// and it's possbile to be notified when one of the threads panics.
pub trait MayPanic {
/// `closure` will be invoked whenever panic in thread is caught
fn on_panic<F>(&self, closure: F) where F: OnPanicListener;
}
/// Structure that allows to catch panics and notify listeners
pub struct PanicHandler {
listeners: Mutex<Vec<Box<OnPanicListener>>>
}
impl PanicHandler {
/// Creates new `PanicHandler` wrapped in `Arc`
pub fn new_in_arc() -> Arc<PanicHandler> {
Arc::new(Self::new())
}
/// Creates new `PanicHandler`
pub fn new() -> PanicHandler {
PanicHandler {
listeners: Mutex::new(vec![])
}
}
/// Invoke closure and catch any possible panics.
/// In case of panic notifies all listeners about it.
#[allow(deprecated)]
// TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex)
pub fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
let result = thread::catch_panic(g);
if let Err(ref e) = result {
let res = convert_to_string(e);
if let Some(r) = res {
self.notify_all(r);
}
}
result
}
fn notify_all(&self, r: String) {
let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() {
listener.call(&r);
}
}
}
impl MayPanic for PanicHandler {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.listeners.lock().unwrap().push(Box::new(closure));
}
}
impl ForwardPanic for Arc<PanicHandler> {
fn forward_from<S>(&self, child: &S) where S : MayPanic {
let p = self.clone();
child.on_panic(move |t| p.notify_all(t));
}
}
impl<F> OnPanicListener for F
where F: FnMut(String) + Send + Sync + 'static {
fn call(&mut self, arg: &str) {
self(arg.to_owned())
}
}
fn convert_to_string(t: &Box<Any + Send>) -> Option<String> {
let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned());
let as_string = t.downcast_ref::<String>().cloned();
as_str.or(as_string)
}
#[test]
fn should_notify_listeners_about_panic () {
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
// when
p.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
}
#[test]
fn should_notify_listeners_about_panic_when_string_is_dynamic () {
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
// when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic: 1");
}
#[test]
fn should_notify_listeners_about_panic_in_other_thread () {
use std::thread;
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new();
p.on_panic(move |t| i.write().unwrap().push(t));
// when
let t = thread::spawn(move ||
p.catch_panic(|| panic!("Panic!")).unwrap()
);
t.join().unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
}
#[test]
fn should_forward_panics () {
use std::sync::RwLock;
// given
let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone();
let p = PanicHandler::new_in_arc();
p.on_panic(move |t| i.write().unwrap().push(t));
let p2 = PanicHandler::new();
p.forward_from(&p2);
// when
p2.catch_panic(|| panic!("Panic!")).unwrap_err();
// then
assert!(invocations.read().unwrap()[0] == "Panic!");
}