Removing unecessary locks causing dead-locks

This commit is contained in:
Tomusdrw 2016-02-10 14:49:31 +01:00
parent 31bcc541d0
commit 0d121dd51a
5 changed files with 24 additions and 35 deletions

View File

@ -60,7 +60,7 @@ impl BlockQueueInfo {
/// A queue of blocks. Sits between network or other I/O and the BlockChain. /// A queue of blocks. Sits between network or other I/O and the BlockChain.
/// Sorts them ready for blockchain insertion. /// Sorts them ready for blockchain insertion.
pub struct BlockQueue { pub struct BlockQueue {
panic_handler: SafeStringPanicHandler, panic_handler: Arc<StringPanicHandler>,
engine: Arc<Box<Engine>>, engine: Arc<Box<Engine>>,
more_to_verify: Arc<Condvar>, more_to_verify: Arc<Condvar>,
verification: Arc<Mutex<Verification>>, verification: Arc<Mutex<Verification>>,
@ -115,7 +115,7 @@ impl BlockQueue {
let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel });
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let empty = Arc::new(Condvar::new()); let empty = Arc::new(Condvar::new());
let panic_handler = StringPanicHandler::new_thread_safe(); let panic_handler = StringPanicHandler::new_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2; let thread_count = max(::num_cpus::get(), 3) - 2;
@ -131,8 +131,7 @@ impl BlockQueue {
thread::Builder::new() thread::Builder::new()
.name(format!("Verifier #{}", i)) .name(format!("Verifier #{}", i))
.spawn(move || { .spawn(move || {
let mut panic = panic_handler.lock().unwrap(); panic_handler.catch_panic(move || {
panic.catch_panic(move || {
BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty) BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty)
}).unwrap() }).unwrap()
}) })

View File

@ -162,7 +162,7 @@ pub struct Client {
block_queue: RwLock<BlockQueue>, block_queue: RwLock<BlockQueue>,
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
import_lock: Mutex<()>, import_lock: Mutex<()>,
panic_handler: SafeStringPanicHandler, panic_handler: Arc<StringPanicHandler>,
} }
const HISTORY: u64 = 1000; const HISTORY: u64 = 1000;
@ -211,9 +211,9 @@ impl Client {
} }
let block_queue = BlockQueue::new(engine.clone(), message_channel); let block_queue = BlockQueue::new(engine.clone(), message_channel);
let panic_handler = StringPanicHandler::new_thread_safe(); let panic_handler = StringPanicHandler::new_arc();
let panic = panic_handler.clone(); let panic = panic_handler.clone();
block_queue.on_panic(move |t: &String| panic.lock().unwrap().notify_all(t)); block_queue.on_panic(move |t: &String| panic.notify_all(t));
Ok(Arc::new(Client { Ok(Arc::new(Client {
chain: chain, chain: chain,

View File

@ -307,7 +307,7 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
/// General IO Service. Starts an event loop and dispatches IO requests. /// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type /// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static { pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: SafeStringPanicHandler, panic_handler: Arc<StringPanicHandler>,
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>, host_channel: Sender<IoMessage<Message>>,
} }
@ -321,12 +321,11 @@ impl<Message> MayPanic<String> for IoService<Message> where Message: Send + Sync
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop /// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> { pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = StringPanicHandler::new_thread_safe(); let panic_handler = StringPanicHandler::new_arc();
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel(); let channel = event_loop.channel();
let panic = panic_handler.clone(); let panic = panic_handler.clone();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let mut panic = panic.lock().unwrap();
panic.catch_panic(move || { panic.catch_panic(move || {
IoManager::<Message>::start(&mut event_loop).unwrap(); IoManager::<Message>::start(&mut event_loop).unwrap();
}).unwrap() }).unwrap()

View File

@ -44,7 +44,7 @@ pub struct Worker {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>, wait: Arc<Condvar>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
panic_handler: SafeStringPanicHandler, panic_handler: Arc<StringPanicHandler>,
} }
impl Worker { impl Worker {
@ -55,7 +55,7 @@ impl Worker {
wait: Arc<Condvar>, wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>) -> Worker wait_mutex: Arc<Mutex<()>>) -> Worker
where Message: Send + Sync + Clone + 'static { where Message: Send + Sync + Clone + 'static {
let panic_handler = StringPanicHandler::new_thread_safe(); let panic_handler = StringPanicHandler::new_arc();
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker { let mut worker = Worker {
panic_handler: panic_handler.clone(), panic_handler: panic_handler.clone(),
@ -66,8 +66,7 @@ impl Worker {
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn( worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || { move || {
let mut panic = panic_handler.lock().unwrap(); panic_handler.catch_panic(move || {
panic.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap() }).unwrap()
}) })

View File

@ -43,17 +43,9 @@ pub trait MayPanic<T> {
pub trait PanicHandler<T, C: ArgsConverter<T>> : MayPanic<T>{ pub trait PanicHandler<T, C: ArgsConverter<T>> : MayPanic<T>{
fn with_converter(converter: C) -> Self; fn with_converter(converter: C) -> Self;
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R> fn catch_panic<G, R>(&self, g: G) -> thread::Result<R>
where G: FnOnce() -> R + Send + 'static; where G: FnOnce() -> R + Send + 'static;
fn notify_all(&mut self, &T); fn notify_all(&self, &T);
}
pub type SafeStringPanicHandler = Arc<Mutex<StringPanicHandler>>;
impl MayPanic<String> for SafeStringPanicHandler {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener<String> {
self.lock().unwrap().on_panic(closure);
}
} }
pub struct StringConverter; pub struct StringConverter;
@ -84,7 +76,7 @@ impl<T, C> PanicHandler<T, C> for BasePanicHandler<T, C>
#[allow(deprecated)] #[allow(deprecated)]
// TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex) // TODO [todr] catch_panic is deprecated but panic::recover has different bounds (not allowing mutex)
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static { fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
let result = thread::catch_panic(g); let result = thread::catch_panic(g);
if let Err(ref e) = result { if let Err(ref e) = result {
@ -97,7 +89,7 @@ impl<T, C> PanicHandler<T, C> for BasePanicHandler<T, C>
result result
} }
fn notify_all(&mut self, r: &T) { fn notify_all(&self, r: &T) {
let mut listeners = self.listeners.lock().unwrap(); let mut listeners = self.listeners.lock().unwrap();
for listener in listeners.deref_mut() { for listener in listeners.deref_mut() {
listener.call(r); listener.call(r);
@ -118,8 +110,8 @@ pub struct StringPanicHandler {
} }
impl StringPanicHandler { impl StringPanicHandler {
pub fn new_thread_safe() -> SafeStringPanicHandler { pub fn new_arc() -> Arc<StringPanicHandler> {
Arc::new(Mutex::new(Self::new())) Arc::new(Self::new())
} }
pub fn new () -> Self { pub fn new () -> Self {
@ -135,11 +127,11 @@ impl PanicHandler<String, StringConverter> for StringPanicHandler {
} }
} }
fn catch_panic<G, R>(&mut self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static { fn catch_panic<G, R>(&self, g: G) -> thread::Result<R> where G: FnOnce() -> R + Send + 'static {
self.handler.catch_panic(g) self.handler.catch_panic(g)
} }
fn notify_all(&mut self, r: &String) { fn notify_all(&self, r: &String) {
self.handler.notify_all(r); self.handler.notify_all(r);
} }
} }
@ -157,7 +149,7 @@ fn should_notify_listeners_about_panic () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let mut p = StringPanicHandler::new(); let p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when // when
@ -173,7 +165,7 @@ fn should_notify_listeners_about_panic_when_string_is_dynamic () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let mut p = StringPanicHandler::new(); let p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when // when
@ -191,7 +183,7 @@ fn should_notify_listeners_about_panic_in_other_thread () {
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let mut p = StringPanicHandler::new(); let p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
// when // when
@ -210,10 +202,10 @@ use std::sync::RwLock;
// given // given
let invocations = Arc::new(RwLock::new(vec![])); let invocations = Arc::new(RwLock::new(vec![]));
let i = invocations.clone(); let i = invocations.clone();
let mut p = StringPanicHandler::new(); let p = StringPanicHandler::new();
p.on_panic(move |t: &String| i.write().unwrap().push(t.clone())); p.on_panic(move |t: &String| i.write().unwrap().push(t.clone()));
let mut p2 = StringPanicHandler::new(); let p2 = StringPanicHandler::new();
p2.on_panic(move |t: &String| p.notify_all(t)); p2.on_panic(move |t: &String| p.notify_all(t));
// when // when