Removing overengineered stuff

This commit is contained in:
Tomusdrw 2016-02-10 15:28:43 +01:00
parent 0d121dd51a
commit 7925642b1b
6 changed files with 65 additions and 108 deletions

View File

@ -60,7 +60,7 @@ impl BlockQueueInfo {
/// A queue of blocks. Sits between network or other I/O and the BlockChain. /// A queue of blocks. Sits between network or other I/O and the BlockChain.
/// Sorts them ready for blockchain insertion. /// Sorts them ready for blockchain insertion.
pub struct BlockQueue { pub struct BlockQueue {
panic_handler: Arc<StringPanicHandler>, panic_handler: Arc<PanicHandler>,
engine: Arc<Box<Engine>>, engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>, more_to_verify: Arc<Condvar>,
verification: Arc<Mutex<Verification>>, verification: Arc<Mutex<Verification>>,
@ -115,7 +115,7 @@ impl BlockQueue {
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let empty = Arc::new(Condvar::new()); let empty = Arc::new(Condvar::new());
let panic_handler = StringPanicHandler::new_arc(); let panic_handler = PanicHandler::new_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
@ -337,8 +337,8 @@ impl BlockQueue {
} }
} }
impl MayPanic<String> for BlockQueue { impl MayPanic for BlockQueue {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> { fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure); self.panic_handler.on_panic(closure);
} }
} }

View File

@ -162,7 +162,7 @@ pub struct Client {
block_queue: RwLock<BlockQueue>, block_queue: RwLock<BlockQueue>,
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
import_lock: Mutex<()>, import_lock: Mutex<()>,
panic_handler: Arc<StringPanicHandler>, panic_handler: Arc<PanicHandler>,
} }
const HISTORY: u64 = 1000; const HISTORY: u64 = 1000;
@ -211,9 +211,9 @@ impl Client {
} }
let block_queue = BlockQueue::new(engine.clone(), message_channel); let block_queue = BlockQueue::new(engine.clone(), message_channel);
let panic_handler = StringPanicHandler::new_arc(); let panic_handler = PanicHandler::new_arc();
let panic = panic_handler.clone(); let panic = panic_handler.clone();
block_queue.on_panic(move |t: &String| panic.notify_all(t)); block_queue.on_panic(move |t| panic.notify_all(t));
Ok(Arc::new(Client { Ok(Arc::new(Client {
chain: chain, chain: chain,
@ -440,8 +440,8 @@ impl BlockChainClient for Client {
} }
} }
impl MayPanic<String> for Client { impl MayPanic for Client {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> { fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure); self.panic_handler.on_panic(closure);
} }
} }

View File

@ -169,7 +169,7 @@ fn wait_for_exit(client: Arc<Client>) {
let e = exit.clone(); let e = exit.clone();
CtrlC::set_handler(move || { e.notify_all(); }); CtrlC::set_handler(move || { e.notify_all(); });
let e = exit.clone(); let e = exit.clone();
client.on_panic(move |_t: &String| { e.notify_all(); }); client.on_panic(move |_reason| { e.notify_all(); });
// Wait for signal // Wait for signal
let mutex = Mutex::new(()); let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap(); let _ = exit.wait(mutex.lock().unwrap()).unwrap();

View File

@ -307,13 +307,13 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
/// General IO Service. Starts an event loop and dispatches IO requests. /// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type /// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static { pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: Arc<StringPanicHandler>, panic_handler: Arc<PanicHandler>,
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>, host_channel: Sender<IoMessage<Message>>,
} }
impl<Message> MayPanic<String> for IoService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> { fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure); self.panic_handler.on_panic(closure);
} }
} }
@ -321,7 +321,7 @@ impl<Message> MayPanic<String> for IoService<Message> where Message: Send + Sync
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> { pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = StringPanicHandler::new_arc(); let panic_handler = PanicHandler::new_arc();
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel(); let channel = event_loop.channel();
let panic = panic_handler.clone(); let panic = panic_handler.clone();

View File

@ -44,7 +44,7 @@ pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
panic_handler: Arc<StringPanicHandler>, panic_handler: Arc<PanicHandler>,
} }
impl Worker { impl Worker {
@ -55,7 +55,7 @@ impl Worker {
wait: Arc<Condvar>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>) -> Worker wait_mutex: Arc<Mutex<()>>) -> Worker
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
let panic_handler = StringPanicHandler::new_arc(); let panic_handler = PanicHandler::new_arc();
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker { let mut worker = Worker {
panic_handler: panic_handler.clone(), panic_handler: panic_handler.clone(),
@ -114,8 +114,8 @@ impl Worker {
} }
} }
impl MayPanic<String> for Worker { impl MayPanic for Worker {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> { fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
self.panic_handler.on_panic(closure); self.panic_handler.on_panic(closure);
} }
} }

View File

