diff --git a/Cargo.lock b/Cargo.lock index 8a53a21e3..0253f05e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2299,6 +2299,7 @@ dependencies = [ "ethstore 0.1.0", "ethsync 1.8.0", "fetch 0.1.0", + "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hardware-wallet 1.8.0", "hash 0.1.0", diff --git a/ethcore/src/account_provider/mod.rs b/ethcore/src/account_provider/mod.rs index 3e1d1058c..6d7ab52cb 100755 --- a/ethcore/src/account_provider/mod.rs +++ b/ethcore/src/account_provider/mod.rs @@ -516,8 +516,8 @@ impl AccountProvider { } /// Returns each hardware account along with name and meta. - pub fn is_hardware_address(&self, address: Address) -> bool { - self.hardware_store.as_ref().and_then(|s| s.wallet_info(&address)).is_some() + pub fn is_hardware_address(&self, address: &Address) -> bool { + self.hardware_store.as_ref().and_then(|s| s.wallet_info(address)).is_some() } /// Returns each account along with name and meta. @@ -589,7 +589,7 @@ impl AccountProvider { } } - if self.unlock_keep_secret && unlock != Unlock::OneTime { + if self.unlock_keep_secret && unlock == Unlock::Perm { // verify password and get the secret let secret = self.sstore.raw_secret(&account, &password)?; self.unlocked_secrets.write().insert(account.clone(), secret); @@ -639,14 +639,22 @@ impl AccountProvider { } /// Checks if given account is unlocked - pub fn is_unlocked(&self, address: Address) -> bool { + pub fn is_unlocked(&self, address: &Address) -> bool { let unlocked = self.unlocked.read(); let unlocked_secrets = self.unlocked_secrets.read(); - self.sstore.account_ref(&address) + self.sstore.account_ref(address) .map(|r| unlocked.get(&r).is_some() || unlocked_secrets.get(&r).is_some()) .unwrap_or(false) } + /// Checks if given account is unlocked permanently + pub fn is_unlocked_permanently(&self, address: &Address) -> bool { + let unlocked = self.unlocked.read(); + self.sstore.account_ref(address) + .map(|r| unlocked.get(&r).map_or(false, |account| account.unlock == Unlock::Perm)) + .unwrap_or(false) + } + /// Signs the message. If password is not provided the account must be unlocked. pub fn sign(&self, address: Address, password: Option, message: Message) -> Result { let account = self.sstore.account_ref(&address)?; diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index f9f7e28d0..5e0342f3e 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -458,7 +458,7 @@ usage! { "--jsonrpc-hosts=[HOSTS]", "List of allowed Host header values. This option will validate the Host header sent by the browser, it is additional security against some attack vectors. Special options: \"all\", \"none\",.", - ARG arg_jsonrpc_threads: (usize) = 0usize, or |c: &Config| otry!(c.rpc).processing_threads, + ARG arg_jsonrpc_threads: (usize) = 4usize, or |c: &Config| otry!(c.rpc).processing_threads, "--jsonrpc-threads=[THREADS]", "Turn on additional processing threads in all RPC servers. Setting this to non-zero value allows parallel cpu-heavy queries execution.", @@ -1461,7 +1461,7 @@ mod tests { arg_jsonrpc_apis: "web3,eth,net,parity,traces,rpc,secretstore".into(), arg_jsonrpc_hosts: "none".into(), arg_jsonrpc_server_threads: None, - arg_jsonrpc_threads: 0, + arg_jsonrpc_threads: 4, // WS flag_no_ws: false, diff --git a/parity/rpc.rs b/parity/rpc.rs index da8986851..984709c15 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -62,7 +62,7 @@ impl Default for HttpConfiguration { cors: Some(vec![]), hosts: Some(vec![]), server_threads: 1, - processing_threads: 0, + processing_threads: 4, } } } diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index d348202a9..933b87922 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -15,8 +15,7 @@ // along with Parity. If not, see . use std::cmp::PartialEq; -use std::collections::BTreeMap; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::str::FromStr; use std::sync::{Arc, Weak}; @@ -239,10 +238,10 @@ impl FullDependencies { use parity_rpc::v1::*; macro_rules! add_signing_methods { - ($namespace:ident, $handler:expr, $deps:expr) => { + ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { { let deps = &$deps; - let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone()); + let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store))) } else { @@ -252,7 +251,12 @@ impl FullDependencies { } } - let dispatcher = FullDispatcher::new(self.client.clone(), self.miner.clone()); + let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))); + let dispatcher = FullDispatcher::new( + self.client.clone(), + self.miner.clone(), + nonces.clone(), + ); for api in apis { match *api { Api::Web3 => { @@ -281,7 +285,7 @@ impl FullDependencies { let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); handler.extend_with(filter_client.to_delegate()); - add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(EthSigning, handler, self, nonces.clone()); } }, Api::EthPubSub => { @@ -318,7 +322,7 @@ impl FullDependencies { ).to_delegate()); if !for_generic_pubsub { - add_signing_methods!(ParitySigning, handler, self); + add_signing_methods!(ParitySigning, handler, self, nonces.clone()); } }, Api::ParityPubSub => { @@ -435,6 +439,7 @@ impl LightDependencies { self.on_demand.clone(), self.cache.clone(), self.transaction_queue.clone(), + Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), ); macro_rules! add_signing_methods { diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index ceb57639c..9f0e544ee 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -10,6 +10,7 @@ authors = ["Parity Technologies "] [dependencies] ansi_term = "0.9" cid = "0.2" +futures = "0.1" futures-cpupool = "0.1" log = "0.3" multihash ="0.6" diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index a4572835f..d5e4cebe8 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -66,6 +66,8 @@ extern crate stats; extern crate hash; extern crate hardware_wallet; +#[macro_use] +extern crate futures; #[macro_use] extern crate log; #[macro_use] diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index d84bd6cad..35e16f573 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -43,9 +43,9 @@ use ethcore::account_provider::AccountProvider; use crypto::DEFAULT_MAC; use jsonrpc_core::{BoxFuture, Error}; -use jsonrpc_core::futures::{future, Future}; +use jsonrpc_core::futures::{future, Future, Poll, Async}; use jsonrpc_core::futures::future::Either; -use v1::helpers::{errors, TransactionRequest, FilledTransactionRequest, ConfirmationPayload}; +use v1::helpers::{errors, nonce, TransactionRequest, FilledTransactionRequest, ConfirmationPayload}; use v1::types::{ H256 as RpcH256, H520 as RpcH520, Bytes as RpcBytes, RichRawTransaction as RpcRichRawTransaction, @@ -55,6 +55,8 @@ use v1::types::{ DecryptRequest as RpcDecryptRequest, }; +pub use self::nonce::Reservations; + /// Has the capability to dispatch, sign, and decrypt. /// /// Requires a clone implementation, with the implication that it be cheap; @@ -75,7 +77,8 @@ pub trait Dispatcher: Send + Sync + Clone { fn enrich(&self, SignedTransaction) -> RpcRichRawTransaction; /// "Dispatch" a local transaction. - fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result; + fn dispatch_transaction(&self, signed_transaction: PendingTransaction) + -> Result; } /// A dispatcher which uses references to a client and miner in order to sign @@ -84,14 +87,20 @@ pub trait Dispatcher: Send + Sync + Clone { pub struct FullDispatcher { client: Arc, miner: Arc, + nonces: Arc>, } impl FullDispatcher { /// Create a `FullDispatcher` from Arc references to a client and miner. - pub fn new(client: Arc, miner: Arc) -> Self { + pub fn new( + client: Arc, + miner: Arc, + nonces: Arc>, + ) -> Self { FullDispatcher { client, miner, + nonces, } } } @@ -101,15 +110,24 @@ impl Clone for FullDispatcher { FullDispatcher { client: self.client.clone(), miner: self.miner.clone(), + nonces: self.nonces.clone(), } } } impl FullDispatcher { - fn fill_nonce(nonce: Option, from: &Address, miner: &M, client: &C) -> U256 { - nonce - .or_else(|| miner.last_nonce(from).map(|nonce| nonce + U256::one())) - .unwrap_or_else(|| client.latest_nonce(from)) + fn state_nonce(&self, from: &Address) -> U256 { + self.miner.last_nonce(from).map(|nonce| nonce + U256::one()) + .unwrap_or_else(|| self.client.latest_nonce(from)) + } + + /// Imports transaction to the miner's queue. + pub fn dispatch_transaction(client: &C, miner: &M, signed_transaction: PendingTransaction) -> Result { + let hash = signed_transaction.transaction.hash(); + + miner.import_own_transaction(client, signed_transaction) + .map_err(errors::transaction) + .map(|_| hash) } } @@ -117,20 +135,21 @@ impl Dispatcher for FullDispatcher BoxFuture { - let (client, miner) = (self.client.clone(), self.miner.clone()); let request = request; let from = request.from.unwrap_or(default_sender); - let nonce = match force_nonce { - false => request.nonce, - true => Some(Self::fill_nonce(request.nonce, &from, &miner, &client)), + let nonce = if force_nonce { + request.nonce.or_else(|| Some(self.state_nonce(&from))) + } else { + request.nonce }; + Box::new(future::ok(FilledTransactionRequest { - from: from, + from, used_default_from: request.from.is_none(), to: request.to, - nonce: nonce, - gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(&*client, &*miner)), - gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()), + nonce, + gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(&*self.client, &*self.miner)), + gas: request.gas.unwrap_or_else(|| self.miner.sensible_gas_limit()), value: request.value.unwrap_or_else(|| 0.into()), data: request.data.unwrap_or_else(Vec::new), condition: request.condition, @@ -140,30 +159,16 @@ impl Dispatcher for FullDispatcher, filled: FilledTransactionRequest, password: SignWith) -> BoxFuture, Error> { - let (client, miner) = (self.client.clone(), self.miner.clone()); - let chain_id = client.signing_chain_id(); - let address = filled.from; - Box::new(future::done({ - let t = Transaction { - nonce: Self::fill_nonce(filled.nonce, &filled.from, &miner, &client), - action: filled.to.map_or(Action::Create, Action::Call), - gas: filled.gas, - gas_price: filled.gas_price, - value: filled.value, - data: filled.data, - }; + let chain_id = self.client.signing_chain_id(); - if accounts.is_hardware_address(address) { - hardware_signature(&*accounts, address, t, chain_id).map(WithToken::No) - } else { - let hash = t.hash(chain_id); - let signature = try_bf!(signature(&*accounts, address, hash, password)); - Ok(signature.map(|sig| { - SignedTransaction::new(t.with_signature(sig, chain_id)) - .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") - })) - } - })) + if let Some(nonce) = filled.nonce { + return Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password))); + } + + let state = self.state_nonce(&filled.from); + let reserved = self.nonces.lock().reserve(filled.from, state); + + Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)) } fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction { @@ -172,11 +177,7 @@ impl Dispatcher for FullDispatcher Result { - let hash = signed_transaction.transaction.hash(); - - self.miner.import_own_transaction(&*self.client, signed_transaction) - .map_err(errors::transaction) - .map(|_| hash) + Self::dispatch_transaction(&*self.client, &*self.miner, signed_transaction) } } @@ -255,6 +256,8 @@ pub struct LightDispatcher { pub cache: Arc>, /// Transaction queue. pub transaction_queue: Arc>, + /// Nonce reservations + pub nonces: Arc>, } impl LightDispatcher { @@ -267,6 +270,7 @@ impl LightDispatcher { on_demand: Arc, cache: Arc>, transaction_queue: Arc>, + nonces: Arc>, ) -> Self { LightDispatcher { sync, @@ -274,6 +278,7 @@ impl LightDispatcher { on_demand, cache, transaction_queue, + nonces, } } @@ -372,39 +377,20 @@ impl Dispatcher for LightDispatcher { -> BoxFuture, Error> { let chain_id = self.client.signing_chain_id(); - let address = filled.from; - - let with_nonce = move |filled: FilledTransactionRequest, nonce| { - let t = Transaction { - nonce: nonce, - action: filled.to.map_or(Action::Create, Action::Call), - gas: filled.gas, - gas_price: filled.gas_price, - value: filled.value, - data: filled.data, - }; - - if accounts.is_hardware_address(address) { - return hardware_signature(&*accounts, address, t, chain_id).map(WithToken::No) - } - - let hash = t.hash(chain_id); - let signature = signature(&*accounts, address, hash, password)?; - - Ok(signature.map(|sig| { - SignedTransaction::new(t.with_signature(sig, chain_id)) - .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") - })) - }; // fast path for pre-filled nonce. if let Some(nonce) = filled.nonce { - return Box::new(future::done(with_nonce(filled, nonce))) + return Box::new(future::done(sign_transaction(&*accounts, filled, chain_id, nonce, password))) } - Box::new(self.next_nonce(address) + let nonces = self.nonces.clone(); + Box::new(self.next_nonce(filled.from) .map_err(|_| errors::no_light_peers()) - .and_then(move |nonce| with_nonce(filled, nonce))) + .and_then(move |nonce| { + let reserved = nonces.lock().reserve(filled.from, nonce); + + ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) + })) } fn enrich(&self, signed_transaction: SignedTransaction) -> RpcRichRawTransaction { @@ -422,6 +408,147 @@ impl Dispatcher for LightDispatcher { } } +fn sign_transaction( + accounts: &AccountProvider, + filled: FilledTransactionRequest, + chain_id: Option, + nonce: U256, + password: SignWith, +) -> Result, Error> { + let t = Transaction { + nonce: nonce, + action: filled.to.map_or(Action::Create, Action::Call), + gas: filled.gas, + gas_price: filled.gas_price, + value: filled.value, + data: filled.data, + }; + + if accounts.is_hardware_address(&filled.from) { + return hardware_signature(accounts, filled.from, t, chain_id).map(WithToken::No) + } + + let hash = t.hash(chain_id); + let signature = signature(accounts, filled.from, hash, password)?; + + Ok(signature.map(|sig| { + SignedTransaction::new(t.with_signature(sig, chain_id)) + .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") + })) +} + +#[derive(Debug, Clone, Copy)] +enum ProspectiveSignerState { + TryProspectiveSign, + WaitForNonce, + Finish, +} + +struct ProspectiveSigner { + accounts: Arc, + filled: FilledTransactionRequest, + chain_id: Option, + reserved: nonce::Reserved, + password: SignWith, + state: ProspectiveSignerState, + prospective: Option, Error>>, + ready: Option, +} + +impl ProspectiveSigner { + pub fn new( + accounts: Arc, + filled: FilledTransactionRequest, + chain_id: Option, + reserved: nonce::Reserved, + password: SignWith, + ) -> Self { + // If the account is permanently unlocked we can try to sign + // using prospective nonce. This should speed up sending + // multiple subsequent transactions in multi-threaded RPC environment. + let is_unlocked_permanently = accounts.is_unlocked_permanently(&filled.from); + let has_password = password.is_password(); + + ProspectiveSigner { + accounts, + filled, + chain_id, + reserved, + password, + state: if is_unlocked_permanently || has_password { + ProspectiveSignerState::TryProspectiveSign + } else { + ProspectiveSignerState::WaitForNonce + }, + prospective: None, + ready: None, + } + } + + fn sign(&self, nonce: &U256) -> Result, Error> { + sign_transaction( + &*self.accounts, + self.filled.clone(), + self.chain_id, + *nonce, + self.password.clone() + ) + } + + fn poll_reserved(&mut self) -> Poll { + self.reserved.poll().map_err(|_| errors::internal("Nonce reservation failure", "")) + } +} + +impl Future for ProspectiveSigner { + type Item = WithToken; + type Error = Error; + + fn poll(&mut self) -> Poll { + use self::ProspectiveSignerState::*; + + loop { + match self.state { + TryProspectiveSign => { + // Try to poll reserved, it might be ready. + match self.poll_reserved()? { + Async::NotReady => { + self.state = WaitForNonce; + self.prospective = Some(self.sign(self.reserved.prospective_value())); + }, + Async::Ready(nonce) => { + self.state = Finish; + self.prospective = Some(self.sign(nonce.value())); + self.ready = Some(nonce); + }, + } + }, + WaitForNonce => { + let nonce = try_ready!(self.poll_reserved()); + let result = match (self.prospective.take(), nonce.matches_prospective()) { + (Some(prospective), true) => prospective, + _ => self.sign(nonce.value()), + }; + self.state = Finish; + self.prospective = Some(result); + self.ready = Some(nonce); + }, + Finish => { + if let (Some(result), Some(nonce)) = (self.prospective.take(), self.ready.take()) { + // Mark nonce as used on successful signing + return result.map(move |tx| { + nonce.mark_used(); + Async::Ready(tx) + }) + } else { + panic!("Poll after ready."); + } + } + } + } + } +} + /// Single-use account token. pub type AccountToken = String; @@ -436,6 +563,16 @@ pub enum SignWith { Token(AccountToken), } +impl SignWith { + fn is_password(&self) -> bool { + if let SignWith::Password(_) = *self { + true + } else { + false + } + } +} + /// A value, potentially accompanied by a signing token. #[derive(Debug)] pub enum WithToken { @@ -529,7 +666,7 @@ pub fn execute( )) }, ConfirmationPayload::EthSignMessage(address, data) => { - if accounts.is_hardware_address(address) { + if accounts.is_hardware_address(&address) { return Box::new(future::err(errors::unsupported("Signing via hardware wallets is not supported.", None))); } @@ -543,7 +680,7 @@ pub fn execute( Box::new(future::done(res)) }, ConfirmationPayload::Decrypt(address, data) => { - if accounts.is_hardware_address(address) { + if accounts.is_hardware_address(&address) { return Box::new(future::err(errors::unsupported("Decrypting via hardware wallets is not supported.", None))); } @@ -572,7 +709,7 @@ fn signature(accounts: &AccountProvider, address: Address, hash: H256, password: fn hardware_signature(accounts: &AccountProvider, address: Address, t: Transaction, chain_id: Option) -> Result { - debug_assert!(accounts.is_hardware_address(address)); + debug_assert!(accounts.is_hardware_address(&address)); let mut stream = rlp::RlpStream::new(); t.rlp_append_unsigned_transaction(&mut stream, chain_id); diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index f330c75eb..b5deab08b 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -22,14 +22,15 @@ pub mod block_import; pub mod dapps; pub mod dispatch; pub mod fake_sign; -pub mod light_fetch; -pub mod oneshot; pub mod ipfs; +pub mod light_fetch; +pub mod nonce; +pub mod oneshot; pub mod secretstore; mod network_settings; -mod poll_manager; mod poll_filter; +mod poll_manager; mod requests; mod signer; mod signing_queue; diff --git a/rpc/src/v1/helpers/nonce.rs b/rpc/src/v1/helpers/nonce.rs new file mode 100644 index 000000000..30ad15211 --- /dev/null +++ b/rpc/src/v1/helpers/nonce.rs @@ -0,0 +1,315 @@ +// Copyright 2015-2017 harity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::{cmp, mem}; +use std::collections::HashMap; +use std::sync::{atomic, Arc}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; + +use bigint::prelude::U256; +use futures::{Future, future, Poll, Async}; +use futures::future::Either; +use futures::sync::oneshot; +use futures_cpupool::CpuPool; +use util::Address; + +/// Manages currently reserved and prospective nonces +/// for multiple senders. +#[derive(Debug)] +pub struct Reservations { + nonces: HashMap, + pool: CpuPool, +} +impl Reservations { + /// A maximal number of reserved nonces in the hashmap + /// before we start clearing the unused ones. + const CLEAN_AT: usize = 512; + + /// Create new nonces manager and spawn a single-threaded cpu pool + /// for progressing execution of dropped nonces. + pub fn new() -> Self { + Self::with_pool(CpuPool::new(1)) + } + + /// Create new nonces manager with given cpupool. + pub fn with_pool(pool: CpuPool) -> Self { + Reservations { + nonces: Default::default(), + pool, + } + } + + /// Reserve a nonce for particular address. + /// + /// The reserved nonce cannot be smaller than the minimal nonce. + pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved { + if self.nonces.len() + 1 > Self::CLEAN_AT { + self.nonces.retain(|_, v| !v.is_empty()); + } + + let pool = &self.pool; + self.nonces.entry(sender) + .or_insert_with(move || SenderReservations::with_pool(pool.clone())) + .reserve_nonce(minimal) + } +} + +/// Manages currently reserved and prospective nonces. +#[derive(Debug)] +pub struct SenderReservations { + previous: Option>, + previous_ready: Arc, + pool: CpuPool, + prospective_value: U256, + dropped: Arc, +} + +impl SenderReservations { + /// Create new nonces manager and spawn a single-threaded cpu pool + /// for progressing execution of dropped nonces. + #[cfg(test)] + pub fn new() -> Self { + Self::with_pool(CpuPool::new(1)) + } + + /// Create new nonces manager with given cpu pool. + pub fn with_pool(pool: CpuPool) -> Self { + SenderReservations { + previous: None, + previous_ready: Arc::new(AtomicBool::new(true)), + pool, + prospective_value: Default::default(), + dropped: Default::default(), + } + } + + /// Reserves a prospective nonce. + /// The caller should provide a minimal nonce that needs to be reserved (taken from state/txqueue). + /// If there were any previous reserved nonces the returned future will be resolved when those are finished + /// (confirmed that the nonce were indeed used). + /// The caller can use `prospective_nonce` and perform some heavy computation anticipating + /// that the `prospective_nonce` will be equal to the one he will get. + pub fn reserve_nonce(&mut self, minimal: U256) -> Reserved { + // Update prospective value + let dropped = self.dropped.swap(0, atomic::Ordering::SeqCst); + let prospective_value = cmp::max(minimal, self.prospective_value - dropped.into()); + self.prospective_value = prospective_value + 1.into(); + + let (next, rx) = oneshot::channel(); + let next = Some(next); + let next_sent = Arc::new(AtomicBool::default()); + let pool = self.pool.clone(); + let dropped = self.dropped.clone(); + self.previous_ready = next_sent.clone(); + match mem::replace(&mut self.previous, Some(rx)) { + Some(previous) => Reserved { + previous: Either::A(previous), + next, + next_sent, + minimal, + prospective_value, + pool, + dropped, + }, + None => Reserved { + previous: Either::B(future::ok(minimal)), + next, + next_sent, + minimal, + prospective_value, + pool, + dropped, + }, + } + } + + /// Returns true if there are no reserved nonces. + pub fn is_empty(&self) -> bool { + self.previous_ready.load(atomic::Ordering::SeqCst) + } +} + +/// Represents a future nonce. +#[derive(Debug)] +pub struct Reserved { + previous: Either< + oneshot::Receiver, + future::FutureResult, + >, + next: Option>, + next_sent: Arc, + minimal: U256, + prospective_value: U256, + pool: CpuPool, + dropped: Arc, +} + +impl Reserved { + /// Returns a prospective value of the nonce. + /// NOTE: This might be different than the one we resolve to. + /// Make sure to check if both nonces match or use the latter one. + pub fn prospective_value(&self) -> &U256 { + &self.prospective_value + } +} + +impl Future for Reserved { + type Item = Ready; + type Error = (); + + fn poll(&mut self) -> Poll { + let mut value = try_ready!(self.previous.poll().map_err(|e| { + warn!("Unexpected nonce cancellation: {}", e); + })); + + if value < self.minimal { + value = self.minimal + } + let matches_prospective = value == self.prospective_value; + + Ok(Async::Ready(Ready { + value, + matches_prospective, + next: self.next.take(), + next_sent: self.next_sent.clone(), + dropped: self.dropped.clone(), + })) + } +} + +impl Drop for Reserved { + fn drop(&mut self) { + if let Some(next) = self.next.take() { + let next_sent = self.next_sent.clone(); + self.dropped.fetch_add(1, atomic::Ordering::SeqCst); + // If Reserved is dropped just pipe previous and next together. + let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default()))); + self.pool.spawn(previous.map(move |nonce| { + next_sent.store(true, atomic::Ordering::SeqCst); + next.send(nonce).expect(Ready::RECV_PROOF) + })).forget() + } + } +} + +/// Represents a valid reserved nonce. +/// This can be used to dispatch the transaction. +/// +/// After this nonce is used it should be marked as such +/// using `mark_used` method. +#[derive(Debug)] +pub struct Ready { + value: U256, + matches_prospective: bool, + next: Option>, + next_sent: Arc, + dropped: Arc, +} + +impl Ready { + const RECV_PROOF: &'static str = "Receiver never dropped."; + + /// Returns a value of the nonce. + pub fn value(&self) -> &U256 { + &self.value + } + + /// Returns true if current value matches the prospective nonce. + pub fn matches_prospective(&self) -> bool { + self.matches_prospective + } + + /// Marks this nonce as used. + /// Make sure to call that method after this nonce has been consumed. + pub fn mark_used(mut self) { + let next = self.next.take().expect("Nonce can be marked as used only once; qed"); + self.next_sent.store(true, atomic::Ordering::SeqCst); + next.send(self.value + 1.into()).expect(Self::RECV_PROOF); + } +} + +impl Drop for Ready { + fn drop(&mut self) { + if let Some(next) = self.next.take() { + self.dropped.fetch_add(1, atomic::Ordering::SeqCst); + self.next_sent.store(true, atomic::Ordering::SeqCst); + next.send(self.value).expect(Self::RECV_PROOF); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_reserve_a_set_of_nonces_and_resolve_them() { + let mut nonces = SenderReservations::new(); + + assert!(nonces.is_empty()); + let n1 = nonces.reserve_nonce(5.into()); + let n2 = nonces.reserve_nonce(5.into()); + let n3 = nonces.reserve_nonce(5.into()); + let n4 = nonces.reserve_nonce(5.into()); + assert!(!nonces.is_empty()); + + // Check first nonce + let r = n1.wait().unwrap(); + assert_eq!(r.value(), &U256::from(5)); + assert!(r.matches_prospective()); + r.mark_used(); + + // Drop second nonce + drop(n2); + + // Drop third without marking as used + let r = n3.wait().unwrap(); + drop(r); + + // Last nonce should be resolved to 6 + let r = n4.wait().unwrap(); + assert_eq!(r.value(), &U256::from(6)); + assert!(!r.matches_prospective()); + r.mark_used(); + + // Next nonce should be immediately available. + let n5 = nonces.reserve_nonce(5.into()); + let r = n5.wait().unwrap(); + assert_eq!(r.value(), &U256::from(7)); + assert!(r.matches_prospective()); + r.mark_used(); + + // Should use start number if it's greater + let n6 = nonces.reserve_nonce(10.into()); + let r = n6.wait().unwrap(); + assert_eq!(r.value(), &U256::from(10)); + assert!(r.matches_prospective()); + r.mark_used(); + + assert!(nonces.is_empty()); + } + + #[test] + fn should_return_prospective_nonce() { + let mut nonces = SenderReservations::new(); + + let n1 = nonces.reserve_nonce(5.into()); + let n2 = nonces.reserve_nonce(5.into()); + + assert_eq!(n1.prospective_value(), &U256::from(5)); + assert_eq!(n2.prospective_value(), &U256::from(6)); + } +} diff --git a/rpc/src/v1/helpers/requests.rs b/rpc/src/v1/helpers/requests.rs index 83abf9353..b7d558033 100644 --- a/rpc/src/v1/helpers/requests.rs +++ b/rpc/src/v1/helpers/requests.rs @@ -15,8 +15,9 @@ // along with Parity. If not, see . use bigint::prelude::U256; -use util::Address; use bytes::Bytes; +use util::Address; + use v1::types::{Origin, TransactionCondition}; /// Transaction request coming from RPC diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7c16c5a8a..48ac617c0 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -45,7 +45,7 @@ use jsonrpc_core::futures::future; use jsonrpc_macros::Trailing; use v1::helpers::{errors, limit_logs, fake_sign}; -use v1::helpers::dispatch::{Dispatcher, FullDispatcher, default_gas_price}; +use v1::helpers::dispatch::{FullDispatcher, default_gas_price}; use v1::helpers::block_import::is_major_importing; use v1::helpers::accounts::unwrap_provider; use v1::traits::Eth; @@ -610,8 +610,11 @@ impl Eth for EthClient where .map_err(errors::rlp) .and_then(|tx| SignedTransaction::new(tx).map_err(errors::transaction)) .and_then(|signed_transaction| { - FullDispatcher::new(self.client.clone(), self.miner.clone()) - .dispatch_transaction(signed_transaction.into()) + FullDispatcher::dispatch_transaction( + &*self.client, + &*self.miner, + signed_transaction.into(), + ) }) .map(Into::into) } diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index 0f1217069..a90c8e01d 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -119,7 +119,7 @@ impl SigningQueueClient { Box::new(dispatch::from_rpc(payload, default_account, &dispatcher) .and_then(move |payload| { let sender = payload.sender(); - if accounts.is_unlocked(sender) { + if accounts.is_unlocked(&sender) { Either::A(dispatch::execute(dispatcher, accounts, payload, dispatch::SignWith::Nothing) .map(|v| v.into_value()) .map(DispatchResult::Value)) diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index c7f1f2dab..dde8008c0 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -19,25 +19,27 @@ use std::env; use std::sync::Arc; use std::time::Duration; -use ethcore::client::{BlockChainClient, Client, ClientConfig}; -use ethcore::ids::BlockId; -use ethcore::spec::{Genesis, Spec}; -use ethcore::block::Block; -use ethcore::views::BlockView; -use ethcore::ethereum; -use ethcore::miner::{MinerOptions, Banning, GasPricer, MinerService, ExternalMiner, Miner, PendingSet, PrioritizationStrategy, GasLimit}; +use bigint::hash::H256; +use bigint::prelude::U256; use ethcore::account_provider::AccountProvider; +use ethcore::block::Block; +use ethcore::client::{BlockChainClient, Client, ClientConfig}; +use ethcore::ethereum; +use ethcore::ids::BlockId; +use ethcore::miner::{MinerOptions, Banning, GasPricer, MinerService, ExternalMiner, Miner, PendingSet, PrioritizationStrategy, GasLimit}; +use ethcore::spec::{Genesis, Spec}; +use ethcore::views::BlockView; use ethjson::blockchain::BlockChain; use ethjson::state::test::ForkSpec; use io::IoChannel; -use bigint::prelude::U256; -use bigint::hash::H256; -use util::Address; use kvdb_memorydb; +use parking_lot::Mutex; +use util::Address; use jsonrpc_core::IoHandler; -use v1::impls::{EthClient, SigningUnsafeClient}; use v1::helpers::dispatch::FullDispatcher; +use v1::helpers::nonce; +use v1::impls::{EthClient, SigningUnsafeClient}; use v1::metadata::Metadata; use v1::tests::helpers::{TestSnapshotService, TestSyncProvider, Config}; use v1::traits::eth::Eth; @@ -148,7 +150,9 @@ impl EthTester { Default::default(), ); - let dispatcher = FullDispatcher::new(client.clone(), miner_service.clone()); + let reservations = Arc::new(Mutex::new(nonce::Reservations::new())); + + let dispatcher = FullDispatcher::new(client.clone(), miner_service.clone(), reservations); let eth_sign = SigningUnsafeClient::new( &opt_account_provider, dispatcher, diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index 8bd97108c..5dde34c7c 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -37,6 +37,7 @@ use ethsync::SyncState; use jsonrpc_core::IoHandler; use v1::{Eth, EthClient, EthClientOptions, EthFilter, EthFilterClient, EthSigning, SigningUnsafeClient}; +use v1::helpers::nonce; use v1::helpers::dispatch::FullDispatcher; use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService, TestSnapshotService}; use v1::metadata::Metadata; @@ -94,8 +95,9 @@ impl EthTester { let external_miner = Arc::new(ExternalMiner::new(hashrates.clone())); let eth = EthClient::new(&client, &snapshot, &sync, &opt_ap, &miner, &external_miner, options).to_delegate(); let filter = EthFilterClient::new(client.clone(), miner.clone()).to_delegate(); + let reservations = Arc::new(Mutex::new(nonce::Reservations::new())); - let dispatcher = FullDispatcher::new(client.clone(), miner.clone()); + let dispatcher = FullDispatcher::new(client.clone(), miner.clone(), reservations); let sign = SigningUnsafeClient::new(&opt_ap, dispatcher).to_delegate(); let mut io: IoHandler = IoHandler::default(); io.extend_with(eth); diff --git a/rpc/src/v1/tests/mocked/personal.rs b/rpc/src/v1/tests/mocked/personal.rs index d34d734c0..447cee59a 100644 --- a/rpc/src/v1/tests/mocked/personal.rs +++ b/rpc/src/v1/tests/mocked/personal.rs @@ -17,14 +17,16 @@ use std::sync::Arc; use std::str::FromStr; +use bigint::prelude::U256; use ethcore::account_provider::AccountProvider; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Action, Transaction}; use jsonrpc_core::IoHandler; -use bigint::prelude::U256; +use parking_lot::Mutex; use util::Address; use v1::{PersonalClient, Personal, Metadata}; +use v1::helpers::nonce; use v1::helpers::dispatch::FullDispatcher; use v1::tests::helpers::TestMinerService; @@ -52,8 +54,9 @@ fn setup() -> PersonalTester { let opt_accounts = Some(accounts.clone()); let client = blockchain_client(); let miner = miner_service(); + let reservations = Arc::new(Mutex::new(nonce::Reservations::new())); - let dispatcher = FullDispatcher::new(client, miner.clone()); + let dispatcher = FullDispatcher::new(client, miner.clone(), reservations); let personal = PersonalClient::new(&opt_accounts, dispatcher, false); let mut io = IoHandler::default(); diff --git a/rpc/src/v1/tests/mocked/signer.rs b/rpc/src/v1/tests/mocked/signer.rs index af84d336a..3274ec44f 100644 --- a/rpc/src/v1/tests/mocked/signer.rs +++ b/rpc/src/v1/tests/mocked/signer.rs @@ -24,6 +24,7 @@ use ethcore::account_provider::AccountProvider; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Transaction, Action, SignedTransaction}; use parity_reactor::EventLoop; +use parking_lot::Mutex; use rlp::encode; use serde_json; @@ -32,7 +33,7 @@ use v1::{SignerClient, Signer, Origin}; use v1::metadata::Metadata; use v1::tests::helpers::TestMinerService; use v1::types::{Bytes as RpcBytes, H520}; -use v1::helpers::{SigningQueue, SignerService, FilledTransactionRequest, ConfirmationPayload}; +use v1::helpers::{nonce, SigningQueue, SignerService, FilledTransactionRequest, ConfirmationPayload}; use v1::helpers::dispatch::{FullDispatcher, eth_data_hash}; struct SignerTester { @@ -61,9 +62,10 @@ fn signer_tester() -> SignerTester { let opt_accounts = Some(accounts.clone()); let client = blockchain_client(); let miner = miner_service(); + let reservations = Arc::new(Mutex::new(nonce::Reservations::new())); let event_loop = EventLoop::spawn(); - let dispatcher = FullDispatcher::new(client, miner.clone()); + let dispatcher = FullDispatcher::new(client, miner.clone(), reservations); let mut io = IoHandler::default(); io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer, event_loop.remote()).to_delegate()); diff --git a/rpc/src/v1/tests/mocked/signing.rs b/rpc/src/v1/tests/mocked/signing.rs index 20400db52..cc470aad8 100644 --- a/rpc/src/v1/tests/mocked/signing.rs +++ b/rpc/src/v1/tests/mocked/signing.rs @@ -24,7 +24,7 @@ use jsonrpc_core::futures::Future; use v1::impls::SigningQueueClient; use v1::metadata::Metadata; use v1::traits::{EthSigning, ParitySigning, Parity}; -use v1::helpers::{SignerService, SigningQueue, FullDispatcher}; +use v1::helpers::{nonce, SignerService, SigningQueue, FullDispatcher}; use v1::types::{ConfirmationResponse, RichRawTransaction}; use v1::tests::helpers::TestMinerService; use v1::tests::mocked::parity; @@ -38,6 +38,7 @@ use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Transaction, Action, SignedTransaction}; use ethstore::ethkey::{Generator, Random}; use serde_json; +use parking_lot::Mutex; struct SigningTester { pub signer: Arc, @@ -54,9 +55,10 @@ impl Default for SigningTester { let miner = Arc::new(TestMinerService::default()); let accounts = Arc::new(AccountProvider::transient_provider()); let opt_accounts = Some(accounts.clone()); + let reservations = Arc::new(Mutex::new(nonce::Reservations::new())); let mut io = IoHandler::default(); - let dispatcher = FullDispatcher::new(client.clone(), miner.clone()); + let dispatcher = FullDispatcher::new(client.clone(), miner.clone(), reservations); let rpc = SigningQueueClient::new(&signer, dispatcher.clone(), &opt_accounts); io.extend_with(EthSigning::to_delegate(rpc));