From 9a626c84bc0563d361421f4232219145ddf45cd7 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 2 Jun 2016 12:44:05 +0200 Subject: [PATCH 1/3] fixed #1204 --- rpc/src/v1/impls/eth_filter.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 4f44f5193..b34a4f703 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -169,8 +169,9 @@ impl EthFilter for EthFilterClient where logs.extend(new_pending_logs); } - // save current block number as next from block number - *block_number = current_number; + // save the number of the next block as a first block from which + // we want to get logs + *block_number = current_number + 1; to_value(&logs) } From 35753f22f7c9524fa420b870a3189bbc13844e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 2 Jun 2016 15:58:21 +0200 Subject: [PATCH 2/3] Removing leftovers of ethminer (#1207) --- cov.sh | 2 -- doc.sh | 1 - hook.sh | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cov.sh b/cov.sh index 084e95284..a13db50b9 100755 --- a/cov.sh +++ b/cov.sh @@ -22,7 +22,6 @@ cargo test \ -p ethsync \ -p ethcore-rpc \ -p parity \ - -p ethminer \ -p ethcore-signer \ -p ethcore-dapps \ --no-run || exit $? @@ -37,5 +36,4 @@ kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage t kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_rpc-* kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_signer-* kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_dapps-* -kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethminer-* xdg-open target/coverage/index.html diff --git a/doc.sh b/doc.sh index 0b75f6c38..fb39ef272 100755 --- a/doc.sh +++ b/doc.sh @@ -10,4 +10,3 @@ cargo doc --no-deps --verbose \ -p ethcore-signer \ -p ethcore-dapps \ -p parity \ - -p ethminer diff --git a/hook.sh b/hook.sh index 978f0ca23..adb763f9d 100755 --- a/hook.sh +++ b/hook.sh @@ -7,6 +7,6 @@ echo "set -e" >> $FILE echo "cargo build --features dev" >> $FILE # Build tests echo "cargo test --no-run --features dev \\" >> $FILE -echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer -p ethcore-dapps -p ethcore-signer" >> $FILE +echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethcore-dapps -p ethcore-signer" >> $FILE echo "" >> $FILE chmod +x $FILE From 18dac64abb82fd524a2c0ac27542f0c34be0b4ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 2 Jun 2016 17:05:13 +0200 Subject: [PATCH 3/3] Simple WebSockets notification about new request (#1202) * Splitting methods requiring signing into separate trait * Single place where RPC apis are created. * Separating eth_filter * Separating eth_signing * Stubs for Personal Signer methods * Test for EthSigningQueueClient * TransactionConfirmation API * Exposing PersonalSigner API * Defining ApiSets dependent on context * Removing types * Supporting sending notification to WS connected SystemUIs * Sending a notification on every new messages * Adding logs to signing queue * Shutting down broadcaster * Refactoring the signing queue * Fixing wait loop in case of spurious wake-ups. --- parity/signer.rs | 2 +- rpc/src/v1/helpers/signing_queue.rs | 327 +++++++++++++++++++++++++--- rpc/src/v1/impls/eth.rs | 2 +- rpc/src/v1/impls/eth_signing.rs | 8 +- rpc/src/v1/impls/mod.rs | 10 +- rpc/src/v1/impls/personal.rs | 2 +- rpc/src/v1/impls/personal_signer.rs | 18 +- signer/src/lib.rs | 7 +- signer/src/ws_server/mod.rs | 41 ++-- 9 files changed, 348 insertions(+), 69 deletions(-) diff --git a/parity/signer.rs b/parity/signer.rs index d549b89cb..a7de993fb 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -51,7 +51,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer { }); let start_result = { - let server = signer::ServerBuilder::new(); + let server = signer::ServerBuilder::new(deps.apis.signer_queue.clone()); let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext); server.start(addr) }; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index eee4328ee..0ded8998c 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -14,78 +14,281 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Mutex; +use std::thread; +use std::time::{Instant, Duration}; +use std::sync::{mpsc, Mutex, RwLock, Arc}; use std::collections::HashMap; use v1::types::{TransactionRequest, TransactionConfirmation}; -use util::U256; +use util::{U256, H256}; + + +/// Possible events happening in the queue that can be listened to. +#[derive(Debug, PartialEq)] +pub enum QueueEvent { + /// Receiver should stop work upon receiving `Finish` message. + Finish, + /// Informs about new request. + NewRequest(U256), + /// Request rejected. + RequestRejected(U256), + /// Request resolved. + RequestConfirmed(U256), +} + +/// Defines possible errors returned from queue receiving method. +#[derive(Debug, PartialEq)] +pub enum QueueError { + /// Returned when method has been already used (no receiver available). + AlreadyUsed, + /// Returned when receiver encounters an error. + ReceiverError(mpsc::RecvError), +} + +/// Message Receiver type +pub type QueueEventReceiver = mpsc::Receiver; /// A queue of transactions awaiting to be confirmed and signed. pub trait SigningQueue: Send + Sync { /// Add new request to the queue. - fn add_request(&self, transaction: TransactionRequest) -> U256; + /// Returns a `ConfirmationPromise` that can be used to await for resolution of given request. + fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise; - /// Remove request from the queue. - fn remove_request(&self, id: U256) -> Option; + /// Removes a request from the queue. + /// Notifies possible token holders that transaction was rejected. + fn request_rejected(&self, id: U256) -> Option; + + /// Removes a request from the queue. + /// Notifies possible token holders that transaction was confirmed and given hash was assigned. + fn request_confirmed(&self, id: U256, hash: H256) -> Option; + + /// Returns a request if it is contained in the queue. + fn peek(&self, id: &U256) -> Option; /// Return copy of all the requests in the queue. fn requests(&self) -> Vec; } -/// Queue for all unconfirmed transactions. -pub struct ConfirmationsQueue { - id: Mutex, - queue: Mutex>, +#[derive(Debug, PartialEq)] +enum ConfirmationResult { + /// The transaction has not yet been confirmed nor rejected. + Waiting, + /// The transaction has been rejected. + Rejected, + /// The transaction has been confirmed. + Confirmed(H256), } -impl Default for ConfirmationsQueue { - fn default() -> Self { - ConfirmationsQueue { - id: Mutex::new(U256::from(0)), - queue: Mutex::new(HashMap::new()), +/// Time you need to confirm the transaction in UI. +/// This is the amount of time token holder will wait before +/// returning `None`. +/// Unless we have a multi-threaded RPC this will lock +/// any other incoming call! +const QUEUE_TIMEOUT_DURATION_SEC : u64 = 20; + +/// A handle to submitted request. +/// Allows to block and wait for a resolution of that request. +pub struct ConfirmationToken { + result: Arc>, + handle: thread::Thread, + request: TransactionConfirmation, +} + +pub struct ConfirmationPromise { + id: U256, + result: Arc>, +} + +impl ConfirmationToken { + /// Submit solution to all listeners + fn resolve(&self, result: Option) { + let mut res = self.result.lock().unwrap(); + *res = result.map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h)); + // Notify listener + self.handle.unpark(); + } + + fn as_promise(&self) -> ConfirmationPromise { + ConfirmationPromise { + id: self.request.id, + result: self.result.clone(), } } } -impl SigningQueue for ConfirmationsQueue { - fn add_request(&self, transaction: TransactionRequest) -> U256 { +impl ConfirmationPromise { + /// Blocks current thread and awaits for + /// resolution of the transaction (rejected / confirmed) + /// Returns `None` if transaction was rejected or timeout reached. + /// Returns `Some(hash)` if transaction was confirmed. + pub fn wait_with_timeout(&self) -> Option { + let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); + let deadline = Instant::now() + timeout; + + info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id); + loop { + let now = Instant::now(); + if now >= deadline { + break; + } + // Park thread (may wake up spuriously) + thread::park_timeout(deadline - now); + // Take confirmation result + let res = self.result.lock().unwrap(); + // Check the result + match *res { + ConfirmationResult::Rejected => return None, + ConfirmationResult::Confirmed(h) => return Some(h), + ConfirmationResult::Waiting => continue, + } + } + // We reached the timeout. Just return `None` + trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", self.id); + None + } +} + +/// Queue for all unconfirmed transactions. +pub struct ConfirmationsQueue { + id: Mutex, + queue: RwLock>, + sender: Mutex>, + receiver: Mutex>>, +} + +impl Default for ConfirmationsQueue { + fn default() -> Self { + let (send, recv) = mpsc::channel(); + + ConfirmationsQueue { + id: Mutex::new(U256::from(0)), + queue: RwLock::new(HashMap::new()), + sender: Mutex::new(send), + receiver: Mutex::new(Some(recv)), + } + } +} + +impl ConfirmationsQueue { + /// Blocks the thread and starts listening for notifications regarding all actions in the queue. + /// For each event, `listener` callback will be invoked. + /// This method can be used only once (only single consumer of events can exist). + pub fn start_listening(&self, listener: F) -> Result<(), QueueError> + where F: Fn(QueueEvent) -> () { + let recv = self.receiver.lock().unwrap().take(); + if let None = recv { + return Err(QueueError::AlreadyUsed); + } + let recv = recv.expect("Check for none is done earlier."); + + loop { + let message = try!(recv.recv().map_err(|e| QueueError::ReceiverError(e))); + if let QueueEvent::Finish = message { + return Ok(()); + } + + listener(message); + } + } + + /// Notifies consumer that the communcation is over. + /// No more events will be sent after this function is invoked. + pub fn finish(&self) { + self.notify(QueueEvent::Finish); + } + + /// Notifies receiver about the event happening in this queue. + fn notify(&self, message: QueueEvent) { + // We don't really care about the result + let _ = self.sender.lock().unwrap().send(message); + } + + /// Removes transaction from this queue and notifies `ConfirmationPromise` holders about the result. + /// Notifies also a receiver about that event. + fn remove(&self, id: U256, result: Option) -> Option { + let token = self.queue.write().unwrap().remove(&id); + + if let Some(token) = token { + // notify receiver about the event + self.notify(result.map_or_else( + || QueueEvent::RequestRejected(id), + |_| QueueEvent::RequestConfirmed(id) + )); + // notify token holders about resolution + token.resolve(result); + // return a result + return Some(token.request.clone()); + } + None + } +} + +impl Drop for ConfirmationsQueue { + fn drop(&mut self) { + self.finish(); + } +} + +impl SigningQueue for ConfirmationsQueue { + fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise { // Increment id let id = { let mut last_id = self.id.lock().unwrap(); *last_id = *last_id + U256::from(1); *last_id }; - let mut queue = self.queue.lock().unwrap(); - queue.insert(id, TransactionConfirmation { - id: id, - transaction: transaction, - }); - id + // Add request to queue + let res = { + let mut queue = self.queue.write().unwrap(); + queue.insert(id, ConfirmationToken { + result: Arc::new(Mutex::new(ConfirmationResult::Waiting)), + handle: thread::current(), + request: TransactionConfirmation { + id: id, + transaction: transaction, + }, + }); + debug!(target: "own_tx", "Signer: New transaction ({:?}) in confirmation queue.", id); + queue.get(&id).map(|token| token.as_promise()).expect("Token was just inserted.") + }; + // Notify listeners + self.notify(QueueEvent::NewRequest(id)); + res + } - fn remove_request(&self, id: U256) -> Option { - self.queue.lock().unwrap().remove(&id) + fn peek(&self, id: &U256) -> Option { + self.queue.read().unwrap().get(id).map(|token| token.request.clone()) + } + + fn request_rejected(&self, id: U256) -> Option { + debug!(target: "own_tx", "Signer: Transaction rejected ({:?}).", id); + self.remove(id, None) + } + + fn request_confirmed(&self, id: U256, hash: H256) -> Option { + debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); + self.remove(id, Some(hash)) } fn requests(&self) -> Vec { - let queue = self.queue.lock().unwrap(); - queue.values().cloned().collect() + let queue = self.queue.read().unwrap(); + queue.values().map(|token| token.request.clone()).collect() } } #[cfg(test)] mod test { + use std::time::Duration; + use std::thread; + use std::sync::{Arc, Mutex}; use util::hash::Address; - use util::numbers::U256; + use util::numbers::{U256, H256}; use v1::types::TransactionRequest; use super::*; - #[test] - fn should_work_for_hashset() { - // given - let queue = ConfirmationsQueue::default(); - - let request = TransactionRequest { + fn request() -> TransactionRequest { + TransactionRequest { from: Address::from(1), to: Some(Address::from(2)), gas_price: None, @@ -93,7 +296,63 @@ mod test { value: Some(U256::from(10_000_000)), data: None, nonce: None, - }; + } + } + + #[test] + fn should_wait_for_hash() { + // given + let queue = Arc::new(ConfirmationsQueue::default()); + let request = request(); + + // when + let q = queue.clone(); + let handle = thread::spawn(move || { + let v = q.add_request(request); + v.wait_with_timeout().expect("Should return hash") + }); + + let id = U256::from(1); + while queue.peek(&id).is_none() { + // Just wait for the other thread to start + thread::sleep(Duration::from_millis(100)); + } + queue.request_confirmed(id, H256::from(1)); + + // then + assert_eq!(handle.join().expect("Thread should finish nicely"), H256::from(1)); + } + + #[test] + fn should_receive_notification() { + // given + let received = Arc::new(Mutex::new(None)); + let queue = Arc::new(ConfirmationsQueue::default()); + let request = request(); + + // when + let q = queue.clone(); + let r = received.clone(); + let handle = thread::spawn(move || { + q.start_listening(move |notification| { + let mut v = r.lock().unwrap(); + *v = Some(notification); + }).expect("Should be closed nicely.") + }); + queue.add_request(request); + queue.finish(); + + // then + handle.join().expect("Thread should finish nicely"); + let r = received.lock().unwrap().take(); + assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1)))); + } + + #[test] + fn should_add_transactions() { + // given + let queue = ConfirmationsQueue::default(); + let request = request(); // when queue.add_request(request.clone()); diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 49cbafe27..ec98d341e 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -499,7 +499,7 @@ impl Eth for EthClient where .and_then(|(raw_transaction, )| { let raw_transaction = raw_transaction.to_vec(); match UntrustedRlp::new(&raw_transaction).as_val() { - Ok(signed_transaction) => dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), signed_transaction), + Ok(signed_transaction) => to_value(&dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), signed_transaction)), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/impls/eth_signing.rs b/rpc/src/v1/impls/eth_signing.rs index f0973484f..f8c3c343d 100644 --- a/rpc/src/v1/impls/eth_signing.rs +++ b/rpc/src/v1/impls/eth_signing.rs @@ -53,9 +53,9 @@ impl EthSigning for EthSigningQueueClient { from_params::<(TransactionRequest, )>(params) .and_then(|(request, )| { let queue = take_weak!(self.queue); - queue.add_request(request); - // TODO [ToDr] Block and wait for confirmation? - to_value(&H256::zero()) + let id = queue.add_request(request); + let result = id.wait_with_timeout(); + to_value(&result.unwrap_or_else(H256::new)) }) } } @@ -102,7 +102,7 @@ impl EthSigning for EthSigningUnsafeClient where .and_then(|(request, )| { let accounts = take_weak!(self.accounts); match accounts.account_secret(&request.from) { - Ok(secret) => sign_and_dispatch(&self.client, &self.miner, request, secret), + Ok(secret) => to_value(&sign_and_dispatch(&*take_weak!(self.client), &*take_weak!(self.miner), request, secret)), Err(_) => to_value(&H256::zero()) } }) diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index 33c434122..9e154a1c5 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -52,16 +52,14 @@ pub use self::traces::TracesClient; pub use self::rpc::RpcClient; use v1::types::TransactionRequest; -use std::sync::Weak; use ethcore::miner::{AccountDetails, MinerService}; use ethcore::client::MiningBlockChainClient; use ethcore::transaction::{Action, SignedTransaction, Transaction}; use util::numbers::*; use util::rlp::encode; use util::bytes::ToPretty; -use jsonrpc_core::{Error, to_value, Value}; -fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedTransaction) -> Result +fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedTransaction) -> H256 where C: MiningBlockChainClient, M: MinerService { let hash = signed_transaction.hash(); @@ -72,13 +70,11 @@ fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedT } }); - to_value(&import.map(|_| hash).unwrap_or(H256::zero())) + import.map(|_| hash).unwrap_or(H256::zero()) } -fn sign_and_dispatch(client: &Weak, miner: &Weak, request: TransactionRequest, secret: H256) -> Result +fn sign_and_dispatch(client: &C, miner: &M, request: TransactionRequest, secret: H256) -> H256 where C: MiningBlockChainClient, M: MinerService { - let client = take_weak!(client); - let miner = take_weak!(miner); let signed_transaction = { Transaction { diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index e5c3cd1a5..93d13aed7 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -83,7 +83,7 @@ impl Personal for PersonalClient .and_then(|(request, password)| { let accounts = take_weak!(self.accounts); match accounts.locked_account_secret(&request.from, &password) { - Ok(secret) => sign_and_dispatch(&self.client, &self.miner, request, secret), + Ok(secret) => to_value(&sign_and_dispatch(&*take_weak!(self.client), &*take_weak!(self.miner), request, secret)), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/impls/personal_signer.rs b/rpc/src/v1/impls/personal_signer.rs index 2d52e07f9..148330ced 100644 --- a/rpc/src/v1/impls/personal_signer.rs +++ b/rpc/src/v1/impls/personal_signer.rs @@ -63,19 +63,27 @@ impl PersonalSigner for SignerClient Some(sign_and_dispatch(&self.client, &self.miner, request, secret)), + Ok(secret) => { + let hash = sign_and_dispatch(&*client, &*miner, request, secret); + queue.request_confirmed(id, hash); + Some(to_value(&hash)) + }, Err(_) => None } }) - .unwrap_or_else(|| to_value(&H256::zero())) + .unwrap_or_else(|| { + queue.request_rejected(id); + to_value(&H256::zero()) + }) } ) } @@ -84,7 +92,7 @@ impl PersonalSigner for SignerClient(params).and_then( |(id, )| { let queue = take_weak!(self.queue); - let res = queue.remove_request(id); + let res = queue.request_rejected(id); to_value(&res.is_some()) } ) diff --git a/signer/src/lib.rs b/signer/src/lib.rs index e2df72bcc..8391d42b4 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -30,12 +30,15 @@ //! //! ``` //! extern crate ethcore_signer; +//! extern crate ethcore_rpc; //! +//! use std::sync::Arc; //! use ethcore_signer::ServerBuilder; +//! use ethcore_rpc::ConfirmationsQueue; //! //! fn main() { -//! let builder = ServerBuilder::new(); -//! let _server = builder.start("127.0.0.1:8084".parse().unwrap()).unwrap(); +//! let queue = Arc::new(ConfirmationsQueue::default()); +//! let _server = ServerBuilder::new(queue).start("127.0.0.1:8084".parse().unwrap()); //! } //! ``` diff --git a/signer/src/ws_server/mod.rs b/signer/src/ws_server/mod.rs index bc8fb33f8..c987d7a87 100644 --- a/signer/src/ws_server/mod.rs +++ b/signer/src/ws_server/mod.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::net::SocketAddr; use util::panics::{PanicHandler, OnPanicListener, MayPanic}; use jsonrpc_core::{IoHandler, IoDelegate}; -use rpc::Extendable; +use rpc::{Extendable, ConfirmationsQueue}; mod session; @@ -49,15 +49,10 @@ impl From for ServerError { /// Builder for `WebSockets` server pub struct ServerBuilder { + queue: Arc, handler: Arc, } -impl Default for ServerBuilder { - fn default() -> Self { - ServerBuilder::new() - } -} - impl Extendable for ServerBuilder { fn add_delegate(&self, delegate: IoDelegate) { self.handler.add_delegate(delegate); @@ -66,30 +61,32 @@ impl Extendable for ServerBuilder { impl ServerBuilder { /// Creates new `ServerBuilder` - pub fn new() -> Self { + pub fn new(queue: Arc) -> Self { ServerBuilder { - handler: Arc::new(IoHandler::new()) + queue: queue, + handler: Arc::new(IoHandler::new()), } } /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. pub fn start(self, addr: SocketAddr) -> Result { - Server::start(addr, self.handler) + Server::start(addr, self.handler, self.queue) } } /// `WebSockets` server implementation. pub struct Server { handle: Option>>, - broadcaster: ws::Sender, + broadcaster_handle: Option>, + queue: Arc, panic_handler: Arc, } impl Server { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - pub fn start(addr: SocketAddr, handler: Arc) -> Result { + fn start(addr: SocketAddr, handler: Arc, queue: Arc) -> Result { let config = { let mut config = ws::Settings::default(); config.max_connections = 5; @@ -103,6 +100,7 @@ impl Server { let panic_handler = PanicHandler::new_in_arc(); let ph = panic_handler.clone(); let broadcaster = ws.broadcaster(); + // Spawn a thread with event loop let handle = thread::spawn(move || { ph.catch_panic(move || { @@ -110,10 +108,24 @@ impl Server { }).unwrap() }); + // Spawn a thread for broadcasting + let ph = panic_handler.clone(); + let q = queue.clone(); + let broadcaster_handle = thread::spawn(move || { + ph.catch_panic(move || { + q.start_listening(|_message| { + // TODO [ToDr] Some better structure here for messages. + broadcaster.send("new_message").unwrap(); + }).expect("It's the only place we are running start_listening. It shouldn't fail."); + broadcaster.shutdown().expect("Broadcaster should close gently.") + }).unwrap() + }); + // Return a handle Ok(Server { handle: Some(handle), - broadcaster: broadcaster, + broadcaster_handle: Some(broadcaster_handle), + queue: queue, panic_handler: panic_handler, }) } @@ -127,7 +139,8 @@ impl MayPanic for Server { impl Drop for Server { fn drop(&mut self) { - self.broadcaster.shutdown().expect("WsServer should close nicely."); + self.queue.finish(); + self.broadcaster_handle.take().unwrap().join().unwrap(); self.handle.take().unwrap().join().unwrap(); } }