dispatcher abstraction, port most things to it

This commit is contained in:
Robert Habermeier 2017-02-08 15:36:53 +01:00
parent 4bb45c4f64
commit e73ea80954
9 changed files with 405 additions and 324 deletions

View File

@ -472,6 +472,12 @@ impl PendingTransaction {
} }
} }
impl Deref for PendingTransaction {
type Target = SignedTransaction;
fn deref(&self) -> &SignedTransaction { &self.transaction }
}
impl From<SignedTransaction> for PendingTransaction { impl From<SignedTransaction> for PendingTransaction {
fn from(t: SignedTransaction) -> Self { fn from(t: SignedTransaction) -> Self {
PendingTransaction { PendingTransaction {

View File

@ -16,6 +16,9 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Weak;
use futures::{future, Future, BoxFuture};
use rlp; use rlp;
use util::{Address, H520, H256, U256, Uint, Bytes}; use util::{Address, H520, H256, U256, Uint, Bytes};
use util::bytes::ToPretty; use util::bytes::ToPretty;
@ -38,6 +41,118 @@ use v1::types::{
DecryptRequest as RpcDecryptRequest, DecryptRequest as RpcDecryptRequest,
}; };
/// Has the capability to dispatch, sign, and decrypt.
///
/// Requires a clone implementation, with the implication that it be cheap;
/// usually just bumping a reference count or two.
pub trait Dispatcher: Send + Sync + Clone {
// TODO: when ATC exist, use zero-cost
// type Out<T>: IntoFuture<T, Error>
/// Fill optional fields of a transaction request, fetching gas price but not nonce.
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address)
-> BoxFuture<FilledTransactionRequest, Error>;
/// Sign the given transaction request without dispatching, fetching appropriate nonce.
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>;
/// "Dispatch" a local transaction.
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error>;
}
/// A dispatcher which uses references to a client and miner in order to sign
/// requests locally.
#[derive(Debug)]
pub struct FullDispatcher<C, M> {
client: Weak<C>,
miner: Weak<M>,
}
impl<C, M> FullDispatcher<C, M> {
/// Create a `FullDispatcher` from weak references to a client and miner.
pub fn new(client: Weak<C>, miner: Weak<M>) -> Self {
FullDispatcher {
client: client,
miner: miner,
}
}
}
impl<C, M> Clone for FullDispatcher<C, M> {
fn clone(&self) -> Self {
FullDispatcher {
client: self.client.clone(),
miner: self.miner.clone(),
}
}
}
impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C, M> {
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address)
-> BoxFuture<FilledTransactionRequest, Error>
{
let inner = move || {
let (client, miner) = (take_weak!(self.client), take_weak!(self.miner));
Ok(FilledTransactionRequest {
from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(),
to: request.to,
nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(client, miner)),
gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()),
value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new),
condition: request.condition,
})
};
future::done(inner()).boxed()
}
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>
{
let inner = move || {
let (client, miner) = (take_weak!(self.client), take_weak!(self.miner));
let network_id = client.signing_network_id();
let address = filled.from;
let signed_transaction = {
let t = Transaction {
nonce: filled.nonce
.or_else(|| miner
.last_nonce(&filled.from)
.map(|nonce| nonce + U256::one()))
.unwrap_or_else(|| client.latest_nonce(&filled.from)),
action: filled.to.map_or(Action::Create, Action::Call),
gas: filled.gas,
gas_price: filled.gas_price,
value: filled.value,
data: filled.data,
};
let hash = t.hash(network_id);
let signature = signature(accounts, address, hash, password)?;
signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
})
};
Ok(signed_transaction)
};
future::done(inner()).boxed()
}
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error> {
let hash = signed_transaction.transaction.hash();
take_weak!(self.miner).import_own_transaction(take_weak!(self.client), signed_transaction)
.map_err(errors::from_transaction_error)
.map(|_| hash)
}
}
pub const DEFAULT_MAC: [u8; 2] = [0, 0]; pub const DEFAULT_MAC: [u8; 2] = [0, 0];
type AccountToken = String; type AccountToken = String;
@ -83,6 +198,14 @@ impl<T: Debug> WithToken<T> {
WithToken::Yes(v, _) => v, WithToken::Yes(v, _) => v,
} }
} }
/// Convert the `WithToken` into a tuple.
pub fn into_tuple(self) -> (T, Option<AccountToken>) {
match self {
WithToken::No(v) => (v, None),
WithToken::Yes(v, token) => (v, Some(token))
}
}
} }
impl<T: Debug> From<(T, AccountToken)> for WithToken<T> { impl<T: Debug> From<(T, AccountToken)> for WithToken<T> {
@ -91,26 +214,44 @@ impl<T: Debug> From<(T, AccountToken)> for WithToken<T> {
} }
} }
pub fn execute<C, M>(client: &C, miner: &M, accounts: &AccountProvider, payload: ConfirmationPayload, pass: SignWith) -> Result<WithToken<ConfirmationResponse>, Error> impl<T: Debug> From<(T, Option<AccountToken>)> for WithToken<T> {
where C: MiningBlockChainClient, M: MinerService fn from(tuple: (T, Option<AccountToken>)) -> Self {
{ match tuple.1 {
Some(token) => WithToken::Yes(tuple.0, token),
None => WithToken::No(tuple.0),
}
}
}
pub fn execute<D: Dispatcher>(
dispatcher: D,
accounts: &AccountProvider,
payload: ConfirmationPayload,
pass: SignWith
) -> BoxFuture<WithToken<ConfirmationResponse>, Error> {
match payload { match payload {
ConfirmationPayload::SendTransaction(request) => { ConfirmationPayload::SendTransaction(request) => {
sign_and_dispatch(client, miner, accounts, request, pass) let condition = request.condition.clone();
.map(|result| result dispatcher.sign(accounts, request, pass)
.map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition)))
.map(WithToken::into_tuple)
.map(|(tx, token)| (tx, token, dispatcher))
.and_then(|(tx, tok, dispatcher)| {
dispatcher.dispatch_transaction(tx)
.map(RpcH256::from) .map(RpcH256::from)
.map(ConfirmationResponse::SendTransaction) .map(ConfirmationResponse::SendTransaction)
) .map(move |h| WithToken::from((h, tok)))
}).boxed()
}, },
ConfirmationPayload::SignTransaction(request) => { ConfirmationPayload::SignTransaction(request) => {
sign_no_dispatch(client, miner, accounts, request, pass) dispatcher.sign(accounts, request, pass)
.map(|result| result .map(|result| result
.map(RpcRichRawTransaction::from) .map(RpcRichRawTransaction::from)
.map(ConfirmationResponse::SignTransaction) .map(ConfirmationResponse::SignTransaction)
) ).boxed()
}, },
ConfirmationPayload::Signature(address, data) => { ConfirmationPayload::Signature(address, data) => {
signature(accounts, address, data.sha3(), pass) let res = signature(accounts, address, data.sha3(), pass)
.map(|result| result .map(|result| result
.map(|rsv| { .map(|rsv| {
let mut vrs = [0u8; 65]; let mut vrs = [0u8; 65];
@ -122,14 +263,16 @@ pub fn execute<C, M>(client: &C, miner: &M, accounts: &AccountProvider, payload:
}) })
.map(RpcH520::from) .map(RpcH520::from)
.map(ConfirmationResponse::Signature) .map(ConfirmationResponse::Signature)
) );
future::done(res).boxed()
}, },
ConfirmationPayload::Decrypt(address, data) => { ConfirmationPayload::Decrypt(address, data) => {
decrypt(accounts, address, data, pass) let res = decrypt(accounts, address, data, pass)
.map(|result| result .map(|result| result
.map(RpcBytes) .map(RpcBytes)
.map(ConfirmationResponse::Decrypt) .map(ConfirmationResponse::Decrypt)
) );
future::done(res).boxed()
}, },
} }
} }
@ -156,105 +299,34 @@ fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: S
}) })
} }
pub fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: PendingTransaction) -> Result<H256, Error> /// Extract default gas price from a client and miner.
where C: MiningBlockChainClient, M: MinerService {
let hash = signed_transaction.transaction.hash();
miner.import_own_transaction(client, signed_transaction)
.map_err(errors::from_transaction_error)
.map(|_| hash)
}
pub fn sign_no_dispatch<C, M>(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result<WithToken<SignedTransaction>, Error>
where C: MiningBlockChainClient, M: MinerService {
let network_id = client.signing_network_id();
let address = filled.from;
let signed_transaction = {
let t = Transaction {
nonce: filled.nonce
.or_else(|| miner
.last_nonce(&filled.from)
.map(|nonce| nonce + U256::one()))
.unwrap_or_else(|| client.latest_nonce(&filled.from)),
action: filled.to.map_or(Action::Create, Action::Call),
gas: filled.gas,
gas_price: filled.gas_price,
value: filled.value,
data: filled.data,
};
let hash = t.hash(network_id);
let signature = signature(accounts, address, hash, password)?;
signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
})
};
Ok(signed_transaction)
}
pub fn sign_and_dispatch<C, M>(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result<WithToken<H256>, Error>
where C: MiningBlockChainClient, M: MinerService
{
let network_id = client.signing_network_id();
let condition = filled.condition.clone();
let signed_transaction = sign_no_dispatch(client, miner, accounts, filled, password)?;
let (signed_transaction, token) = match signed_transaction {
WithToken::No(signed_transaction) => (signed_transaction, None),
WithToken::Yes(signed_transaction, token) => (signed_transaction, Some(token)),
};
trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}", rlp::encode(&signed_transaction).to_vec().pretty(), network_id);
let pending_transaction = PendingTransaction::new(signed_transaction, condition.map(Into::into));
dispatch_transaction(&*client, &*miner, pending_transaction).map(|hash| {
match token {
Some(ref token) => WithToken::Yes(hash, token.clone()),
None => WithToken::No(hash),
}
})
}
pub fn fill_optional_fields<C, M>(request: TransactionRequest, default_sender: Address, client: &C, miner: &M) -> FilledTransactionRequest
where C: MiningBlockChainClient, M: MinerService
{
FilledTransactionRequest {
from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(),
to: request.to,
nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(client, miner)),
gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()),
value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new),
condition: request.condition,
}
}
pub fn default_gas_price<C, M>(client: &C, miner: &M) -> U256 pub fn default_gas_price<C, M>(client: &C, miner: &M) -> U256
where C: MiningBlockChainClient, M: MinerService where C: MiningBlockChainClient, M: MinerService
{ {
client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price()) client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price())
} }
pub fn from_rpc<C, M>(payload: RpcConfirmationPayload, default_account: Address, client: &C, miner: &M) -> ConfirmationPayload /// Convert RPC confirmation payload to signer confirmation payload.
where C: MiningBlockChainClient, M: MinerService { /// May need to resolve in the future to fetch things like gas price.
pub fn from_rpc<D>(payload: RpcConfirmationPayload, default_account: Address, dispatcher: &D) -> BoxFuture<ConfirmationPayload, Error>
where D: Dispatcher
{
match payload { match payload {
RpcConfirmationPayload::SendTransaction(request) => { RpcConfirmationPayload::SendTransaction(request) => {
ConfirmationPayload::SendTransaction(fill_optional_fields(request.into(), default_account, client, miner)) dispatcher.fill_optional_fields(request.into(), default_account)
.map(ConfirmationPayload::SendTransaction)
.boxed()
}, },
RpcConfirmationPayload::SignTransaction(request) => { RpcConfirmationPayload::SignTransaction(request) => {
ConfirmationPayload::SignTransaction(fill_optional_fields(request.into(), default_account, client, miner)) dispatcher.fill_optional_fields(request.into(), default_account)
.map(ConfirmationPayload::SignTransaction)
.boxed()
}, },
RpcConfirmationPayload::Decrypt(RpcDecryptRequest { address, msg }) => { RpcConfirmationPayload::Decrypt(RpcDecryptRequest { address, msg }) => {
ConfirmationPayload::Decrypt(address.into(), msg.into()) future::ok(ConfirmationPayload::Decrypt(address.into(), msg.into())).boxed()
}, },
RpcConfirmationPayload::Signature(RpcSignRequest { address, data }) => { RpcConfirmationPayload::Signature(RpcSignRequest { address, data }) => {
ConfirmationPayload::Signature(address.into(), data.into()) future::ok(ConfirmationPayload::Signature(address.into(), data.into())).boxed()
}, },
} }
} }

