Use nonce reservation per address

This commit is contained in:
Nicolas Gotchac 2017-11-09 19:49:34 +01:00
parent ffee6aacff
commit 4c8780f188
2 changed files with 30 additions and 13 deletions

View File

@ -239,10 +239,10 @@ impl FullDependencies {
use parity_rpc::v1::*; use parity_rpc::v1::*;
macro_rules! add_signing_methods { macro_rules! add_signing_methods {
($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { ($namespace:ident, $handler:expr, $deps:expr) => {
{ {
let deps = &$deps; let deps = &$deps;
let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces); let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool());
if deps.signer_service.is_enabled() { if deps.signer_service.is_enabled() {
$handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store)))
} else { } else {
@ -252,11 +252,10 @@ impl FullDependencies {
} }
} }
let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool())));
let dispatcher = FullDispatcher::new( let dispatcher = FullDispatcher::new(
self.client.clone(), self.client.clone(),
self.miner.clone(), self.miner.clone(),
nonces.clone(), self.fetch.pool(),
); );
for api in apis { for api in apis {
match *api { match *api {
@ -286,7 +285,7 @@ impl FullDependencies {
let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone());
handler.extend_with(filter_client.to_delegate()); handler.extend_with(filter_client.to_delegate());
add_signing_methods!(EthSigning, handler, self, nonces.clone()); add_signing_methods!(EthSigning, handler, self);
} }
}, },
Api::EthPubSub => { Api::EthPubSub => {
@ -323,7 +322,7 @@ impl FullDependencies {
).to_delegate()); ).to_delegate());
if !for_generic_pubsub { if !for_generic_pubsub {
add_signing_methods!(ParitySigning, handler, self, nonces.clone()); add_signing_methods!(ParitySigning, handler, self);
} }
}, },
Api::ParityPubSub => { Api::ParityPubSub => {
@ -440,7 +439,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
self.on_demand.clone(), self.on_demand.clone(),
self.cache.clone(), self.cache.clone(),
self.transaction_queue.clone(), self.transaction_queue.clone(),
Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), self.fetch.pool(),
); );
macro_rules! add_signing_methods { macro_rules! add_signing_methods {

View File

@ -19,6 +19,7 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap;
use light::cache::Cache as LightDataCache; use light::cache::Cache as LightDataCache;
use light::client::LightChainClient; use light::client::LightChainClient;
@ -32,6 +33,7 @@ use util::Address;
use bytes::Bytes; use bytes::Bytes;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use stats::Corpus; use stats::Corpus;
use futures_cpupool::CpuPool;
use ethkey::Signature; use ethkey::Signature;
use ethsync::LightSync; use ethsync::LightSync;
@ -87,16 +89,20 @@ pub trait Dispatcher: Send + Sync + Clone {
pub struct FullDispatcher<C, M> { pub struct FullDispatcher<C, M> {
client: Arc<C>, client: Arc<C>,
miner: Arc<M>, miner: Arc<M>,
nonces: Arc<Mutex<nonce::Reservations>>, nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>,
pool: CpuPool,
} }
impl<C, M> FullDispatcher<C, M> { impl<C, M> FullDispatcher<C, M> {
/// Create a `FullDispatcher` from Arc references to a client and miner. /// Create a `FullDispatcher` from Arc references to a client and miner.
pub fn new(client: Arc<C>, miner: Arc<M>, nonces: Arc<Mutex<nonce::Reservations>>) -> Self { pub fn new(client: Arc<C>, miner: Arc<M>, pool: CpuPool) -> Self {
let nonces = Arc::new(Mutex::new(HashMap::new()));
FullDispatcher { FullDispatcher {
client, client,
miner, miner,
nonces, nonces,
pool,
} }
} }
} }
@ -107,6 +113,7 @@ impl<C, M> Clone for FullDispatcher<C, M> {
client: self.client.clone(), client: self.client.clone(),
miner: self.miner.clone(), miner: self.miner.clone(),
nonces: self.nonces.clone(), nonces: self.nonces.clone(),
pool: self.pool.clone(),
} }
} }
} }
@ -162,7 +169,10 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
} }
let state = self.state_nonce(&filled.from); let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().reserve_nonce(state); let reserved = self.nonces.lock().entry(filled.from)
.or_insert(nonce::Reservations::with_pool(self.pool.clone()))
.reserve_nonce(state);
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)) Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password))
} }
@ -252,7 +262,9 @@ pub struct LightDispatcher {
/// Transaction queue. /// Transaction queue.
pub transaction_queue: Arc<RwLock<LightTransactionQueue>>, pub transaction_queue: Arc<RwLock<LightTransactionQueue>>,
/// Nonce reservations /// Nonce reservations
pub nonces: Arc<Mutex<nonce::Reservations>>, pub nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>,
/// Cpu pool
pub pool: CpuPool,
} }
impl LightDispatcher { impl LightDispatcher {
@ -265,8 +277,10 @@ impl LightDispatcher {
on_demand: Arc<OnDemand>, on_demand: Arc<OnDemand>,
cache: Arc<Mutex<LightDataCache>>, cache: Arc<Mutex<LightDataCache>>,
transaction_queue: Arc<RwLock<LightTransactionQueue>>, transaction_queue: Arc<RwLock<LightTransactionQueue>>,
nonces: Arc<Mutex<nonce::Reservations>>, pool: CpuPool,
) -> Self { ) -> Self {
let nonces = Arc::new(Mutex::new(HashMap::new()));
LightDispatcher { LightDispatcher {
sync, sync,
client, client,
@ -274,6 +288,7 @@ impl LightDispatcher {
cache, cache,
transaction_queue, transaction_queue,
nonces, nonces,
pool,
} }
} }
@ -379,10 +394,13 @@ impl Dispatcher for LightDispatcher {
} }
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
let pool = self.pool.clone();
Box::new(self.next_nonce(filled.from) Box::new(self.next_nonce(filled.from)
.map_err(|_| errors::no_light_peers()) .map_err(|_| errors::no_light_peers())
.and_then(move |nonce| { .and_then(move |nonce| {
let reserved = nonces.lock().reserve_nonce(nonce); let reserved = nonces.lock().entry(filled.from)
.or_insert(nonce::Reservations::with_pool(pool))
.reserve_nonce(nonce);
ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)
})) }))
} }