From 4c8780f1886ca01346501a25a34e70385b7ee3c2 Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Thu, 9 Nov 2017 19:49:34 +0100 Subject: [PATCH 1/4] Use nonce reservation per address --- parity/rpc_apis.rs | 13 ++++++------- rpc/src/v1/helpers/dispatch.rs | 30 ++++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index be9dbdeb1..6c48adb69 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -239,10 +239,10 @@ impl FullDependencies { use parity_rpc::v1::*; macro_rules! add_signing_methods { - ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { + ($namespace:ident, $handler:expr, $deps:expr) => { { let deps = &$deps; - let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces); + let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool()); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) } else { @@ -252,11 +252,10 @@ impl FullDependencies { } } - let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))); let dispatcher = FullDispatcher::new( self.client.clone(), self.miner.clone(), - nonces.clone(), + self.fetch.pool(), ); for api in apis { match *api { @@ -286,7 +285,7 @@ impl FullDependencies { let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); handler.extend_with(filter_client.to_delegate()); - add_signing_methods!(EthSigning, handler, self, nonces.clone()); + add_signing_methods!(EthSigning, handler, self); } }, Api::EthPubSub => { @@ -323,7 +322,7 @@ impl FullDependencies { ).to_delegate()); if !for_generic_pubsub { - add_signing_methods!(ParitySigning, handler, self, nonces.clone()); + add_signing_methods!(ParitySigning, handler, self); } }, Api::ParityPubSub => { @@ -440,7 +439,7 @@ impl LightDependencies { self.on_demand.clone(), self.cache.clone(), self.transaction_queue.clone(), - Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), + self.fetch.pool(), ); macro_rules! add_signing_methods { diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index c556226b5..9220790e8 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -19,6 +19,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; +use std::collections::HashMap; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; @@ -32,6 +33,7 @@ use util::Address; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; use stats::Corpus; +use futures_cpupool::CpuPool; use ethkey::Signature; use ethsync::LightSync; @@ -87,16 +89,20 @@ pub trait Dispatcher: Send + Sync + Clone { pub struct FullDispatcher { client: Arc, miner: Arc, - nonces: Arc>, + nonces: Arc>>, + pool: CpuPool, } impl FullDispatcher { /// Create a `FullDispatcher` from Arc references to a client and miner. - pub fn new(client: Arc, miner: Arc, nonces: Arc>) -> Self { + pub fn new(client: Arc, miner: Arc, pool: CpuPool) -> Self { + let nonces = Arc::new(Mutex::new(HashMap::new())); + FullDispatcher { client, miner, nonces, + pool, } } } @@ -107,6 +113,7 @@ impl Clone for FullDispatcher { client: self.client.clone(), miner: self.miner.clone(), nonces: self.nonces.clone(), + pool: self.pool.clone(), } } } @@ -162,7 +169,10 @@ impl Dispatcher for FullDispatcher>, /// Nonce reservations - pub nonces: Arc>, + pub nonces: Arc>>, + /// Cpu pool + pub pool: CpuPool, } impl LightDispatcher { @@ -265,8 +277,10 @@ impl LightDispatcher { on_demand: Arc, cache: Arc>, transaction_queue: Arc>, - nonces: Arc>, + pool: CpuPool, ) -> Self { + let nonces = Arc::new(Mutex::new(HashMap::new())); + LightDispatcher { sync, client, @@ -274,6 +288,7 @@ impl LightDispatcher { cache, transaction_queue, nonces, + pool, } } @@ -379,10 +394,13 @@ impl Dispatcher for LightDispatcher { } let nonces = self.nonces.clone(); + let pool = self.pool.clone(); Box::new(self.next_nonce(filled.from) .map_err(|_| errors::no_light_peers()) .and_then(move |nonce| { - let reserved = nonces.lock().reserve_nonce(nonce); + let reserved = nonces.lock().entry(filled.from) + .or_insert(nonce::Reservations::with_pool(pool)) + .reserve_nonce(nonce); ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) })) } From 15c97336a432e3fbcb899455837feef41442b5db Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Fri, 10 Nov 2017 17:11:04 +0100 Subject: [PATCH 2/4] Create hashmap in RPC Apis --- parity/rpc_apis.rs | 14 ++++++++------ rpc/src/v1/helpers/dispatch.rs | 12 +++++++----- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 6c48adb69..281b97589 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -15,8 +15,7 @@ // along with Parity. If not, see . use std::cmp::PartialEq; -use std::collections::BTreeMap; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet, HashMap}; use std::str::FromStr; use std::sync::{Arc, Weak}; @@ -239,10 +238,10 @@ impl FullDependencies { use parity_rpc::v1::*; macro_rules! add_signing_methods { - ($namespace:ident, $handler:expr, $deps:expr) => { + ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { { let deps = &$deps; - let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool()); + let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool(), $nonces); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) } else { @@ -252,10 +251,12 @@ impl FullDependencies { } } + let nonces = Arc::new(Mutex::new(HashMap::new())); let dispatcher = FullDispatcher::new( self.client.clone(), self.miner.clone(), self.fetch.pool(), + nonces.clone(), ); for api in apis { match *api { @@ -285,7 +286,7 @@ impl FullDependencies { let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); handler.extend_with(filter_client.to_delegate()); - add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(EthSigning, handler, self, nonces.clone()); } }, Api::EthPubSub => { @@ -322,7 +323,7 @@ impl FullDependencies { ).to_delegate()); if !for_generic_pubsub { - add_signing_methods!(ParitySigning, handler, self); + add_signing_methods!(ParitySigning, handler, self, nonces.clone()); } }, Api::ParityPubSub => { @@ -440,6 +441,7 @@ impl LightDependencies { self.cache.clone(), self.transaction_queue.clone(), self.fetch.pool(), + Arc::new(Mutex::new(HashMap::new())), ); macro_rules! add_signing_methods { diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 9220790e8..f17797619 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -95,9 +95,12 @@ pub struct FullDispatcher { impl FullDispatcher { /// Create a `FullDispatcher` from Arc references to a client and miner. - pub fn new(client: Arc, miner: Arc, pool: CpuPool) -> Self { - let nonces = Arc::new(Mutex::new(HashMap::new())); - + pub fn new( + client: Arc, + miner: Arc, + pool: CpuPool, + nonces: Arc>>, + ) -> Self { FullDispatcher { client, miner, @@ -278,9 +281,8 @@ impl LightDispatcher { cache: Arc>, transaction_queue: Arc>, pool: CpuPool, + nonces: Arc>>, ) -> Self { - let nonces = Arc::new(Mutex::new(HashMap::new())); - LightDispatcher { sync, client, From 72907da2aed962715c6efc5e98162e0d11500089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sun, 12 Nov 2017 12:36:41 +0100 Subject: [PATCH 3/4] Garbage collect hashmap entries. --- parity/rpc_apis.rs | 10 ++-- rpc/src/v1/helpers/dispatch.rs | 28 +++------- rpc/src/v1/helpers/nonce.rs | 95 +++++++++++++++++++++++++++++----- 3 files changed, 93 insertions(+), 40 deletions(-) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 281b97589..a5bc1e154 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::cmp::PartialEq; -use std::collections::{BTreeMap, HashSet, HashMap}; +use std::collections::{BTreeMap, HashSet}; use std::str::FromStr; use std::sync::{Arc, Weak}; @@ -241,7 +241,7 @@ impl FullDependencies { ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { { let deps = &$deps; - let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool(), $nonces); + let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) } else { @@ -251,11 +251,10 @@ impl FullDependencies { } } - let nonces = Arc::new(Mutex::new(HashMap::new())); + let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))); let dispatcher = FullDispatcher::new( self.client.clone(), self.miner.clone(), - self.fetch.pool(), nonces.clone(), ); for api in apis { @@ -440,8 +439,7 @@ impl LightDependencies { self.on_demand.clone(), self.cache.clone(), self.transaction_queue.clone(), - self.fetch.pool(), - Arc::new(Mutex::new(HashMap::new())), + Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), ); macro_rules! add_signing_methods { diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index f17797619..35e16f573 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -19,7 +19,6 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; -use std::collections::HashMap; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; @@ -33,7 +32,6 @@ use util::Address; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; use stats::Corpus; -use futures_cpupool::CpuPool; use ethkey::Signature; use ethsync::LightSync; @@ -89,8 +87,7 @@ pub trait Dispatcher: Send + Sync + Clone { pub struct FullDispatcher { client: Arc, miner: Arc, - nonces: Arc>>, - pool: CpuPool, + nonces: Arc>, } impl FullDispatcher { @@ -98,14 +95,12 @@ impl FullDispatcher { pub fn new( client: Arc, miner: Arc, - pool: CpuPool, - nonces: Arc>>, + nonces: Arc>, ) -> Self { FullDispatcher { client, miner, nonces, - pool, } } } @@ -116,7 +111,6 @@ impl Clone for FullDispatcher { client: self.client.clone(), miner: self.miner.clone(), nonces: self.nonces.clone(), - pool: self.pool.clone(), } } } @@ -172,9 +166,7 @@ impl Dispatcher for FullDispatcher>, /// Nonce reservations - pub nonces: Arc>>, - /// Cpu pool - pub pool: CpuPool, + pub nonces: Arc>, } impl LightDispatcher { @@ -280,8 +270,7 @@ impl LightDispatcher { on_demand: Arc, cache: Arc>, transaction_queue: Arc>, - pool: CpuPool, - nonces: Arc>>, + nonces: Arc>, ) -> Self { LightDispatcher { sync, @@ -290,7 +279,6 @@ impl LightDispatcher { cache, transaction_queue, nonces, - pool, } } @@ -396,13 +384,11 @@ impl Dispatcher for LightDispatcher { } let nonces = self.nonces.clone(); - let pool = self.pool.clone(); Box::new(self.next_nonce(filled.from) .map_err(|_| errors::no_light_peers()) .and_then(move |nonce| { - let reserved = nonces.lock().entry(filled.from) - .or_insert(nonce::Reservations::with_pool(pool)) - .reserve_nonce(nonce); + let reserved = nonces.lock().reserve(filled.from, nonce); + ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) })) } diff --git a/rpc/src/v1/helpers/nonce.rs b/rpc/src/v1/helpers/nonce.rs index a048d9a87..2b4df49ae 100644 --- a/rpc/src/v1/helpers/nonce.rs +++ b/rpc/src/v1/helpers/nonce.rs @@ -15,35 +15,84 @@ // along with Parity. If not, see . use std::{cmp, mem}; +use std::collections::HashMap; use std::sync::{atomic, Arc}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use bigint::prelude::U256; use futures::{Future, future, Poll, Async}; use futures::future::Either; use futures::sync::oneshot; use futures_cpupool::CpuPool; +use util::Address; -/// Manages currently reserved and prospective nonces. +/// Manages currently reserved and prospective nonces +/// for multiple senders. #[derive(Debug)] pub struct Reservations { - previous: Option>, + nonces: HashMap, pool: CpuPool, - prospective_value: U256, - dropped: Arc, } - impl Reservations { + /// A maximal number of reserved nonces in the hashmap + /// before we start clearing the unused ones. + const CLEAN_AT: usize = 512; + /// Create new nonces manager and spawn a single-threaded cpu pool /// for progressing execution of dropped nonces. pub fn new() -> Self { Self::with_pool(CpuPool::new(1)) } - /// Create new nonces manager with given cpu pool. + /// Create new nonces manager with given cpupool. pub fn with_pool(pool: CpuPool) -> Self { Reservations { + nonces: Default::default(), + pool, + } + } + + /// Reserve a nonce for particular address. + /// + /// The reserved nonce cannot be smaller than the minimal nonce. + pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved { + if self.nonces.len() + 1 > Self::CLEAN_AT { + let to_remove = self.nonces.iter().filter(|&(_, v)| v.is_empty()).map(|(k, _)| *k).collect::>(); + for address in to_remove { + self.nonces.remove(&address); + } + } + + let pool = &self.pool; + self.nonces.entry(sender) + .or_insert_with(move || SenderReservations::with_pool(pool.clone())) + .reserve_nonce(minimal) + } +} + +/// Manages currently reserved and prospective nonces. +#[derive(Debug)] +pub struct SenderReservations { + previous: Option>, + previous_ready: Arc, + pool: CpuPool, + prospective_value: U256, + dropped: Arc, +} + +impl SenderReservations { + /// Create new nonces manager and spawn a single-threaded cpu pool + /// for progressing execution of dropped nonces. + #[cfg(test)] + pub fn new() -> Self { + Self::with_pool(CpuPool::new(1)) + } + + /// Create new nonces manager with given cpu pool. + pub fn with_pool(pool: CpuPool) -> Self { + SenderReservations { previous: None, + previous_ready: Arc::new(AtomicBool::new(true)), pool, prospective_value: Default::default(), dropped: Default::default(), @@ -64,12 +113,15 @@ impl Reservations { let (next, rx) = oneshot::channel(); let next = Some(next); + let next_sent = Arc::new(AtomicBool::default()); let pool = self.pool.clone(); let dropped = self.dropped.clone(); + self.previous_ready = next_sent.clone(); match mem::replace(&mut self.previous, Some(rx)) { Some(previous) => Reserved { previous: Either::A(previous), next, + next_sent, minimal, prospective_value, pool, @@ -78,6 +130,7 @@ impl Reservations { None => Reserved { previous: Either::B(future::ok(minimal)), next, + next_sent, minimal, prospective_value, pool, @@ -85,6 +138,11 @@ impl Reservations { }, } } + + /// Returns true if there are no reserved nonces. + pub fn is_empty(&self) -> bool { + self.previous_ready.load(atomic::Ordering::SeqCst) + } } /// Represents a future nonce. @@ -92,9 +150,10 @@ impl Reservations { pub struct Reserved { previous: Either< oneshot::Receiver, - future::FutureResult + future::FutureResult, >, next: Option>, + next_sent: Arc, minimal: U256, prospective_value: U256, pool: CpuPool, @@ -128,6 +187,7 @@ impl Future for Reserved { value, matches_prospective, next: self.next.take(), + next_sent: self.next_sent.clone(), dropped: self.dropped.clone(), })) } @@ -136,10 +196,12 @@ impl Future for Reserved { impl Drop for Reserved { fn drop(&mut self) { if let Some(next) = self.next.take() { + let next_sent = self.next_sent.clone(); self.dropped.fetch_add(1, atomic::Ordering::SeqCst); // If Reserved is dropped just pipe previous and next together. let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default()))); - self.pool.spawn(previous.map(|nonce| { + self.pool.spawn(previous.map(move |nonce| { + next_sent.store(true, atomic::Ordering::SeqCst); next.send(nonce).expect(Ready::RECV_PROOF) })).forget() } @@ -156,6 +218,7 @@ pub struct Ready { value: U256, matches_prospective: bool, next: Option>, + next_sent: Arc, dropped: Arc, } @@ -176,15 +239,17 @@ impl Ready { /// Make sure to call that method after this nonce has been consumed. pub fn mark_used(mut self) { let next = self.next.take().expect("Nonce can be marked as used only once; qed"); + self.next_sent.store(true, atomic::Ordering::SeqCst); next.send(self.value + 1.into()).expect(Self::RECV_PROOF); } } impl Drop for Ready { fn drop(&mut self) { - if let Some(send) = self.next.take() { + if let Some(next) = self.next.take() { self.dropped.fetch_add(1, atomic::Ordering::SeqCst); - send.send(self.value).expect(Self::RECV_PROOF); + self.next_sent.store(true, atomic::Ordering::SeqCst); + next.send(self.value).expect(Self::RECV_PROOF); } } } @@ -195,12 +260,14 @@ mod tests { #[test] fn should_reserve_a_set_of_nonces_and_resolve_them() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); + assert!(nonces.is_empty()); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into()); let n3 = nonces.reserve_nonce(5.into()); let n4 = nonces.reserve_nonce(5.into()); + assert!(!nonces.is_empty()); // Check first nonce let r = n1.wait().unwrap(); @@ -234,11 +301,13 @@ mod tests { assert_eq!(r.value(), &U256::from(10)); assert!(r.matches_prospective()); r.mark_used(); + + assert!(nonces.is_empty()); } #[test] fn should_return_prospective_nonce() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into()); From 8b85f648ca6e02f77fd9370e54904e860f4b006f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 13 Nov 2017 17:09:30 +0100 Subject: [PATCH 4/4] HashMap::retain --- rpc/src/v1/helpers/nonce.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rpc/src/v1/helpers/nonce.rs b/rpc/src/v1/helpers/nonce.rs index 2b4df49ae..30ad15211 100644 --- a/rpc/src/v1/helpers/nonce.rs +++ b/rpc/src/v1/helpers/nonce.rs @@ -57,10 +57,7 @@ impl Reservations { /// The reserved nonce cannot be smaller than the minimal nonce. pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved { if self.nonces.len() + 1 > Self::CLEAN_AT { - let to_remove = self.nonces.iter().filter(|&(_, v)| v.is_empty()).map(|(k, _)| *k).collect::>(); - for address in to_remove { - self.nonces.remove(&address); - } + self.nonces.retain(|_, v| !v.is_empty()); } let pool = &self.pool;