View File

@ -16,15 +16,6 @@
//! Ethereum rpc interface implementation. //! Ethereum rpc interface implementation.
macro_rules! take_weak {
($weak: expr) => {
match $weak.upgrade() {
Some(arc) => arc,
None => return Err(Error::internal_error())
}
}
}
mod eth; mod eth;
mod eth_filter; mod eth_filter;
mod net; mod net;

View File

@ -20,46 +20,37 @@ use std::sync::{Arc, Weak};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use util::{Address, U128, Uint}; use ethcore::transaction::PendingTransaction;
use futures::{self, Future, BoxFuture}; use util::{Address, U128, Uint, ToPretty};
use futures::{self, future, Future, BoxFuture};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::errors; use v1::helpers::errors;
use v1::helpers::dispatch::{self, sign_and_dispatch}; use v1::helpers::dispatch::{Dispatcher, SignWith};
use v1::traits::Personal; use v1::traits::Personal;
use v1::types::{H160 as RpcH160, H256 as RpcH256, U128 as RpcU128, TransactionRequest}; use v1::types::{H160 as RpcH160, H256 as RpcH256, U128 as RpcU128, TransactionRequest};
use v1::metadata::Metadata; use v1::metadata::Metadata;
/// Account management (personal) rpc implementation. /// Account management (personal) rpc implementation.
pub struct PersonalClient<C, M> where pub struct PersonalClient<D: Dispatcher> {
C: MiningBlockChainClient,
M: MinerService,
{
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>,
allow_perm_unlock: bool, allow_perm_unlock: bool,
} }
impl<C, M> PersonalClient<C, M> where impl<D: Dispatcher> PersonalClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates new PersonalClient /// Creates new PersonalClient
pub fn new(store: &Arc<AccountProvider>, client: &Arc<C>, miner: &Arc<M>, allow_perm_unlock: bool) -> Self { pub fn new(store: &Arc<AccountProvider>, dispatcher: D, allow_perm_unlock: bool) -> Self {
PersonalClient { PersonalClient {
accounts: Arc::downgrade(store), accounts: Arc::downgrade(store),
client: Arc::downgrade(client), dispatcher: dispatcher,
miner: Arc::downgrade(miner),
allow_perm_unlock: allow_perm_unlock, allow_perm_unlock: allow_perm_unlock,
} }
} }
} }
impl<C, M> Personal for PersonalClient<C, M> where impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
C: MiningBlockChainClient + 'static,
M: MinerService + 'static,
{
type Metadata = Metadata; type Metadata = Metadata;
fn accounts(&self) -> Result<Vec<RpcH160>, Error> { fn accounts(&self) -> Result<Vec<RpcH160>, Error> {
@ -106,28 +97,41 @@ impl<C, M> Personal for PersonalClient<C, M> where
} }
fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256, Error> {
let sign_and_send = move || { let setup = || {
let client = take_weak!(self.client); let dispatcher = self.dispatcher.clone();
let miner = take_weak!(self.miner);
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
let default_account = match request.from { Ok((accounts, dispatcher))
Some(ref account) => account.clone().into(), };
future::done(setup())
.and_then(move |(accounts, dispatcher)| {
let default = match request.from.as_ref() {
Some(account) => Ok(account.clone().into()),
None => accounts None => accounts
.default_address(meta.dapp_id.unwrap_or_default().into()) .default_address(meta.dapp_id.unwrap_or_default().into())
.map_err(|e| errors::account("Cannot find default account.", e))?, .map_err(|e| errors::account("Cannot find default account.", e)),
}; };
let request = dispatch::fill_optional_fields(request.into(), default_account, &*client, &*miner); let dis = dispatcher.clone();
sign_and_dispatch( future::done(default)
&*client, .and_then(move |default| dis.fill_optional_fields(request.into(), default))
&*miner, .map(move |tx| (tx, accounts, dispatcher))
&*accounts, })
request, .and_then(move |(filled, accounts, dispatcher)| {
dispatch::SignWith::Password(password) let condition = filled.condition.clone().map(Into::into);
).map(|v| v.into_value().into()) dispatcher.sign(&accounts, filled, SignWith::Password(password))
}; .map(|tx| tx.into_value())
.map(move |tx| PendingTransaction::new(tx, condition))
.map(move |tx| (tx, dispatcher))
})
.and_then(move |(pending_tx, dispatcher)| {
let network_id = pending_tx.network_id();
trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}",
::rlp::encode(&*pending_tx).to_vec().pretty(), network_id);
futures::done(sign_and_send()).boxed() dispatcher.dispatch_transaction(pending_tx).map(Into::into)
})
.boxed()
} }
} }