@ -21,126 +21,83 @@ use std::ops::DerefMut;
use std::any::Any; use std::any::Any;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub trait OnPanicListener<T>: Send + Sync + 'static { /// Thread-safe closure for handling possible panics
fn call(&mut self, arg: &T); pub trait OnPanicListener: Send + Sync + 'static {
/// Invoke listener
fn call(&mut self, arg: &str);
} }
impl<F, T> OnPanicListener<T> for F /// Trait indicating that the structure catches some of the panics (most probably from spawned threads)
where F: FnMut(&T) + Send + Sync + 'static { /// and it's possbile to be notified when one of the threads panics.
fn call(&mut self, arg: &T) { pub trait MayPanic {
self(arg) /// `closure` will be invoked whenever panic in thread is caught
}
}
pub trait ArgsConverter<T> : Send + Sync {
fn convert(&self, t: &Box<Any + Send>) -> Option<T>;
}
pub trait MayPanic<T> {
fn on_panic<F>(&self, closure: F) fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<T>; where F: OnPanicListener;
} }
pub trait PanicHandler<T, C: ArgsConverter<T>> : MayPanic<T>{ /// Structure that allows to catch panics and notify listeners
fn with_converter(converter: C) -> Self; pub struct PanicHandler {
fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> listeners: Mutex<Vec<Box<OnPanicListener>>>
where G: FnOnce() -> R + Send + 'static;
fn notify_all(&self, &T);
} }
pub struct StringConverter; impl PanicHandler {
impl ArgsConverter<String> for StringConverter { /// Creates new `PanicHandler` wrapped in `Arc`
fn convert(&self, t: &Box<Any + Send>) -> Option<String> { pub fn new_arc() -> Arc<PanicHandler> {
let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned()); Arc::new(Self::new())
let as_string = t.downcast_ref::<String>().cloned();
as_str.or(as_string)
} }
}
pub struct BasePanicHandler<T, C> /// Creates new `PanicHandler`
where C: ArgsConverter<T>, T: 'static { pub fn new() -> PanicHandler {
converter: C, PanicHandler {
listeners: Mutex<Vec<Box<OnPanicListener<T>>>>
}
impl<T, C> PanicHandler<T, C> for BasePanicHandler<T, C>
where C: ArgsConverter<T>, T: 'static {
fn with_converter(converter: C) -> Self {
BasePanicHandler {
converter: converter,
listeners: Mutex::new(vec![]) listeners: Mutex::new(vec![])
} }
} }
/// Invoke closure and catch any possible panics.
/// In case of panic notifies all listeners about it.
#[allow(deprecated)] #[allow(deprecated)]
// TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex)
fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static { pub fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
let result = thread::catch_panic(g); let result = thread::catch_panic(g);
if let Err(ref e) = result { if let Err(ref e) = result {
let res = self.converter.convert(e); let res = convert_to_string(e);
if let Some(r) = res { if let Some(r) = res {
self.notify_all(&r); self.notify_all(r);
} }
} }
result result
} }
fn notify_all(&self, r: &T) { /// Notify listeners about panic
pub fn notify_all(&self, r: String) {
let mut listeners = self.listeners.lock().unwrap(); let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() { for listener in listeners.deref_mut() {
listener.call(r); listener.call(&r);
} }
} }
} }
impl<T, C> MayPanic<T> for BasePanicHandler<T, C> impl MayPanic for PanicHandler {
where C: ArgsConverter<T>, T: 'static {
fn on_panic<F>(&self, closure: F) fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<T> { where F: OnPanicListener {
self.listeners.lock().unwrap().push(Box::new(closure)); self.listeners.lock().unwrap().push(Box::new(closure));
} }
} }
pub struct StringPanicHandler { impl<F> OnPanicListener for F
handler: BasePanicHandler<String, StringConverter> where F: FnMut(String) + Send + Sync + 'static {
} fn call(&mut self, arg: &str) {
self(arg.to_owned())
impl StringPanicHandler {
pub fn new_arc() -> Arc<StringPanicHandler> {
Arc::new(Self::new())
}
pub fn new () -> Self {
Self::with_converter(StringConverter)
} }
} }
impl PanicHandler<String, StringConverter> for StringPanicHandler { fn convert_to_string(t: &Box<Any + Send>) -> Option<String> {
let as_str = t.downcast_ref::<&'static str>().map(|t| t.clone().to_owned());
let as_string = t.downcast_ref::<String>().cloned();
fn with_converter(converter: StringConverter) -> Self { as_str.or(as_string)
StringPanicHandler {
handler: BasePanicHandler::with_converter(converter)
}
}
fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
self.handler.catch_panic(g)
}
fn notify_all(&self, r: &String) {
self.handler.notify_all(r);
}
}
impl MayPanic<String> for StringPanicHandler {
fn on_panic<F>(&self, closure: F)
where F: OnPanicListener<String> {
self.handler.on_panic(closure)
}
} }
#[test] #[test]
@ -149,8 +106,8 @@ fn should_notify_listeners_about_panic () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let p = StringPanicHandler::new(); let p = PanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t| i.write().unwrap().push(t));
// when // when
p.catch_panic(|| panic!("Panic!")).unwrap_err(); p.catch_panic(|| panic!("Panic!")).unwrap_err();
@ -165,8 +122,8 @@ fn should_notify_listeners_about_panic_when_string_is_dynamic () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let p = StringPanicHandler::new(); let p = PanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t| i.write().unwrap().push(t));
// when // when
p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err(); p.catch_panic(|| panic!("Panic: {}", 1)).unwrap_err();
@ -183,8 +140,8 @@ fn should_notify_listeners_about_panic_in_other_thread () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let p = StringPanicHandler::new(); let p = PanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t| i.write().unwrap().push(t));
// when // when
let t = thread::spawn(move || let t = thread::spawn(move ||
@ -202,11 +159,11 @@ use std::sync::RwLock;
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let p = StringPanicHandler::new(); let p = PanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t| i.write().unwrap().push(t));
let p2 = StringPanicHandler::new(); let p2 = PanicHandler::new();
p2.on_panic(move |t: &String| p.notify_all(t)); p2.on_panic(move |t| p.notify_all(t));
// when // when
p2.catch_panic(|| panic!("Panic!")).unwrap_err(); p2.catch_panic(|| panic!("Panic!")).unwrap_err();