Asynchronous transactions (polling based for now). (#1652)

* Asynchronous transactions (polling based for now).

- Alter eth_sendTransaction to be async, returning one of:
  - Transaction hash (signed and submitted).
  - Transaction promise ID (< 32 bytes).
  - Zero hash (will never be signed).
- Introduce new JSONRPC eth_checkTransaction.

The new API call takes a single argument - a promise ID. It returns
either:
- Transaction hash (signed and submitted).
- null (still pending, call again later),
- Zero hash (will never be signed).

* New RPC eth_postTransaction

- Restore previous semantics of sendTransaction.
- Introduce eth_postTransaction.
- Some refactoring.

* Fix minor lockup.

* Use TransientHashMap to prevent leak.
This commit is contained in:
Gav Wood 2016-07-19 09:19:58 +02:00 committed by GitHub
parent 18f16616fe
commit 3ba3dd3805
5 changed files with 115 additions and 42 deletions

View File

@ -22,4 +22,4 @@ mod signing_queue;
pub use self::poll_manager::PollManager; pub use self::poll_manager::PollManager;
pub use self::poll_filter::PollFilter; pub use self::poll_filter::PollFilter;
pub use self::requests::{TransactionRequest, TransactionConfirmation, CallRequest}; pub use self::requests::{TransactionRequest, TransactionConfirmation, CallRequest};
pub use self::signing_queue::{ConfirmationsQueue, SigningQueue, QueueEvent}; pub use self::signing_queue::{ConfirmationsQueue, ConfirmationPromise, ConfirmationResult, SigningQueue, QueueEvent};

View File

@ -77,8 +77,9 @@ pub trait SigningQueue: Send + Sync {
fn is_empty(&self) -> bool; fn is_empty(&self) -> bool;
} }
#[derive(Debug, PartialEq)] #[derive(Debug, Clone, PartialEq)]
enum ConfirmationResult { /// Result of a pending transaction.
pub enum ConfirmationResult {
/// The transaction has not yet been confirmed nor rejected. /// The transaction has not yet been confirmed nor rejected.
Waiting, Waiting,
/// The transaction has been rejected. /// The transaction has been rejected.
@ -125,35 +126,44 @@ impl ConfirmationToken {
} }
impl ConfirmationPromise { impl ConfirmationPromise {
/// Get the ID for this request.
pub fn id(&self) -> U256 { self.id }
/// Blocks current thread and awaits for /// Blocks current thread and awaits for
/// resolution of the transaction (rejected / confirmed) /// resolution of the transaction (rejected / confirmed)
/// Returns `None` if transaction was rejected or timeout reached. /// Returns `None` if transaction was rejected or timeout reached.
/// Returns `Some(result)` if transaction was confirmed. /// Returns `Some(result)` if transaction was confirmed.
pub fn wait_with_timeout(&self) -> Option<RpcResult> { pub fn wait_with_timeout(&self) -> Option<RpcResult> {
let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC);
let deadline = Instant::now() + timeout; let res = self.wait_until(Instant::now() + timeout);
match res {
ConfirmationResult::Confirmed(h) => Some(h),
ConfirmationResult::Rejected | ConfirmationResult::Waiting => None,
}
}
info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id); /// Just get the result, assuming it exists.
pub fn result(&self) -> ConfirmationResult { self.wait_until(Instant::now()) }
/// Blocks current thread and awaits for
/// resolution of the transaction (rejected / confirmed)
/// Returns `None` if transaction was rejected or timeout reached.
/// Returns `Some(result)` if transaction was confirmed.
pub fn wait_until(&self, deadline: Instant) -> ConfirmationResult {
trace!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id);
loop { loop {
let now = Instant::now(); let now = Instant::now();
if now >= deadline { // Check the result...
break; match *self.result.lock() {
// Waiting and deadline not yet passed continue looping.
ConfirmationResult::Waiting if now < deadline => {}
// Anything else - return.
ref a => return a.clone(),
} }
// Park thread (may wake up spuriously) // wait a while longer - maybe the solution will arrive.
thread::park_timeout(deadline - now); thread::park_timeout(deadline - now);
// Take confirmation result
let res = self.result.lock();
// Check the result
match *res {
ConfirmationResult::Rejected => return None,
ConfirmationResult::Confirmed(ref h) => return Some(h.clone()),
ConfirmationResult::Waiting => continue,
} }
} }
// We reached the timeout. Just return `None`
trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", self.id);
None
}
} }
/// Queue for all unconfirmed transactions. /// Queue for all unconfirmed transactions.

