From b4bc395c6efd85bae818fffcb64c73462231172a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 30 May 2016 20:23:19 +0200 Subject: [PATCH] Adding logs to signing queue --- parity/signer.rs | 2 +- rpc/src/v1/helpers/signing_queue.rs | 8 ++++++- signer/src/lib.rs | 8 +++++-- signer/src/ws_server/mod.rs | 37 ++++++++++++++++++----------- 4 files changed, 37 insertions(+), 18 deletions(-) diff --git a/parity/signer.rs b/parity/signer.rs index 978e4e636..a7de993fb 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -51,7 +51,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer { }); let start_result = { - let server = signer::ServerBuilder::new(daps.apis.signer_queue.clone()); + let server = signer::ServerBuilder::new(deps.apis.signer_queue.clone()); let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext); server.start(addr) }; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index 65aef8a33..b0625b170 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -166,6 +166,7 @@ impl SigningQueue for ConfirmationsQueue { id: id, transaction: transaction, }); + debug!(target: "own_tx", "Signer: New transaction ({:?}) in confirmation queue.", id); } // Notify listeners self.notify(QueueMessage::NewRequest(id)); @@ -177,12 +178,14 @@ impl SigningQueue for ConfirmationsQueue { } fn request_rejected(&self, id: U256) -> Option { + debug!(target: "own_tx", "Signer: Transaction rejected ({:?}).", id); let o = self.remove(id); self.update_status(id, QueueStatus::Rejected); o } fn request_confirmed(&self, id: U256, hash: H256) -> Option { + debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); let o = self.remove(id); self.update_status(id, QueueStatus::Confirmed(hash)); o @@ -213,6 +216,7 @@ impl SigningQueue for ConfirmationsQueue { } } + info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", id); // Now wait for a response let deadline = Instant::now() + Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); while Instant::now() < deadline { @@ -233,7 +237,9 @@ impl SigningQueue for ConfirmationsQueue { }, } } - // We reached the timeout. Just return `None` + // We reached the timeout. Just return `None` and make sure to remove waiting. + trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", id); + self.waiters.write().unwrap().remove(&id); None } } diff --git a/signer/src/lib.rs b/signer/src/lib.rs index 74018d9b1..8391d42b4 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -30,11 +30,15 @@ //! //! ``` //! extern crate ethcore_signer; +//! extern crate ethcore_rpc; //! -//! use ethcore_signer::Server; +//! use std::sync::Arc; +//! use ethcore_signer::ServerBuilder; +//! use ethcore_rpc::ConfirmationsQueue; //! //! fn main() { -//! let _server = Server::start("127.0.0.1:8084".parse().unwrap()); +//! let queue = Arc::new(ConfirmationsQueue::default()); +//! let _server = ServerBuilder::new(queue).start("127.0.0.1:8084".parse().unwrap()); //! } //! ``` diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index 910c1af85..0a4499af8 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -71,30 +71,22 @@ impl ServerBuilder { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. pub fn start(self, addr: SocketAddr) -> Result { - Server::start(addr, self.handler).and_then(|(server, broadcaster)| { - // Fire up queue notifications broadcasting - let queue = self.queue.clone(); - thread::spawn(move || { - queue.start_listening(|_message| { - broadcaster.send("new_message").unwrap(); - }).expect("It's the only place we are running start_listening. It shouldn't fail."); - }).expect("We should be able to create the thread"); - - Ok(server) - }) + Server::start(addr, self.handler, self.queue) } } /// `WebSockets` server implementation. pub struct Server { handle: Option>>, + broadcaster_handle: Option>, + queue: Arc, panic_handler: Arc, } impl Server { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - fn start(addr: SocketAddr, handler: Arc) -> Result<(Server, ws::Sender), ServerError> { + fn start(addr: SocketAddr, handler: Arc, queue: Arc) -> Result { let config = { let mut config = ws::Settings::default(); config.max_connections = 5; @@ -108,6 +100,7 @@ impl Server { let panic_handler = PanicHandler::new_in_arc(); let ph = panic_handler.clone(); let broadcaster = ws.broadcaster(); + // Spawn a thread with event loop let handle = thread::spawn(move || { ph.catch_panic(move || { @@ -115,11 +108,25 @@ impl Server { }).unwrap() }); + // Spawn a thread for broadcasting + let ph = panic_handler.clone(); + let q = queue.clone(); + let broadcaster_handle = thread::spawn(move || { + ph.catch_panic(move || { + q.start_listening(|_message| { + // TODO [ToDr] Some better structure here for messages. + broadcaster.send("new_message").unwrap(); + }).expect("It's the only place we are running start_listening. It shouldn't fail.") + }).unwrap() + }); + // Return a handle - Ok((Server { + Ok(Server { handle: Some(handle), + broadcaster_handle: Some(broadcaster_handle), + queue: queue, panic_handler: panic_handler, - }, broadcaster)) + }) } } @@ -131,6 +138,8 @@ impl MayPanic for Server { impl Drop for Server { fn drop(&mut self) { + self.queue.finish(); + self.broadcaster_handle.take().unwrap().join().unwrap(); self.handle.take().unwrap().join().unwrap(); } }