use generic dispatcher everywhere, squash errors

This commit is contained in:
Robert Habermeier 2017-02-08 16:55:06 +01:00
parent e73ea80954
commit 5223e25aa6
7 changed files with 79 additions and 58 deletions

View File

@ -94,12 +94,13 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
{ {
let inner = move || { let inner = move || {
let (client, miner) = (take_weak!(self.client), take_weak!(self.miner)); let (client, miner) = (take_weak!(self.client), take_weak!(self.miner));
let request = request;
Ok(FilledTransactionRequest { Ok(FilledTransactionRequest {
from: request.from.unwrap_or(default_sender), from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(), used_default_from: request.from.is_none(),
to: request.to, to: request.to,
nonce: request.nonce, nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(client, miner)), gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(&*client, &*miner)),
gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()), gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()),
value: request.value.unwrap_or_else(|| 0.into()), value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new), data: request.data.unwrap_or_else(Vec::new),
@ -147,7 +148,7 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error> { fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error> {
let hash = signed_transaction.transaction.hash(); let hash = signed_transaction.transaction.hash();
take_weak!(self.miner).import_own_transaction(take_weak!(self.client), signed_transaction) take_weak!(self.miner).import_own_transaction(&*take_weak!(self.client), signed_transaction)
.map_err(errors::from_transaction_error) .map_err(errors::from_transaction_error)
.map(|_| hash) .map(|_| hash)
} }
@ -223,7 +224,7 @@ impl<T: Debug> From<(T, Option<AccountToken>)> for WithToken<T> {
} }
} }
pub fn execute<D: Dispatcher>( pub fn execute<D: Dispatcher + 'static>(
dispatcher: D, dispatcher: D,
accounts: &AccountProvider, accounts: &AccountProvider,
payload: ConfirmationPayload, payload: ConfirmationPayload,
@ -231,7 +232,7 @@ pub fn execute<D: Dispatcher>(
) -> BoxFuture<WithToken<ConfirmationResponse>, Error> { ) -> BoxFuture<WithToken<ConfirmationResponse>, Error> {
match payload { match payload {
ConfirmationPayload::SendTransaction(request) => { ConfirmationPayload::SendTransaction(request) => {
let condition = request.condition.clone(); let condition = request.condition.clone().map(Into::into);
dispatcher.sign(accounts, request, pass) dispatcher.sign(accounts, request, pass)
.map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition))) .map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition)))
.map(WithToken::into_tuple) .map(WithToken::into_tuple)

View File

@ -46,7 +46,7 @@ use jsonrpc_core::Error;
use jsonrpc_macros::Trailing; use jsonrpc_macros::Trailing;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
use v1::helpers::dispatch::{dispatch_transaction, default_gas_price}; use v1::helpers::dispatch::{Dispatcher, FullDispatcher, default_gas_price};
use v1::helpers::block_import::is_major_importing; use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth; use v1::traits::Eth;
use v1::types::{ use v1::types::{
@ -634,7 +634,8 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
.map_err(errors::from_rlp_error) .map_err(errors::from_rlp_error)
.and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error)) .and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error))
.and_then(|signed_transaction| { .and_then(|signed_transaction| {
dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), PendingTransaction::new(signed_transaction, None)) FullDispatcher::new(self.client.clone(), self.miner.clone())
.dispatch_transaction(signed_transaction.into())
}) })
.map(Into::into) .map(Into::into)
} }

View File

