From 3ba3dd3805d95a87e97df097860181306b0317e6 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 19 Jul 2016 09:19:58 +0200 Subject: [PATCH] Asynchronous transactions (polling based for now). (#1652) * Asynchronous transactions (polling based for now). - Alter eth_sendTransaction to be async, returning one of: - Transaction hash (signed and submitted). - Transaction promise ID (< 32 bytes). - Zero hash (will never be signed). - Introduce new JSONRPC eth_checkTransaction. The new API call takes a single argument - a promise ID. It returns either: - Transaction hash (signed and submitted). - null (still pending, call again later), - Zero hash (will never be signed). * New RPC eth_postTransaction - Restore previous semantics of sendTransaction. - Introduce eth_postTransaction. - Some refactoring. * Fix minor lockup. * Use TransientHashMap to prevent leak. --- rpc/src/v1/helpers/mod.rs | 2 +- rpc/src/v1/helpers/signing_queue.rs | 48 +++++++++------- rpc/src/v1/impls/eth.rs | 2 +- rpc/src/v1/impls/eth_signing.rs | 88 ++++++++++++++++++++++------- rpc/src/v1/traits/eth.rs | 17 +++++- 5 files changed, 115 insertions(+), 42 deletions(-) diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index 3df96b00e..e38a77a79 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -22,4 +22,4 @@ mod signing_queue; pub use self::poll_manager::PollManager; pub use self::poll_filter::PollFilter; pub use self::requests::{TransactionRequest, TransactionConfirmation, CallRequest}; -pub use self::signing_queue::{ConfirmationsQueue, SigningQueue, QueueEvent}; +pub use self::signing_queue::{ConfirmationsQueue, ConfirmationPromise, ConfirmationResult, SigningQueue, QueueEvent}; diff --git a/rpc/src/v1/helpers/signing_queue.rs b/rpc/src/v1/helpers/signing_queue.rs index 8477ed4db..96c546052 100644 --- a/rpc/src/v1/helpers/signing_queue.rs +++ b/rpc/src/v1/helpers/signing_queue.rs @@ -77,8 +77,9 @@ pub trait SigningQueue: Send + Sync { fn is_empty(&self) -> bool; } -#[derive(Debug, PartialEq)] -enum ConfirmationResult { +#[derive(Debug, Clone, PartialEq)] +/// Result of a pending transaction. +pub enum ConfirmationResult { /// The transaction has not yet been confirmed nor rejected. Waiting, /// The transaction has been rejected. @@ -125,34 +126,43 @@ impl ConfirmationToken { } impl ConfirmationPromise { + /// Get the ID for this request. + pub fn id(&self) -> U256 { self.id } + /// Blocks current thread and awaits for /// resolution of the transaction (rejected / confirmed) /// Returns `None` if transaction was rejected or timeout reached. /// Returns `Some(result)` if transaction was confirmed. pub fn wait_with_timeout(&self) -> Option { let timeout = Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC); - let deadline = Instant::now() + timeout; + let res = self.wait_until(Instant::now() + timeout); + match res { + ConfirmationResult::Confirmed(h) => Some(h), + ConfirmationResult::Rejected | ConfirmationResult::Waiting => None, + } + } - info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id); + /// Just get the result, assuming it exists. + pub fn result(&self) -> ConfirmationResult { self.wait_until(Instant::now()) } + + /// Blocks current thread and awaits for + /// resolution of the transaction (rejected / confirmed) + /// Returns `None` if transaction was rejected or timeout reached. + /// Returns `Some(result)` if transaction was confirmed. + pub fn wait_until(&self, deadline: Instant) -> ConfirmationResult { + trace!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", self.id); loop { let now = Instant::now(); - if now >= deadline { - break; + // Check the result... + match *self.result.lock() { + // Waiting and deadline not yet passed continue looping. + ConfirmationResult::Waiting if now < deadline => {} + // Anything else - return. + ref a => return a.clone(), } - // Park thread (may wake up spuriously) + // wait a while longer - maybe the solution will arrive. thread::park_timeout(deadline - now); - // Take confirmation result - let res = self.result.lock(); - // Check the result - match *res { - ConfirmationResult::Rejected => return None, - ConfirmationResult::Confirmed(ref h) => return Some(h.clone()), - ConfirmationResult::Waiting => continue, - } } - // We reached the timeout. Just return `None` - trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", self.id); - None } } @@ -237,7 +247,7 @@ impl Drop for ConfirmationsQueue { } } -impl SigningQueue for ConfirmationsQueue { +impl SigningQueue for ConfirmationsQueue { fn add_request(&self, transaction: TransactionRequest) -> ConfirmationPromise { // Increment id let id = { diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index a3e774662..0bece9cd4 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -302,7 +302,7 @@ impl Eth for EthClient where }; to_value(&res) } - _ => Err(Error::invalid_params()) + _ => Err(Error::invalid_params()), } } diff --git a/rpc/src/v1/impls/eth_signing.rs b/rpc/src/v1/impls/eth_signing.rs index 4ba076358..331ce4deb 100644 --- a/rpc/src/v1/impls/eth_signing.rs +++ b/rpc/src/v1/impls/eth_signing.rs @@ -20,11 +20,12 @@ use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; -use util::{U256, Address, H256}; +use util::{U256, Address, H256, Mutex}; +use transient_hashmap::TransientHashMap; use ethcore::account_provider::AccountProvider; -use v1::helpers::{SigningQueue, ConfirmationsQueue, TransactionRequest as TRequest}; +use v1::helpers::{SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationsQueue, TransactionRequest as TRequest}; use v1::traits::EthSigning; -use v1::types::{TransactionRequest, H160 as RpcH160, H256 as RpcH256, H520 as RpcH520}; +use v1::types::{TransactionRequest, H160 as RpcH160, H256 as RpcH256, H520 as RpcH520, U256 as RpcU256}; use v1::impls::{default_gas_price, sign_and_dispatch}; fn fill_optional_fields(request: &mut TRequest, client: &C, miner: &M) @@ -49,8 +50,12 @@ pub struct EthSigningQueueClient where C: MiningBlockChainClient, M: Miner accounts: Weak, client: Weak, miner: Weak, + + pending: Mutex>, } +const MAX_PENDING_DURATION: u64 = 60 * 60; + impl EthSigningQueueClient where C: MiningBlockChainClient, M: MinerService { /// Creates a new signing queue client given shared signing queue. pub fn new(queue: &Arc, client: &Arc, miner: &Arc, accounts: &Arc) -> Self { @@ -59,6 +64,7 @@ impl EthSigningQueueClient where C: MiningBlockChainClient, M: Miner accounts: Arc::downgrade(accounts), client: Arc::downgrade(client), miner: Arc::downgrade(miner), + pending: Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION)), } } @@ -67,21 +73,8 @@ impl EthSigningQueueClient where C: MiningBlockChainClient, M: Miner take_weak!(self.client).keep_alive(); Ok(()) } -} -impl EthSigning for EthSigningQueueClient - where C: MiningBlockChainClient + 'static, M: MinerService + 'static -{ - - fn sign(&self, _params: Params) -> Result { - try!(self.active()); - warn!("Invoking eth_sign is not yet supported with signer enabled."); - // TODO [ToDr] Implement sign when rest of the signing queue is ready. - rpc_unimplemented!() - } - - fn send_transaction(&self, params: Params) -> Result { - try!(self.active()); + fn dispatch Result>(&self, params: Params, f: F) -> Result { from_params::<(TransactionRequest, )>(params) .and_then(|(request, )| { let mut request: TRequest = request.into(); @@ -98,9 +91,54 @@ impl EthSigning for EthSigningQueueClient let queue = take_weak!(self.queue); fill_optional_fields(&mut request, &*client, &*miner); - let id = queue.add_request(request); - let result = id.wait_with_timeout(); - result.unwrap_or_else(|| to_value(&RpcH256::default())) + let promise = queue.add_request(request); + f(promise) + }) + } +} + +impl EthSigning for EthSigningQueueClient + where C: MiningBlockChainClient + 'static, M: MinerService + 'static +{ + + fn sign(&self, _params: Params) -> Result { + try!(self.active()); + warn!("Invoking eth_sign is not yet supported with signer enabled."); + // TODO [ToDr] Implement sign when rest of the signing queue is ready. + rpc_unimplemented!() + } + + fn send_transaction(&self, params: Params) -> Result { + try!(self.active()); + self.dispatch(params, |promise: ConfirmationPromise| { + promise.wait_with_timeout().unwrap_or_else(|| to_value(&RpcH256::default())) + }) + } + + fn post_transaction(&self, params: Params) -> Result { + try!(self.active()); + self.dispatch(params, |promise: ConfirmationPromise| { + let ret = to_value(&RpcU256::from(promise.id())); + self.pending.lock().insert(promise.id(), promise); + ret + }) + } + + fn check_transaction(&self, params: Params) -> Result { + try!(self.active()); + let mut pending = self.pending.lock(); + from_params::<(RpcU256, )>(params).and_then(|(id, )| { + let id: U256 = id.into(); + let res = match pending.get(&id) { + Some(ref promise) => match promise.result() { + ConfirmationResult::Waiting => { return Ok(Value::Null); } + ConfirmationResult::Rejected => to_value(&RpcH256::default()), + ConfirmationResult::Confirmed(rpc_response) => rpc_response, + }, + _ => { return Err(Error::invalid_params()); } + }; + pending.remove(&id); + res }) } } @@ -160,4 +198,14 @@ impl EthSigning for EthSigningUnsafeClient where } }) } + + fn post_transaction(&self, _: Params) -> Result { + // We don't support this in non-signer mode. + Err(Error::invalid_params()) + } + + fn check_transaction(&self, _: Params) -> Result { + // We don't support this in non-signer mode. + Err(Error::invalid_params()) + } } diff --git a/rpc/src/v1/traits/eth.rs b/rpc/src/v1/traits/eth.rs index c4369ff2a..d78f36c4c 100644 --- a/rpc/src/v1/traits/eth.rs +++ b/rpc/src/v1/traits/eth.rs @@ -206,14 +206,29 @@ pub trait EthSigning: Sized + Send + Sync + 'static { /// Signs the data with given address signature. fn sign(&self, _: Params) -> Result; - /// Sends transaction. + /// Sends transaction; will block for 20s to try to return the + /// transaction hash. + /// If it cannot yet be signed, it will return a transaction ID for + /// later use with check_transaction. fn send_transaction(&self, _: Params) -> Result; + /// Posts transaction asynchronously. + /// Will return a transaction ID for later use with check_transaction. + fn post_transaction(&self, _: Params) -> Result; + + /// Checks the progress of a previously posted transaction. + /// Should be given a valid send_transaction ID. + /// Returns the transaction hash, the zero hash (not yet available), + /// or an error. + fn check_transaction(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); delegate.add_method("eth_sign", EthSigning::sign); delegate.add_method("eth_sendTransaction", EthSigning::send_transaction); + delegate.add_method("eth_postTransaction", EthSigning::post_transaction); + delegate.add_method("eth_checkTransaction", EthSigning::check_transaction); delegate } }