// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
use std::mem;
use std::cell::RefCell;
use std::sync::{mpsc, Arc};
use std::collections::BTreeMap;
use jsonrpc_core;
use util::{Mutex, RwLock, U256};
use v1::helpers::{ConfirmationRequest, ConfirmationPayload};
use v1::types::ConfirmationResponse;
/// Result that can be returned from JSON RPC.
pub type RpcResult = Result;
/// Possible events happening in the queue that can be listened to.
#[derive(Debug, PartialEq)]
pub enum QueueEvent {
/// Receiver should stop work upon receiving `Finish` message.
Finish,
/// Informs about new request.
NewRequest(U256),
/// Request rejected.
RequestRejected(U256),
/// Request resolved.
RequestConfirmed(U256),
}
/// Defines possible errors returned from queue receiving method.
#[derive(Debug, PartialEq)]
pub enum QueueError {
/// Returned when method has been already used (no receiver available).
AlreadyUsed,
/// Returned when receiver encounters an error.
ReceiverError(mpsc::RecvError),
}
/// Defines possible errors when inserting to queue
#[derive(Debug, PartialEq)]
pub enum QueueAddError {
LimitReached,
}
/// Message Receiver type
pub type QueueEventReceiver = mpsc::Receiver;
// TODO [todr] to consider: timeout instead of limit?
const QUEUE_LIMIT: usize = 50;
/// A queue of transactions awaiting to be confirmed and signed.
pub trait SigningQueue: Send + Sync {
/// Add new request to the queue.
/// Returns a `ConfirmationPromise` that can be used to await for resolution of given request.
fn add_request(&self, request: ConfirmationPayload) -> Result;
/// Removes a request from the queue.
/// Notifies possible token holders that request was rejected.
fn request_rejected(&self, id: U256) -> Option;
/// Removes a request from the queue.
/// Notifies possible token holders that request was confirmed and given hash was assigned.
fn request_confirmed(&self, id: U256, result: RpcResult) -> Option;
/// Returns a request if it is contained in the queue.
fn peek(&self, id: &U256) -> Option;
/// Return copy of all the requests in the queue.
fn requests(&self) -> Vec;
/// Returns number of requests awaiting confirmation.
fn len(&self) -> usize;
/// Returns true if there are no requests awaiting confirmation.
fn is_empty(&self) -> bool;
}
#[derive(Debug, Clone, PartialEq)]
/// Result of a pending confirmation request.
pub enum ConfirmationResult {
/// The request has not yet been confirmed nor rejected.
Waiting,
/// The request has been rejected.
Rejected,
/// The request has been confirmed.
Confirmed(RpcResult),
}
type Listener = Box) + Send>;
/// A handle to submitted request.
/// Allows to block and wait for a resolution of that request.
pub struct ConfirmationToken {
result: Arc>,
listeners: Arc>>,
request: ConfirmationRequest,
}
pub struct ConfirmationPromise {
id: U256,
result: Arc>,
listeners: Arc>>,
}
impl ConfirmationToken {
/// Submit solution to all listeners
fn resolve(&self, result: Option) {
let wrapped = result.clone().map_or(ConfirmationResult::Rejected, |h| ConfirmationResult::Confirmed(h));
{
let mut res = self.result.lock();
*res = wrapped.clone();
}
// Notify listener
let listeners = {
let mut listeners = self.listeners.lock();
mem::replace(&mut *listeners, Vec::new())
};
for mut listener in listeners {
listener(result.clone());
}
}
fn as_promise(&self) -> ConfirmationPromise {
ConfirmationPromise {
id: self.request.id,
result: self.result.clone(),
listeners: self.listeners.clone(),
}
}
}
impl ConfirmationPromise {
/// Get the ID for this request.
pub fn id(&self) -> U256 { self.id }
/// Just get the result, assuming it exists.
pub fn result(&self) -> ConfirmationResult {
self.result.lock().clone()
}
pub fn wait_for_result(self, callback: F) where F: FnOnce(Option) + Send + 'static {
trace!(target: "own_tx", "Signer: Awaiting confirmation... ({:?}).", self.id);
let _result = self.result.lock();
let mut listeners = self.listeners.lock();
// TODO [todr] Overcoming FnBox unstability
let callback = RefCell::new(Some(callback));
listeners.push(Box::new(move |result| {
let ref mut f = *callback.borrow_mut();
f.take().expect("Callbacks are called only once.")(result)
}));
}
}
/// Queue for all unconfirmed requests.
pub struct ConfirmationsQueue {
id: Mutex,
queue: RwLock>,
sender: Mutex>,
receiver: Mutex