View File

@ -302,7 +302,7 @@ impl<C, S: ?Sized, M, EM> Eth for EthClient<C, S, M, EM> where
}; };
to_value(&res) to_value(&res)
} }
_ => Err(Error::invalid_params()) _ => Err(Error::invalid_params()),
} }
} }

View File

@ -20,11 +20,12 @@ use std::sync::{Arc, Weak};
use jsonrpc_core::*; use jsonrpc_core::*;
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use util::{U256, Address, H256}; use util::{U256, Address, H256, Mutex};
use transient_hashmap::TransientHashMap;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use v1::helpers::{SigningQueue, ConfirmationsQueue, TransactionRequest as TRequest}; use v1::helpers::{SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationsQueue, TransactionRequest as TRequest};
use v1::traits::EthSigning; use v1::traits::EthSigning;
use v1::types::{TransactionRequest, H160 as RpcH160, H256 as RpcH256, H520 as RpcH520}; use v1::types::{TransactionRequest, H160 as RpcH160, H256 as RpcH256, H520 as RpcH520, U256 as RpcU256};
use v1::impls::{default_gas_price, sign_and_dispatch}; use v1::impls::{default_gas_price, sign_and_dispatch};
fn fill_optional_fields<C, M>(request: &mut TRequest, client: &C, miner: &M) fn fill_optional_fields<C, M>(request: &mut TRequest, client: &C, miner: &M)
@ -49,8 +50,12 @@ pub struct EthSigningQueueClient<C, M> where C: MiningBlockChainClient, M: Miner
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, client: Weak<C>,
miner: Weak<M>, miner: Weak<M>,
pending: Mutex<TransientHashMap<U256, ConfirmationPromise>>,
} }
const MAX_PENDING_DURATION: u64 = 60 * 60;
impl<C, M> EthSigningQueueClient<C, M> where C: MiningBlockChainClient, M: MinerService { impl<C, M> EthSigningQueueClient<C, M> where C: MiningBlockChainClient, M: MinerService {
/// Creates a new signing queue client given shared signing queue. /// Creates a new signing queue client given shared signing queue.
pub fn new(queue: &Arc<ConfirmationsQueue>, client: &Arc<C>, miner: &Arc<M>, accounts: &Arc<AccountProvider>) -> Self { pub fn new(queue: &Arc<ConfirmationsQueue>, client: &Arc<C>, miner: &Arc<M>, accounts: &Arc<AccountProvider>) -> Self {
@ -59,6 +64,7 @@ impl<C, M> EthSigningQueueClient<C, M> where C: MiningBlockChainClient, M: Miner
accounts: Arc::downgrade(accounts), accounts: Arc::downgrade(accounts),
client: Arc::downgrade(client), client: Arc::downgrade(client),
miner: Arc::downgrade(miner), miner: Arc::downgrade(miner),
pending: Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION)),
} }
} }
@ -67,21 +73,8 @@ impl<C, M> EthSigningQueueClient<C, M> where C: MiningBlockChainClient, M: Miner
take_weak!(self.client).keep_alive(); take_weak!(self.client).keep_alive();
Ok(()) Ok(())
} }
}
impl<C, M> EthSigning for EthSigningQueueClient<C, M> fn dispatch<F: FnOnce(ConfirmationPromise) -> Result<Value, Error>>(&self, params: Params, f: F) -> Result<Value, Error> {
where C: MiningBlockChainClient + 'static, M: MinerService + 'static
{
fn sign(&self, _params: Params) -> Result<Value, Error> {
try!(self.active());
warn!("Invoking eth_sign is not yet supported with signer enabled.");
// TODO [ToDr] Implement sign when rest of the signing queue is ready.
rpc_unimplemented!()
}
fn send_transaction(&self, params: Params) -> Result<Value, Error> {
try!(self.active());
from_params::<(TransactionRequest, )>(params) from_params::<(TransactionRequest, )>(params)
.and_then(|(request, )| { .and_then(|(request, )| {
let mut request: TRequest = request.into(); let mut request: TRequest = request.into();
@ -98,9 +91,54 @@ impl<C, M> EthSigning for EthSigningQueueClient<C, M>
let queue = take_weak!(self.queue); let queue = take_weak!(self.queue);
fill_optional_fields(&mut request, &*client, &*miner); fill_optional_fields(&mut request, &*client, &*miner);
let id = queue.add_request(request); let promise = queue.add_request(request);
let result = id.wait_with_timeout(); f(promise)
result.unwrap_or_else(|| to_value(&RpcH256::default())) })
}
}
impl<C, M> EthSigning for EthSigningQueueClient<C, M>
where C: MiningBlockChainClient + 'static, M: MinerService + 'static
{
fn sign(&self, _params: Params) -> Result<Value, Error> {
try!(self.active());
warn!("Invoking eth_sign is not yet supported with signer enabled.");
// TODO [ToDr] Implement sign when rest of the signing queue is ready.
rpc_unimplemented!()
}
fn send_transaction(&self, params: Params) -> Result<Value, Error> {
try!(self.active());
self.dispatch(params, |promise: ConfirmationPromise| {
promise.wait_with_timeout().unwrap_or_else(|| to_value(&RpcH256::default()))
})
}
fn post_transaction(&self, params: Params) -> Result<Value, Error> {
try!(self.active());
self.dispatch(params, |promise: ConfirmationPromise| {
let ret = to_value(&RpcU256::from(promise.id()));
self.pending.lock().insert(promise.id(), promise);
ret
})
}
fn check_transaction(&self, params: Params) -> Result<Value, Error> {
try!(self.active());
let mut pending = self.pending.lock();
from_params::<(RpcU256, )>(params).and_then(|(id, )| {
let id: U256 = id.into();
let res = match pending.get(&id) {
Some(ref promise) => match promise.result() {
ConfirmationResult::Waiting => { return Ok(Value::Null); }
ConfirmationResult::Rejected => to_value(&RpcH256::default()),
ConfirmationResult::Confirmed(rpc_response) => rpc_response,
},
_ => { return Err(Error::invalid_params()); }
};
pending.remove(&id);
res
}) })
} }
} }
@ -160,4 +198,14 @@ impl<C, M> EthSigning for EthSigningUnsafeClient<C, M> where
} }
}) })
} }
fn post_transaction(&self, _: Params) -> Result<Value, Error> {
// We don't support this in non-signer mode.
Err(Error::invalid_params())
}
fn check_transaction(&self, _: Params) -> Result<Value, Error> {
// We don't support this in non-signer mode.
Err(Error::invalid_params())
}
} }

