diff --git a/parity/signer.rs b/parity/signer.rs
index d549b89cb..a7de993fb 100644
--- a/parity/signer.rs
+++ b/parity/signer.rs
@@ -51,7 +51,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer {
});
let start_result = {
- let server = signer::ServerBuilder::new();
+ let server = signer::ServerBuilder::new(deps.apis.signer_queue.clone());
let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext);
server.start(addr)
};
diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs
index eee4328ee..0ded8998c 100644
--- a/rpc/src/v1/helpers/signing_queue.rs
+++ b/rpc/src/v1/helpers/signing_queue.rs
@@ -14,78 +14,281 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
-use std::sync::Mutex;
+use std::thread;
+use std::time::{Instant, Duration};
+use std::sync::{mpsc, Mutex, RwLock, Arc};
use std::collections::HashMap;
use v1::types::{TransactionRequest, TransactionConfirmation};
-use util::U256;
+use util::{U256, H256};
+
+
+/// Possible events happening in the queue that can be listened to.
+#[derive(Debug, PartialEq)]
+pub enum QueueEvent {
+ /// Receiver should stop work upon receiving `Finish` message.
+ Finish,
+ /// Informs about new request.
+ NewRequest(U256),
+ /// Request rejected.
+ RequestRejected(U256),
+ /// Request resolved.
+ RequestConfirmed(U256),
+}
+
+/// Defines possible errors returned from queue receiving method.
+#[derive(Debug, PartialEq)]
+pub enum QueueError {
+ /// Returned when method has been already used (no receiver available).
+ AlreadyUsed,
+ /// Returned when receiver encounters an error.
+ ReceiverError(mpsc::RecvError),
+}
+
+/// Message Receiver type
+pub type QueueEventReceiver = mpsc::Receiver;
/// A queue of transactions awaiting to be confirmed and signed.
pub trait SigningQueue: Send + Sync {
/// Add new request to the queue.
- fn add_request(&self, transaction: TransactionRequest) -> U256;
+ /// Returns a `ConfirmationPromise` that can be used to await for resolution of given request.
+ fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise;
- /// Remove request from the queue.
- fn remove_request(&self, id: U256) -> Option;
+ /// Removes a request from the queue.
+ /// Notifies possible token holders that transaction was rejected.
+ fn request_rejected(&self, id: U256) -> Option;
+
+ /// Removes a request from the queue.
+ /// Notifies possible token holders that transaction was confirmed and given hash was assigned.
+ fn request_confirmed(&self, id: U256, hash: H256) -> Option;
+
+ /// Returns a request if it is contained in the queue.
+ fn peek(&self, id: &U256) -> Option;
/// Return copy of all the requests in the queue.
fn requests(&self) -> Vec;
}
-/// Queue for all unconfirmed transactions.
-pub struct ConfirmationsQueue {
- id: Mutex,
- queue: Mutex>,
+#[derive(Debug, PartialEq)]
+enum ConfirmationResult {
+ /// The transaction has not yet been confirmed nor rejected.
+ Waiting,
+ /// The transaction has been rejected.
+ Rejected,
+ /// The transaction has been confirmed.
+ Confirmed(H256),
}
-impl Default for ConfirmationsQueue {
- fn default() -> Self {
- ConfirmationsQueue {
- id: Mutex::new(U256::from(0)),
- queue: Mutex::new(HashMap::new()),
+/// Time you need to confirm the transaction in UI.
+/// This is the amount of time token holder will wait before
+/// returning `None`.
+/// Unless we have a multi-threaded RPC this will lock
+/// any other incoming call!
+const QUEUE_TIMEOUT_DURATION_SEC : u64 = 20;
+
+/// A handle to submitted request.
+/// Allows to block and wait for a resolution of that request.
+pub struct ConfirmationToken {
+ result: Arc>,
+ handle: thread::Thread,
+ request: TransactionConfirmation,
+}
+
+pub struct ConfirmationPromise {
+ id: U256,
+ result: Arc>,
+}
+
+impl ConfirmationToken {
+ /// Submit solution to all listeners
+ fn resolve(&self, result: Option) {
+ let mut res = self.result.lock().unwrap();
+ *res = result.map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h));
+ // Notify listener
+ self.handle.unpark();
+ }
+
+ fn as_promise(&self) -> ConfirmationPromise {
+ ConfirmationPromise {
+ id: self.request.id,
+ result: self.result.clone(),
}
}
}
-impl SigningQueue for ConfirmationsQueue {
- fn add_request(&self, transaction: TransactionRequest) -> U256 {
+impl ConfirmationPromise {
+ /// Blocks current thread and awaits for
+ /// resolution of the transaction (rejected / confirmed)
+ /// Returns `None` if transaction was rejected or timeout reached.
+ /// Returns `Some(hash)` if transaction was confirmed.
+ pub fn wait_with_timeout(&self) -> Option {
+ let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC);
+ let deadline = Instant::now() + timeout;
+
+ info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id);
+ loop {
+ let now = Instant::now();
+ if now >= deadline {
+ break;
+ }
+ // Park thread (may wake up spuriously)
+ thread::park_timeout(deadline - now);
+ // Take confirmation result
+ let res = self.result.lock().unwrap();
+ // Check the result
+ match *res {
+ ConfirmationResult::Rejected => return None,
+ ConfirmationResult::Confirmed(h) => return Some(h),
+ ConfirmationResult::Waiting => continue,
+ }
+ }
+ // We reached the timeout. Just return `None`
+ trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", self.id);
+ None
+ }
+}
+
+/// Queue for all unconfirmed transactions.
+pub struct ConfirmationsQueue {
+ id: Mutex,
+ queue: RwLock>,
+ sender: Mutex>,
+ receiver: Mutex