@ -37,7 +37,6 @@ use futures::{future, Future, BoxFuture};
use futures::sync::oneshot; use futures::sync::oneshot;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
use v1::helpers::dispatch::{dispatch_transaction, default_gas_price};
use v1::helpers::block_import::is_major_importing; use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth; use v1::traits::Eth;
use v1::types::{ use v1::types::{

View File

@ -23,46 +23,52 @@ use ethcore::account_provider::AccountProvider;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{SignedTransaction, PendingTransaction}; use ethcore::transaction::{SignedTransaction, PendingTransaction};
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use futures::{self, future, BoxFuture, Future, IntoFuture};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload}; use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload};
use v1::helpers::dispatch::{self, dispatch_transaction, WithToken}; use v1::helpers::dispatch::{self, Dispatcher, WithToken};
use v1::traits::Signer; use v1::traits::Signer;
use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes}; use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes};
/// Transactions confirmation (personal) rpc implementation. /// Transactions confirmation (personal) rpc implementation.
pub struct SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { pub struct SignerClient<D: Dispatcher> {
signer: Weak<SignerService>, signer: Weak<SignerService>,
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D
miner: Weak<M>,
} }
impl<C: 'static, M: 'static> SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { impl<D: Dispatcher + 'static> SignerClient<D> {
/// Create new instance of signer client. /// Create new instance of signer client.
pub fn new( pub fn new(
store: &Arc<AccountProvider>, store: &Arc<AccountProvider>,
client: &Arc<C>, dispatcher: D,
miner: &Arc<M>,
signer: &Arc<SignerService>, signer: &Arc<SignerService>,
) -> Self { ) -> Self {
SignerClient { SignerClient {
signer: Arc::downgrade(signer), signer: Arc::downgrade(signer),
accounts: Arc::downgrade(store), accounts: Arc::downgrade(store),
client: Arc::downgrade(client), dispatcher: dispatcher,
miner: Arc::downgrade(miner),
} }
} }
fn confirm_internal<F>(&self, id: U256, modification: TransactionModification, f: F) -> Result<WithToken<ConfirmationResponse>, Error> where fn confirm_internal<F, T>(&self, id: U256, modification: TransactionModification, f: F) -> BoxFuture<WithToken<ConfirmationResponse>, Error> where
F: FnOnce(&C, &M, &AccountProvider, ConfirmationPayload) -> Result<WithToken<ConfirmationResponse>, Error>, F: FnOnce(D, &AccountProvider, ConfirmationPayload) -> T,
T: IntoFuture<Item=WithToken<ConfirmationResponse>, Error=Error>,
T::Future: Send + 'static
{ {
let id = id.into(); let id = id.into();
let accounts = take_weak!(self.accounts); let dispatcher = self.dispatcher.clone();
let signer = take_weak!(self.signer);
let client = take_weak!(self.client); let setup = || {
let miner = take_weak!(self.miner); Ok((take_weak!(self.accounts), take_weak!(self.signer)))
};
let (accounts, signer) = match setup() {
Ok(x) => x,
Err(e) => return future::err(e).boxed(),
};
signer.peek(&id).map(|confirmation| { signer.peek(&id).map(|confirmation| {
let mut payload = confirmation.payload.clone(); let mut payload = confirmation.payload.clone();
@ -83,17 +89,21 @@ impl<C: 'static, M: 'static> SignerClient<C, M> where C: MiningBlockChainClient,
request.condition = condition.clone().map(Into::into); request.condition = condition.clone().map(Into::into);
} }
} }
let result = f(&*client, &*miner, &*accounts, payload); let fut = f(dispatcher, &*accounts, payload);
// Execute fut.into_future().then(move |result| {
if let Ok(ref response) = result { // Execute
signer.request_confirmed(id, Ok((*response).clone())); if let Ok(ref response) = result {
} signer.request_confirmed(id, Ok((*response).clone()));
result }
}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id)))
result
}).boxed()
})
.unwrap_or_else(|| future::err(errors::invalid_params("Unknown RequestID", id)).boxed())
} }
} }
impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> { fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> {
let signer = take_weak!(self.signer); let signer = take_weak!(self.signer);
@ -107,29 +117,31 @@ impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockC
// TODO [ToDr] TransactionModification is redundant for some calls // TODO [ToDr] TransactionModification is redundant for some calls
// might be better to replace it in future // might be better to replace it in future
fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String) -> Result<ConfirmationResponse, Error> { fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String)
self.confirm_internal(id, modification, move |client, miner, accounts, payload| { -> BoxFuture<ConfirmationResponse, Error>
dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Password(pass)) {
}).map(|v| v.into_value()) self.confirm_internal(id, modification, move |dis, accounts, payload| {
dispatch::execute(dis, accounts, payload, dispatch::SignWith::Password(pass))
}).map(|v| v.into_value()).boxed()
} }
fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String) -> Result<ConfirmationResponseWithToken, Error> { fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String)
self.confirm_internal(id, modification, move |client, miner, accounts, payload| { -> BoxFuture<ConfirmationResponseWithToken, Error>
dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Token(token)) {
self.confirm_internal(id, modification, move |dis, accounts, payload| {
dispatch::execute(dis, accounts, payload, dispatch::SignWith::Token(token))
}).and_then(|v| match v { }).and_then(|v| match v {
WithToken::No(_) => Err(errors::internal("Unexpected response without token.", "")), WithToken::No(_) => Err(errors::internal("Unexpected response without token.", "")),
WithToken::Yes(response, token) => Ok(ConfirmationResponseWithToken { WithToken::Yes(response, token) => Ok(ConfirmationResponseWithToken {
result: response, result: response,
token: token, token: token,
}), }),
}) }).boxed()
} }
fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse, Error> { fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse, Error> {
let id = id.into(); let id = id.into();
let signer = take_weak!(self.signer); let signer = take_weak!(self.signer);
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
signer.peek(&id).map(|confirmation| { signer.peek(&id).map(|confirmation| {
let result = match confirmation.payload { let result = match confirmation.payload {
@ -150,7 +162,7 @@ impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockC
// Dispatch if everything is ok // Dispatch if everything is ok
if sender_matches && data_matches && value_matches && nonce_matches { if sender_matches && data_matches && value_matches && nonce_matches {
let pending_transaction = PendingTransaction::new(signed_transaction, request.condition.map(Into::into)); let pending_transaction = PendingTransaction::new(signed_transaction, request.condition.map(Into::into));
dispatch_transaction(&*client, &*miner, pending_transaction) self.dispatcher.dispatch_transaction(pending_transaction)
.map(Into::into) .map(Into::into)
.map(ConfirmationResponse::SendTransaction) .map(ConfirmationResponse::SendTransaction)
} else { } else {

View File

@ -72,7 +72,7 @@ fn handle_dispatch<OnResponse>(res: Result<DispatchResult, Error>, on_response:
} }
} }
impl<D: Dispatcher> SigningQueueClient<D> { impl<D: Dispatcher + 'static> SigningQueueClient<D> {
/// Creates a new signing queue client given shared signing queue. /// Creates a new signing queue client given shared signing queue.
pub fn new(signer: &Arc<SignerService>, dispatcher: D, accounts: &Arc<AccountProvider>) -> Self { pub fn new(signer: &Arc<SignerService>, dispatcher: D, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient { SigningQueueClient {
@ -84,30 +84,34 @@ impl<D: Dispatcher> SigningQueueClient<D> {
} }
fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> BoxFuture<DispatchResult, Error> { fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> BoxFuture<DispatchResult, Error> {
let setup = || { let setup = move || {
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
let default_account = default_account;
let default_account = match default_account { let default_account = match default_account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
(self.dispatcher.clone(), accounts, default_account) Ok((self.dispatcher.clone(), accounts, default_account))
}; };
let weak_signer = self.signer.clone(); let weak_signer = self.signer.clone();
future::done(setup) future::done(setup())
.and_then(move |(dispatcher, accounts, default_account)| { .and_then(move |(dispatcher, accounts, default_account)| {
dispatch::from_rpc(payload, default_account, &dispatcher) dispatch::from_rpc(payload, default_account, &dispatcher)
.and_then(move |payload| { .and_then(move |payload| {
let sender = payload.sender(); let sender = payload.sender();
if accounts.is_unlocked(sender) { if accounts.is_unlocked(sender) {
dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing) dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing)
.map(|v| v.into_value())
.map(DispatchResult::Value)
.boxed() .boxed()
} else { } else {
future::lazy(move || take_weak!(weak_signer).add_request(payload)) future::lazy(move ||
.map(DispatchResult::Promise) take_weak!(weak_signer).add_request(payload)
.map_err(|_| errors::request_rejected_limit()) .map(DispatchResult::Promise)
.boxed() .map_err(|_| errors::request_rejected_limit())
).boxed()
} }
}) })
}) })
@ -129,6 +133,7 @@ impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
.boxed()
} }
fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
@ -142,6 +147,7 @@ impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
.boxed()
} }
fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> { fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {

View File

@ -44,7 +44,7 @@ pub struct SigningUnsafeClient<D> {
dispatcher: D, dispatcher: D,
} }
impl<D: Dispatcher> SigningUnsafeClient<D> { impl<D: Dispatcher + 'static> SigningUnsafeClient<D> {
/// Creates new SigningUnsafeClient. /// Creates new SigningUnsafeClient.
pub fn new(accounts: &Arc<AccountProvider>, dispatcher: D) -> Self { pub fn new(accounts: &Arc<AccountProvider>, dispatcher: D) -> Self {
SigningUnsafeClient { SigningUnsafeClient {
@ -56,20 +56,21 @@ impl<D: Dispatcher> SigningUnsafeClient<D> {
fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> BoxFuture<RpcConfirmationResponse, Error> { fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> BoxFuture<RpcConfirmationResponse, Error> {
let setup = move || { let setup = move || {
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
let default_account = match account { let default_account = account;
let default_account = match default_account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
(accounts, default_account) Ok((accounts, default_account))
}; };
let dis = self.dispatcher.clone(); let dis = self.dispatcher.clone();
future::done(setup()) future::done(setup())
.and_then(move |(accounts, default)| { .and_then(move |(accounts, default)| {
dispatch::from_rpc(payload, default, &dis) dispatch::from_rpc(payload, default, &dis)
.and_then(|payload| { .and_then(move |payload| {
dispatch::execute(&dis, &accounts, payload, dispatch::SignWith::Nothing) dispatch::execute(dis, &accounts, payload, dispatch::SignWith::Nothing)
}) })
.map(|v| v.into_value()) .map(|v| v.into_value())
}) })

View File

@ -16,6 +16,7 @@
//! Parity Signer-related rpc interface. //! Parity Signer-related rpc interface.
use jsonrpc_core::Error; use jsonrpc_core::Error;
use futures::BoxFuture;
use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken}; use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken};
@ -28,12 +29,12 @@ build_rpc_trait! {
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error>; fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error>;
/// Confirm specific request. /// Confirm specific request.
#[rpc(name = "signer_confirmRequest")] #[rpc(async, name = "signer_confirmRequest")]
fn confirm_request(&self, U256, TransactionModification, String) -> Result<ConfirmationResponse, Error>; fn confirm_request(&self, U256, TransactionModification, String) -> BoxFuture<ConfirmationResponse, Error>;
/// Confirm specific request with token. /// Confirm specific request with token.
#[rpc(name = "signer_confirmRequestWithToken")] #[rpc(async, name = "signer_confirmRequestWithToken")]
fn confirm_request_with_token(&self, U256, TransactionModification, String) -> Result<ConfirmationResponseWithToken, Error>; fn confirm_request_with_token(&self, U256, TransactionModification, String) -> BoxFuture<ConfirmationResponseWithToken, Error>;
/// Confirm specific request with already signed data. /// Confirm specific request with already signed data.
#[rpc(name = "signer_confirmRequestRaw")] #[rpc(name = "signer_confirmRequestRaw")]