From 84882922b462559378ac7a551d7d19bad550ce69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 30 May 2016 19:30:16 +0200 Subject: [PATCH] Sending a notification on every new messages --- parity/signer.rs | 2 +- signer/src/ws_server/mod.rs | 35 +++++++++++++++++++---------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/parity/signer.rs b/parity/signer.rs index d549b89cb..978e4e636 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -51,7 +51,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer { }); let start_result = { - let server = signer::ServerBuilder::new(); + let server = signer::ServerBuilder::new(daps.apis.signer_queue.clone()); let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext); server.start(addr) }; diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index bc8fb33f8..910c1af85 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::net::SocketAddr; use util::panics::{PanicHandler, OnPanicListener, MayPanic}; use jsonrpc_core::{IoHandler, IoDelegate}; -use rpc::Extendable; +use rpc::{Extendable, ConfirmationsQueue}; mod session; @@ -49,15 +49,10 @@ impl From for ServerError { /// Builder for `WebSockets` server pub struct ServerBuilder { + queue: Arc, handler: Arc, } -impl Default for ServerBuilder { - fn default() -> Self { - ServerBuilder::new() - } -} - impl Extendable for ServerBuilder { fn add_delegate(&self, delegate: IoDelegate) { self.handler.add_delegate(delegate); @@ -66,30 +61,40 @@ impl Extendable for ServerBuilder { impl ServerBuilder { /// Creates new `ServerBuilder` - pub fn new() -> Self { + pub fn new(queue: Arc) -> Self { ServerBuilder { - handler: Arc::new(IoHandler::new()) + queue: queue, + handler: Arc::new(IoHandler::new()), } } /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. pub fn start(self, addr: SocketAddr) -> Result { - Server::start(addr, self.handler) + Server::start(addr, self.handler).and_then(|(server, broadcaster)| { + // Fire up queue notifications broadcasting + let queue = self.queue.clone(); + thread::spawn(move || { + queue.start_listening(|_message| { + broadcaster.send("new_message").unwrap(); + }).expect("It's the only place we are running start_listening. It shouldn't fail."); + }).expect("We should be able to create the thread"); + + Ok(server) + }) } } /// `WebSockets` server implementation. pub struct Server { handle: Option>>, - broadcaster: ws::Sender, panic_handler: Arc, } impl Server { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - pub fn start(addr: SocketAddr, handler: Arc) -> Result { + fn start(addr: SocketAddr, handler: Arc) -> Result<(Server, ws::Sender), ServerError> { let config = { let mut config = ws::Settings::default(); config.max_connections = 5; @@ -111,11 +116,10 @@ impl Server { }); // Return a handle - Ok(Server { + Ok((Server { handle: Some(handle), - broadcaster: broadcaster, panic_handler: panic_handler, - }) + }, broadcaster)) } } @@ -127,7 +131,6 @@ impl MayPanic for Server { impl Drop for Server { fn drop(&mut self) { - self.broadcaster.shutdown().expect("WsServer should close nicely."); self.handle.take().unwrap().join().unwrap(); } }