View File

@ -206,14 +206,29 @@ pub trait EthSigning: Sized + Send + Sync + 'static {
/// Signs the data with given address signature. /// Signs the data with given address signature.
fn sign(&self, _: Params) -> Result<Value, Error>; fn sign(&self, _: Params) -> Result<Value, Error>;
/// Sends transaction. /// Sends transaction; will block for 20s to try to return the
/// transaction hash.
/// If it cannot yet be signed, it will return a transaction ID for
/// later use with check_transaction.
fn send_transaction(&self, _: Params) -> Result<Value, Error>; fn send_transaction(&self, _: Params) -> Result<Value, Error>;
/// Posts transaction asynchronously.
/// Will return a transaction ID for later use with check_transaction.
fn post_transaction(&self, _: Params) -> Result<Value, Error>;
/// Checks the progress of a previously posted transaction.
/// Should be given a valid send_transaction ID.
/// Returns the transaction hash, the zero hash (not yet available),
/// or an error.
fn check_transaction(&self, _: Params) -> Result<Value, Error>;
/// Should be used to convert object to io delegate. /// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> { fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self)); let mut delegate = IoDelegate::new(Arc::new(self));
delegate.add_method("eth_sign", EthSigning::sign); delegate.add_method("eth_sign", EthSigning::sign);
delegate.add_method("eth_sendTransaction", EthSigning::send_transaction); delegate.add_method("eth_sendTransaction", EthSigning::send_transaction);
delegate.add_method("eth_postTransaction", EthSigning::post_transaction);
delegate.add_method("eth_checkTransaction", EthSigning::check_transaction);
delegate delegate
} }
} }