View File

@ -24,13 +24,14 @@ use ethcore::account_provider::AccountProvider;
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use futures::{self, BoxFuture, Future}; use futures::{self, future, BoxFuture, Future};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{ use v1::helpers::{
errors, dispatch, errors,
DefaultAccount, DefaultAccount,
SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationPayload, SignerService SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationPayload, SignerService
}; };
use v1::helpers::dispatch::{self, Dispatcher};
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::{EthSigning, ParitySigning}; use v1::traits::{EthSigning, ParitySigning};
use v1::types::{ use v1::types::{
@ -50,31 +51,14 @@ enum DispatchResult {
} }
/// Implementation of functions that require signing when no trusted signer is used. /// Implementation of functions that require signing when no trusted signer is used.
pub struct SigningQueueClient<C, M> where C: MiningBlockChainClient, M: MinerService { pub struct SigningQueueClient<D> {
signer: Weak<SignerService>, signer: Weak<SignerService>,
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>, pending: Arc<Mutex<TransientHashMap<U256, ConfirmationPromise>>>,
pending: Mutex<TransientHashMap<U256, ConfirmationPromise>>,
} }
impl<C, M> SigningQueueClient<C, M> where fn handle_dispatch<OnResponse>(res: Result<DispatchResult, Error>, on_response: OnResponse)
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates a new signing queue client given shared signing queue.
pub fn new(signer: &Arc<SignerService>, client: &Arc<C>, miner: &Arc<M>, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient {
signer: Arc::downgrade(signer),
accounts: Arc::downgrade(accounts),
client: Arc::downgrade(client),
miner: Arc::downgrade(miner),
pending: Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION)),
}
}
fn handle_dispatch<OnResponse>(&self, res: Result<DispatchResult, Error>, on_response: OnResponse)
where OnResponse: FnOnce(Result<RpcConfirmationResponse, Error>) + Send + 'static where OnResponse: FnOnce(Result<RpcConfirmationResponse, Error>) + Send + 'static
{ {
match res { match res {
@ -88,67 +72,76 @@ impl<C, M> SigningQueueClient<C, M> where
} }
} }
fn add_to_queue(&self, payload: ConfirmationPayload) -> Result<DispatchResult, Error> { impl<D: Dispatcher> SigningQueueClient<D> {
let client = take_weak!(self.client); /// Creates a new signing queue client given shared signing queue.
let miner = take_weak!(self.miner); pub fn new(signer: &Arc<SignerService>, dispatcher: D, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient {
signer: Arc::downgrade(signer),
accounts: Arc::downgrade(accounts),
dispatcher: dispatcher,
pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION))),
}
}
fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> BoxFuture<DispatchResult, Error> {
let setup = || {
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
let sender = payload.sender();
if accounts.is_unlocked(sender) {
return dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing)
.map(|v| v.into_value())
.map(DispatchResult::Value);
}
take_weak!(self.signer).add_request(payload)
.map(DispatchResult::Promise)
.map_err(|_| errors::request_rejected_limit())
}
fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> Result<DispatchResult, Error> {
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
let default_account = match default_account { let default_account = match default_account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => take_weak!(self.accounts).default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner);
self.add_to_queue(payload) (self.dispatcher.clone(), accounts, default_account)
};
let weak_signer = self.signer.clone();
future::done(setup)
.and_then(move |(dispatcher, accounts, default_account)| {
dispatch::from_rpc(payload, default_account, &dispatcher)
.and_then(move |payload| {
let sender = payload.sender();
if accounts.is_unlocked(sender) {
dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing)
.boxed()
} else {
future::lazy(move || take_weak!(weak_signer).add_request(payload))
.map(DispatchResult::Promise)
.map_err(|_| errors::request_rejected_limit())
.boxed()
}
})
})
.boxed()
} }
} }
impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn post_sign(&self, address: RpcH160, data: RpcBytes) -> Result<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
let pending = self.pending.clone();
self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into())) self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into()))
.map(|result| match result { .map(move |result| match result {
DispatchResult::Value(v) => RpcEither::Or(v), DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => { DispatchResult::Promise(promise) => {
let id = promise.id(); let id = promise.id();
self.pending.lock().insert(id, promise); pending.lock().insert(id, promise);
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
} }
fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
let post_transaction = move || { let pending = self.pending.clone();
self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()) self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into())
.map(|result| match result { .map(move |result| match result {
DispatchResult::Value(v) => RpcEither::Or(v), DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => { DispatchResult::Promise(promise) => {
let id = promise.id(); let id = promise.id();
self.pending.lock().insert(id, promise); pending.lock().insert(id, promise);
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
};
futures::done(post_transaction()).boxed()
} }
fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> { fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {
@ -170,8 +163,11 @@ impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where
let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()); let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
// TODO [todr] typed handle_dispatch
self.handle_dispatch(res, |response| { // when dispatch is complete
res.then(move |res| {
// register callback via the oneshot sender.
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)), Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -179,36 +175,40 @@ impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where
} }
}); });
// and wait for that to resolve.
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed()
}).boxed()
} }
} }
impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where impl<D: Dispatcher + 'static> EthSigning for SigningQueueClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> { fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> {
let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()); let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::Signature(signature)) => ready.complete(Ok(signature)), Ok(RpcConfirmationResponse::Signature(sig)) => ready.complete(Ok(sig)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))), e => ready.complete(Err(errors::internal("Unexpected result.", e))),
} }
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed()
}).boxed()
} }
fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> {
let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()); let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)), Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -217,13 +217,16 @@ impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed()
}).boxed()
} }
fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> { fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> {
let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into()); let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)), Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -232,5 +235,6 @@ impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed()
}).boxed()
} }
} }

