import rpc transactions sequentially (#10051)

* import rpc transactions sequentially

* use impl trait in argument position, renamed ProspectiveDispatcher to WithPostSign

* grouped imports

* integrates PostSign with ProspectiveSigner

* fix spaces, removed unnecessary type cast and duplicate polling

* clean up code style

* Apply suggestions from code review
This commit is contained in:
Seun LanLege 2019-01-22 12:33:56 +04:00 committed by Tomasz Drwięga
parent a9a278a6e1
commit c35abe4196
2 changed files with 159 additions and 65 deletions

View File

@ -41,11 +41,11 @@ use types::basic_account::BasicAccount;
use types::ids::BlockId; use types::ids::BlockId;
use jsonrpc_core::{BoxFuture, Result, Error}; use jsonrpc_core::{BoxFuture, Result, Error};
use jsonrpc_core::futures::{future, Future, Poll, Async}; use jsonrpc_core::futures::{future, Future, Poll, Async, IntoFuture};
use jsonrpc_core::futures::future::Either; use jsonrpc_core::futures::future::Either;
use v1::helpers::{errors, nonce, TransactionRequest, FilledTransactionRequest, ConfirmationPayload}; use v1::helpers::{errors, nonce, TransactionRequest, FilledTransactionRequest, ConfirmationPayload};
use v1::types::{ use v1::types::{
H256 as RpcH256, H520 as RpcH520, Bytes as RpcBytes, H520 as RpcH520, Bytes as RpcBytes,
RichRawTransaction as RpcRichRawTransaction, RichRawTransaction as RpcRichRawTransaction,
ConfirmationPayload as RpcConfirmationPayload, ConfirmationPayload as RpcConfirmationPayload,
ConfirmationResponse, ConfirmationResponse,
@ -69,12 +69,20 @@ pub trait Dispatcher: Send + Sync + Clone {
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address, force_nonce: bool) fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address, force_nonce: bool)
-> BoxFuture<FilledTransactionRequest>; -> BoxFuture<FilledTransactionRequest>;
/// Sign the given transaction request without dispatching, fetching appropriate nonce. /// Sign the given transaction request, fetching appropriate nonce and executing the PostSign action
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith) fn sign<P>(
-> BoxFuture<WithToken<SignedTransaction>>; &self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send;
/// Converts a `SignedTransaction` into `RichRawTransaction` /// Converts a `SignedTransaction` into `RichRawTransaction`
fn enrich(&self, SignedTransaction) -> RpcRichRawTransaction; fn enrich(&self, signed: SignedTransaction) -> RpcRichRawTransaction;
/// "Dispatch" a local transaction. /// "Dispatch" a local transaction.
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) fn dispatch_transaction(&self, signed_transaction: PendingTransaction)
@ -164,19 +172,30 @@ impl<C: miner::BlockChainClient + BlockChainClient, M: MinerService> Dispatcher
})) }))
} }
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith) fn sign<P>(
-> BoxFuture<WithToken<SignedTransaction>> &self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{ {
let chain_id = self.client.signing_chain_id(); let chain_id = self.client.signing_chain_id();
if let Some(nonce) = filled.nonce { if let Some(nonce) = filled.nonce {
return Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password))); let future = sign_transaction(&*accounts, filled, chain_id, nonce, password)
.into_future()
.and_then(move |signed| post_sign.execute(signed));
Box::new(future)
} else {
let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().reserve(filled.from, state);
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password, post_sign))
} }
let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().reserve(filled.from, state);
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password))
} }
fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction { fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction {
@ -396,12 +415,24 @@ impl Dispatcher for LightDispatcher {
})) }))
} }
fn sign(&self, accounts: Arc<AccountProvider>, filled: FilledTransactionRequest, password: SignWith) fn sign<P>(
-> BoxFuture<WithToken<SignedTransaction>> &self,
accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest,
password: SignWith,
post_sign: P
) -> BoxFuture<P::Item>
where
P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{ {
let chain_id = self.client.signing_chain_id(); let chain_id = self.client.signing_chain_id();
let nonce = filled.nonce.expect("nonce is always provided; qed"); let nonce = filled.nonce.expect("nonce is always provided; qed");
Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password)))
let future = sign_transaction(&*accounts, filled, chain_id, nonce, password)
.into_future()
.and_then(move |signed| post_sign.execute(signed));
Box::new(future)
} }
fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction { fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction {
@ -449,28 +480,60 @@ fn sign_transaction(
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum ProspectiveSignerState { enum ProspectiveSignerState {
TryProspectiveSign, TryProspectiveSign,
WaitForPostSign,
WaitForNonce, WaitForNonce,
Finish,
} }
struct ProspectiveSigner { struct ProspectiveSigner<P: PostSign> {
accounts: Arc<AccountProvider>, accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest, filled: FilledTransactionRequest,
chain_id: Option<u64>, chain_id: Option<u64>,
reserved: nonce::Reserved, reserved: nonce::Reserved,
password: SignWith, password: SignWith,
state: ProspectiveSignerState, state: ProspectiveSignerState,
prospective: Option<Result<WithToken<SignedTransaction>>>, prospective: Option<WithToken<SignedTransaction>>,
ready: Option<nonce::Ready>, ready: Option<nonce::Ready>,
post_sign: Option<P>,
post_sign_future: Option<<P::Out as IntoFuture>::Future>
} }
impl ProspectiveSigner { /// action to execute after signing
/// e.g importing a transaction into the chain
pub trait PostSign: Send {
/// item that this PostSign returns
type Item: Send;
/// incase you need to perform async PostSign actions
type Out: IntoFuture<Item = Self::Item, Error = Error> + Send;
/// perform an action with the signed transaction
fn execute(self, signer: WithToken<SignedTransaction>) -> Self::Out;
}
impl PostSign for () {
type Item = WithToken<SignedTransaction>;
type Out = Result<Self::Item>;
fn execute(self, signed: WithToken<SignedTransaction>) -> Self::Out {
Ok(signed)
}
}
impl<F: Send, T: Send> PostSign for F
where F: FnOnce(WithToken<SignedTransaction>) -> Result<T>
{
type Item = T;
type Out = Result<Self::Item>;
fn execute(self, signed: WithToken<SignedTransaction>) -> Self::Out {
(self)(signed)
}
}
impl<P: PostSign> ProspectiveSigner<P> {
pub fn new( pub fn new(
accounts: Arc<AccountProvider>, accounts: Arc<AccountProvider>,
filled: FilledTransactionRequest, filled: FilledTransactionRequest,
chain_id: Option<u64>, chain_id: Option<u64>,
reserved: nonce::Reserved, reserved: nonce::Reserved,
password: SignWith, password: SignWith,
post_sign: P
) -> Self { ) -> Self {
// If the account is permanently unlocked we can try to sign // If the account is permanently unlocked we can try to sign
// using prospective nonce. This should speed up sending // using prospective nonce. This should speed up sending
@ -491,6 +554,8 @@ impl ProspectiveSigner {
}, },
prospective: None, prospective: None,
ready: None, ready: None,
post_sign: Some(post_sign),
post_sign_future: None
} }
} }
@ -509,8 +574,8 @@ impl ProspectiveSigner {
} }
} }
impl Future for ProspectiveSigner { impl<P: PostSign> Future for ProspectiveSigner<P> {
type Item = WithToken<SignedTransaction>; type Item = P::Item;
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -523,32 +588,45 @@ impl Future for ProspectiveSigner {
match self.poll_reserved()? { match self.poll_reserved()? {
Async::NotReady => { Async::NotReady => {
self.state = WaitForNonce; self.state = WaitForNonce;
self.prospective = Some(self.sign(self.reserved.prospective_value())); self.prospective = Some(self.sign(self.reserved.prospective_value())?);
}, },
Async::Ready(nonce) => { Async::Ready(nonce) => {
self.state = Finish; self.state = WaitForPostSign;
self.prospective = Some(self.sign(nonce.value())); self.post_sign_future = Some(self.post_sign.take()
.expect("post_sign is set on creation; qed")
.execute(self.sign(nonce.value())?)
.into_future());
self.ready = Some(nonce); self.ready = Some(nonce);
}, },
} }
}, },
WaitForNonce => { WaitForNonce => {
let nonce = try_ready!(self.poll_reserved()); let nonce = try_ready!(self.poll_reserved());
let result = match (self.prospective.take(), nonce.matches_prospective()) { let prospective = match (self.prospective.take(), nonce.matches_prospective()) {
(Some(prospective), true) => prospective, (Some(prospective), true) => prospective,
_ => self.sign(nonce.value()), _ => self.sign(nonce.value())?,
}; };
self.state = Finish;
self.prospective = Some(result);
self.ready = Some(nonce); self.ready = Some(nonce);
self.state = WaitForPostSign;
self.post_sign_future = Some(self.post_sign.take()
.expect("post_sign is set on creation; qed")
.execute(prospective)
.into_future());
}, },
Finish => { WaitForPostSign => {
if let (Some(result), Some(nonce)) = (self.prospective.take(), self.ready.take()) { if let Some(mut fut) = self.post_sign_future.as_mut() {
// Mark nonce as used on successful signing match fut.poll()? {
return result.map(move |tx| { Async::Ready(item) => {
nonce.mark_used(); let nonce = self.ready
Async::Ready(tx) .take()
}) .expect("nonce is set before state transitions to WaitForPostSign; qed");
nonce.mark_used();
return Ok(Async::Ready(item))
},
Async::NotReady => {
return Ok(Async::NotReady)
}
}
} else { } else {
panic!("Poll after ready."); panic!("Poll after ready.");
} }
@ -655,19 +733,21 @@ pub fn execute<D: Dispatcher + 'static>(
match payload { match payload {
ConfirmationPayload::SendTransaction(request) => { ConfirmationPayload::SendTransaction(request) => {
let condition = request.condition.clone().map(Into::into); let condition = request.condition.clone().map(Into::into);
Box::new(dispatcher.sign(accounts, request, pass) let cloned_dispatcher = dispatcher.clone();
.map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition))) let post_sign = move |with_token_signed: WithToken<SignedTransaction>| {
.map(WithToken::into_tuple) let (signed, token) = with_token_signed.into_tuple();
.map(|(tx, token)| (tx, token, dispatcher)) let signed_transaction = PendingTransaction::new(signed, condition);
.and_then(|(tx, tok, dispatcher)| { cloned_dispatcher.dispatch_transaction(signed_transaction)
dispatcher.dispatch_transaction(tx) .map(|hash| (hash, token))
.map(RpcH256::from) };
.map(ConfirmationResponse::SendTransaction) let future = dispatcher.sign(accounts, request, pass, post_sign)
.map(move |h| WithToken::from((h, tok))) .map(|(hash, token)| {
})) WithToken::from((ConfirmationResponse::SendTransaction(hash.into()), token))
});
Box::new(future)
}, },
ConfirmationPayload::SignTransaction(request) => { ConfirmationPayload::SignTransaction(request) => {
Box::new(dispatcher.sign(accounts, request, pass) Box::new(dispatcher.sign(accounts, request, pass, ())
.map(move |result| result .map(move |result| result
.map(move |tx| dispatcher.enrich(tx)) .map(move |tx| dispatcher.enrich(tx))
.map(ConfirmationResponse::SignTransaction) .map(ConfirmationResponse::SignTransaction)

View File

@ -18,7 +18,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use bytes::{Bytes, ToPretty}; use bytes::Bytes;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use types::transaction::PendingTransaction; use types::transaction::PendingTransaction;
use ethereum_types::{H520, U128, Address}; use ethereum_types::{H520, U128, Address};
@ -27,7 +27,7 @@ use ethkey::{public_to_address, recover, Signature};
use jsonrpc_core::{BoxFuture, Result}; use jsonrpc_core::{BoxFuture, Result};
use jsonrpc_core::futures::{future, Future}; use jsonrpc_core::futures::{future, Future};
use v1::helpers::{errors, eip191}; use v1::helpers::{errors, eip191};
use v1::helpers::dispatch::{self, eth_data_hash, Dispatcher, SignWith}; use v1::helpers::dispatch::{self, eth_data_hash, Dispatcher, SignWith, PostSign, WithToken};
use v1::traits::Personal; use v1::traits::Personal;
use v1::types::{ use v1::types::{
H160 as RpcH160, H256 as RpcH256, H520 as RpcH520, U128 as RpcU128, H160 as RpcH160, H256 as RpcH256, H520 as RpcH520, U128 as RpcU128,
@ -41,6 +41,7 @@ use v1::types::{
use v1::metadata::Metadata; use v1::metadata::Metadata;
use eip_712::{EIP712, hash_structured_data}; use eip_712::{EIP712, hash_structured_data};
use jsonrpc_core::types::Value; use jsonrpc_core::types::Value;
use transaction::SignedTransaction;
/// Account management (personal) rpc implementation. /// Account management (personal) rpc implementation.
pub struct PersonalClient<D: Dispatcher> { pub struct PersonalClient<D: Dispatcher> {
@ -68,7 +69,16 @@ impl<D: Dispatcher> PersonalClient<D> {
} }
impl<D: Dispatcher + 'static> PersonalClient<D> { impl<D: Dispatcher + 'static> PersonalClient<D> {
fn do_sign_transaction(&self, _meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<(PendingTransaction, D)> { fn do_sign_transaction<P>(
&self,
_meta: Metadata,
request: TransactionRequest,
password: String,
post_sign: P
) -> BoxFuture<P::Item>
where P: PostSign + 'static,
<P::Out as futures::future::IntoFuture>::Future: Send
{
let dispatcher = self.dispatcher.clone(); let dispatcher = self.dispatcher.clone();
let accounts = self.accounts.clone(); let accounts = self.accounts.clone();
@ -86,11 +96,7 @@ impl<D: Dispatcher + 'static> PersonalClient<D> {
Box::new(dispatcher.fill_optional_fields(request.into(), default, false) Box::new(dispatcher.fill_optional_fields(request.into(), default, false)
.and_then(move |filled| { .and_then(move |filled| {
let condition = filled.condition.clone().map(Into::into); dispatcher.sign(accounts, filled, SignWith::Password(password.into()), post_sign)
dispatcher.sign(accounts, filled, SignWith::Password(password.into()))
.map(|tx| tx.into_value())
.map(move |tx| PendingTransaction::new(tx, condition))
.map(move |tx| (tx, dispatcher))
}) })
) )
} }
@ -223,18 +229,26 @@ impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
} }
fn sign_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcRichRawTransaction> { fn sign_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcRichRawTransaction> {
Box::new(self.do_sign_transaction(meta, request, password) let condition = request.condition.clone().map(Into::into);
.map(|(pending_tx, dispatcher)| dispatcher.enrich(pending_tx.transaction))) let dispatcher = self.dispatcher.clone();
Box::new(self.do_sign_transaction(meta, request, password, ())
.map(move |tx| PendingTransaction::new(tx.into_value(), condition))
.map(move |pending_tx| dispatcher.enrich(pending_tx.transaction)))
} }
fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256> { fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256> {
Box::new(self.do_sign_transaction(meta, request, password) let condition = request.condition.clone().map(Into::into);
.and_then(|(pending_tx, dispatcher)| { let dispatcher = self.dispatcher.clone();
let chain_id = pending_tx.chain_id(); Box::new(self.do_sign_transaction(meta, request, password, move |signed: WithToken<SignedTransaction>| {
trace!(target: "miner", "send_transaction: dispatching tx: {} for chain ID {:?}", dispatcher.dispatch_transaction(
::rlp::encode(&*pending_tx).pretty(), chain_id); PendingTransaction::new(
signed.into_value(),
dispatcher.dispatch_transaction(pending_tx).map(Into::into) condition
)
)
})
.and_then(|hash| {
Ok(RpcH256::from(hash))
}) })
) )
} }