From e9bcce05a1048b3e5720e3d5a5cedcb5fbc0426d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 2 Jun 2016 12:12:31 +0200 Subject: [PATCH] Refactoring the signing queue --- rpc/src/v1/helpers/signing_queue.rs | 236 ++++++++++++++++------------ rpc/src/v1/impls/eth_signing.rs | 2 +- 2 files changed, 133 insertions(+), 105 deletions(-) diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index b0625b170..4f860e438 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -16,19 +16,23 @@ use std::thread; use std::time::{Instant, Duration}; -use std::sync::{mpsc, Mutex, RwLock}; +use std::sync::{mpsc, Mutex, RwLock, Arc}; use std::collections::HashMap; use v1::types::{TransactionRequest, TransactionConfirmation}; use util::{U256, H256}; -/// Messages that queue informs about +/// Possible events happening in the queue that can be listened to. #[derive(Debug, PartialEq)] -pub enum QueueMessage { +pub enum QueueEvent { /// Receiver should stop work upon receiving `Finish` message. Finish, - /// Informs about new transaction request. + /// Informs about new request. NewRequest(U256), + /// Request rejected. + RequestRejected(U256), + /// Request resolved. + RequestConfirmed(U256), } /// Defines possible errors returned from queue receiving method. @@ -41,19 +45,20 @@ pub enum QueueError { } /// Message Receiver type -pub type QueueMessageReceiver = mpsc::Receiver; +pub type QueueEventReceiver = mpsc::Receiver; /// A queue of transactions awaiting to be confirmed and signed. pub trait SigningQueue: Send + Sync { /// Add new request to the queue. - fn add_request(&self, transaction: TransactionRequest) -> U256; + /// Returns a `ConfirmationPromise` that can be used to await for resolution of given request. + fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise; - /// Remove request from the queue. - /// Notify possible waiters that transaction was rejected. + /// Removes a request from the queue. + /// Notifies possible token holders that transaction was rejected. fn request_rejected(&self, id: U256) -> Option; - /// Remove request from the queue. - /// Notify possible waiters that transaction was confirmed and got given hash. + /// Removes a request from the queue. + /// Notifies possible token holders that transaction was confirmed and given hash was assigned. fn request_confirmed(&self, id: U256, hash: H256) -> Option; /// Returns a request if it is contained in the queue. @@ -61,32 +66,89 @@ pub trait SigningQueue: Send + Sync { /// Return copy of all the requests in the queue. fn requests(&self) -> Vec; +} - /// Blocks for some time waiting for confirmation. - /// Returns `None` when timeout reached or transaction was rejected. - /// Returns transaction hash when transaction was confirmed. - fn wait_with_timeout(&self, id: U256) -> Option; +#[derive(Debug, PartialEq)] +enum ConfirmationResult { + /// The transaction has not yet been confirmed nor rejected. + Waiting, + /// The transaction has been rejected. + Rejected, + /// The transaction has been confirmed. + Confirmed(H256), } /// Time you need to confirm the transaction in UI. +/// This is the amount of time token holder will wait before +/// returning `None`. /// Unless we have a multi-threaded RPC this will lock /// any other incoming call! const QUEUE_TIMEOUT_DURATION_SEC : u64 = 20; -#[derive(Debug, Clone)] -enum QueueStatus { - Waiting, - Rejected, - Confirmed(H256), +/// A handle to submitted request. +/// Allows to block and wait for a resolution of that request. +pub struct ConfirmationToken { + result: Arc>, + handle: thread::Thread, + request: TransactionConfirmation, +} + +pub struct ConfirmationPromise { + id: U256, + result: Arc>, +} + +impl ConfirmationToken { + /// Submit solution to all listeners + fn resolve(&self, result: Option) { + let mut res = self.result.lock().unwrap(); + *res = result.map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h)); + // Notify listener + self.handle.unpark(); + } + + fn as_promise(&self) -> ConfirmationPromise { + ConfirmationPromise { + id: self.request.id, + result: self.result.clone(), + } + } +} + +impl ConfirmationPromise { + /// Blocks current thread and awaits for + /// resolution of the transaction (rejected / confirmed) + /// Returns `None` if transaction was rejected or timeout reached. + /// Returns `Some(hash)` if transaction was confirmed. + pub fn wait_with_timeout(&self) -> Option { + let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); + let deadline = Instant::now() + timeout; + + info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id); + while Instant::now() < deadline { + // Park thread + thread::park_timeout(timeout); + // Take confirmation result + let res = self.result.lock().unwrap(); + // Check the result + match *res { + ConfirmationResult::Rejected => return None, + ConfirmationResult::Confirmed(h) => return Some(h), + ConfirmationResult::Waiting => continue, + } + } + // We reached the timeout. Just return `None` and make sure to remove waiting. + trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", self.id); + None + } } /// Queue for all unconfirmed transactions. pub struct ConfirmationsQueue { id: Mutex, - waiters: RwLock>, - queue: RwLock>, - sender: Mutex>, - receiver: Mutex>>, + queue: RwLock>, + sender: Mutex>, + receiver: Mutex>>, } impl Default for ConfirmationsQueue { @@ -95,7 +157,6 @@ impl Default for ConfirmationsQueue { ConfirmationsQueue { id: Mutex::new(U256::from(0)), - waiters: RwLock::new(HashMap::new()), queue: RwLock::new(HashMap::new()), sender: Mutex::new(send), receiver: Mutex::new(Some(recv)), @@ -104,11 +165,11 @@ impl Default for ConfirmationsQueue { } impl ConfirmationsQueue { - /// Blocks the thread and starts listening for notifications. - /// For each event `listener` callback function will be invoked. - /// This method can be used only once. + /// Blocks the thread and starts listening for notifications regarding all actions in the queue. + /// For each event, `listener` callback will be invoked. + /// This method can be used only once (only single consumer of events can exist). pub fn start_listening(&self, listener: F) -> Result<(), QueueError> - where F: Fn(QueueMessage) -> () { + where F: Fn(QueueEvent) -> () { let recv = self.receiver.lock().unwrap().take(); if let None = recv { return Err(QueueError::AlreadyUsed); @@ -117,7 +178,7 @@ impl ConfirmationsQueue { loop { let message = try!(recv.recv().map_err(|e| QueueError::ReceiverError(e))); - if let QueueMessage::Finish = message { + if let QueueEvent::Finish = message { return Ok(()); } @@ -125,23 +186,35 @@ impl ConfirmationsQueue { } } - /// Notifies receiver that the communcation is over. + /// Notifies consumer that the communcation is over. + /// No more events will be sent after this function is invoked. pub fn finish(&self) { - self.notify(QueueMessage::Finish); + self.notify(QueueEvent::Finish); } - fn notify(&self, message: QueueMessage) { + /// Notifies receiver about the event happening in this queue. + fn notify(&self, message: QueueEvent) { // We don't really care about the result let _ = self.sender.lock().unwrap().send(message); } - fn remove(&self, id: U256) -> Option { - self.queue.write().unwrap().remove(&id) - } + /// Removes transaction from this queue and notifies `ConfirmationPromise` holders about the result. + /// Notifies also a receiver about that event. + fn remove(&self, id: U256, result: Option) -> Option { + let token = self.queue.write().unwrap().remove(&id); - fn update_status(&self, id: U256, status: QueueStatus) { - let mut waiters = self.waiters.write().unwrap(); - waiters.insert(id, status); + if let Some(token) = token { + // notify receiver about the event + self.notify(result.map_or_else( + || QueueEvent::RequestRejected(id), + |_| QueueEvent::RequestConfirmed(id) + )); + // notify token holders about resolution + token.resolve(result); + // return a result + return Some(token.request.clone()); + } + None } } @@ -152,7 +225,7 @@ impl Drop for ConfirmationsQueue { } impl SigningQueue for ConfirmationsQueue { - fn add_request(&self, transaction: TransactionRequest) -> U256 { + fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise { // Increment id let id = { let mut last_id = self.id.lock().unwrap(); @@ -160,87 +233,42 @@ impl SigningQueue for ConfirmationsQueue { *last_id }; // Add request to queue - { + let res = { let mut queue = self.queue.write().unwrap(); - queue.insert(id, TransactionConfirmation { - id: id, - transaction: transaction, + queue.insert(id, ConfirmationToken { + result: Arc::new(Mutex::new(ConfirmationResult::Waiting)), + handle: thread::current(), + request: TransactionConfirmation { + id: id, + transaction: transaction, + }, }); debug!(target: "own_tx", "Signer: New transaction ({:?}) in confirmation queue.", id); - } + queue.get(&id).map(|token| token.as_promise()).expect("Token was just inserted.") + }; // Notify listeners - self.notify(QueueMessage::NewRequest(id)); - id + self.notify(QueueEvent::NewRequest(id)); + res + } fn peek(&self, id: &U256) -> Option { - self.queue.read().unwrap().get(id).cloned() + self.queue.read().unwrap().get(id).map(|token| token.request.clone()) } fn request_rejected(&self, id: U256) -> Option { debug!(target: "own_tx", "Signer: Transaction rejected ({:?}).", id); - let o = self.remove(id); - self.update_status(id, QueueStatus::Rejected); - o + self.remove(id, None) } fn request_confirmed(&self, id: U256, hash: H256) -> Option { debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id); - let o = self.remove(id); - self.update_status(id, QueueStatus::Confirmed(hash)); - o + self.remove(id, Some(hash)) } fn requests(&self) -> Vec { let queue = self.queue.read().unwrap(); - queue.values().cloned().collect() - } - - fn wait_with_timeout(&self, id: U256) -> Option { - { - let mut waiters = self.waiters.write().unwrap(); - let r = waiters.insert(id, QueueStatus::Waiting); - match r { - // This is ok, we can have many waiters - Some(QueueStatus::Waiting) | None => {}, - // There already was a response for someone. - // The one waiting for it will cleanup, so... - Some(v) => { - // ... insert old status back - waiters.insert(id, v.clone()); - if let QueueStatus::Confirmed(h) = v { - return Some(h); - } - return None; - }, - } - } - - info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", id); - // Now wait for a response - let deadline = Instant::now() + Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); - while Instant::now() < deadline { - let status = { - let waiters = self.waiters.read().unwrap(); - waiters.get(&id).expect("Only the waiting thread can remove any message.").clone() - }; - - match status { - QueueStatus::Waiting => thread::sleep(Duration::from_millis(50)), - QueueStatus::Confirmed(h) => { - self.waiters.write().unwrap().remove(&id); - return Some(h); - }, - QueueStatus::Rejected => { - self.waiters.write().unwrap().remove(&id); - return None; - }, - } - } - // We reached the timeout. Just return `None` and make sure to remove waiting. - trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", id); - self.waiters.write().unwrap().remove(&id); - None + queue.values().map(|token| token.request.clone()).collect() } } @@ -277,7 +305,7 @@ mod test { let q = queue.clone(); let handle = thread::spawn(move || { let v = q.add_request(request); - q.wait_with_timeout(v).expect("Should return hash") + v.wait_with_timeout().expect("Should return hash") }); let id = U256::from(1); @@ -307,13 +335,13 @@ mod test { *v = Some(notification); }).expect("Should be closed nicely.") }); - let v = queue.add_request(request); + queue.add_request(request); queue.finish(); // then handle.join().expect("Thread should finish nicely"); let r = received.lock().unwrap().take(); - assert_eq!(r, Some(QueueMessage::NewRequest(v))); + assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1)))); } #[test] diff --git a/rpc/src/v1/impls/eth_signing.rs b/rpc/src/v1/impls/eth_signing.rs index d7e997c71..a5b3f2592 100644 --- a/rpc/src/v1/impls/eth_signing.rs +++ b/rpc/src/v1/impls/eth_signing.rs @@ -54,7 +54,7 @@ impl EthSigning for EthSigningQueueClient { .and_then(|(request, )| { let queue = take_weak!(self.queue); let id = queue.add_request(request); - let result = queue.wait_with_timeout(id); + let result = id.wait_with_timeout(); to_value(&result.unwrap_or_else(H256::new)) }) }