View File

@ -22,9 +22,10 @@ use ethcore::account_provider::AccountProvider;
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use futures::{self, BoxFuture, Future}; use futures::{self, future, BoxFuture, Future};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{errors, dispatch, DefaultAccount}; use v1::helpers::{errors, DefaultAccount};
use v1::helpers::dispatch::{self, Dispatcher};
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::{EthSigning, ParitySigning}; use v1::traits::{EthSigning, ParitySigning};
use v1::types::{ use v1::types::{
@ -38,106 +39,100 @@ use v1::types::{
}; };
/// Implementation of functions that require signing when no trusted signer is used. /// Implementation of functions that require signing when no trusted signer is used.
pub struct SigningUnsafeClient<C, M> where pub struct SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>,
} }
impl<C, M> SigningUnsafeClient<C, M> where impl<D: Dispatcher> SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates new SigningUnsafeClient. /// Creates new SigningUnsafeClient.
pub fn new(client: &Arc<C>, accounts: &Arc<AccountProvider>, miner: &Arc<M>) pub fn new(accounts: &Arc<AccountProvider>, dispatcher: D) -> Self {
-> Self {
SigningUnsafeClient { SigningUnsafeClient {
client: Arc::downgrade(client),
miner: Arc::downgrade(miner),
accounts: Arc::downgrade(accounts), accounts: Arc::downgrade(accounts),
dispatcher: dispatcher,
} }
} }
fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> Result<RpcConfirmationResponse, Error> { fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> BoxFuture<RpcConfirmationResponse, Error> {
let client = take_weak!(self.client); let setup = move || {
let miner = take_weak!(self.miner);
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
let default_account = match account { let default_account = match account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner);
dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing) (accounts, default_account)
};
let dis = self.dispatcher.clone();
future::done(setup())
.and_then(move |(accounts, default)| {
dispatch::from_rpc(payload, default, &dis)
.and_then(|payload| {
dispatch::execute(&dis, &accounts, payload, dispatch::SignWith::Nothing)
})
.map(|v| v.into_value()) .map(|v| v.into_value())
})
.boxed()
} }
} }
impl<C: 'static, M: 'static> EthSigning for SigningUnsafeClient<C, M> where impl<D: Dispatcher + 'static> EthSigning for SigningUnsafeClient<D>
C: MiningBlockChainClient,
M: MinerService,
{ {
type Metadata = Metadata; type Metadata = Metadata;
fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> { fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> {
let result = match self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()) { self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature), Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> {
let result = match self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into()) { self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash), Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> { fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> {
let result = match self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into()) { self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx), Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
} }
impl<C: 'static, M: 'static> ParitySigning for SigningUnsafeClient<C, M> where impl<D: Dispatcher + 'static> ParitySigning for SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcBytes, Error> { fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcBytes, Error> {
let result = match self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()) { self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data), Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn post_sign(&self, _: RpcH160, _: RpcBytes) -> Result<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_sign(&self, _: RpcH160, _: RpcBytes) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
// We don't support this in non-signer mode. // We don't support this in non-signer mode.
Err(errors::signer_disabled()) future::err(errors::signer_disabled()).boxed()
} }
fn post_transaction(&self, _: Metadata, _: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_transaction(&self, _: Metadata, _: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
// We don't support this in non-signer mode. // We don't support this in non-signer mode.
futures::done(Err(errors::signer_disabled())).boxed() future::err((errors::signer_disabled())).boxed()
} }
fn check_request(&self, _: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> { fn check_request(&self, _: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {

View File

@ -18,6 +18,15 @@
//! //!
//! Compliant with ethereum rpc. //! Compliant with ethereum rpc.
macro_rules! take_weak {
($weak: expr) => {
match $weak.upgrade() {
Some(arc) => arc,
None => return Err(Error::internal_error())
}
}
}
#[macro_use] #[macro_use]
mod helpers; mod helpers;
mod impls; mod impls;

View File

@ -27,8 +27,8 @@ build_rpc_trait! {
/// Posts sign request asynchronously. /// Posts sign request asynchronously.
/// Will return a confirmation ID for later use with check_transaction. /// Will return a confirmation ID for later use with check_transaction.
#[rpc(name = "parity_postSign")] #[rpc(async, name = "parity_postSign")]
fn post_sign(&self, H160, Bytes) -> Result<Either<U256, ConfirmationResponse>, Error>; fn post_sign(&self, H160, Bytes) -> BoxFuture<Either<U256, ConfirmationResponse>, Error>;
/// Posts transaction asynchronously. /// Posts transaction asynchronously.
/// Will return a transaction ID for later use with check_transaction. /// Will return a transaction ID for later use with check_transaction.

View File

@ -164,7 +164,7 @@ mod tests {
logs_bloom: H2048::default(), logs_bloom: H2048::default(),
timestamp: U256::default(), timestamp: U256::default(),
difficulty: U256::default(), difficulty: U256::default(),
total_difficulty: U256::default(), total_difficulty: Some(U256::default()),
seal_fields: vec![Bytes::default(), Bytes::default()], seal_fields: vec![Bytes::default(), Bytes::default()],
uncles: vec![], uncles: vec![],
transactions: BlockTransactions::Hashes(vec![].into()), transactions: BlockTransactions::Hashes(vec![].into()),