Garbage collect hashmap entries.

This commit is contained in:
Tomasz Drwięga 2017-11-12 12:36:41 +01:00
parent 8c6b89df72
commit 72907da2ae
No known key found for this signature in database
GPG Key ID: D066F497E62CAF66
3 changed files with 93 additions and 40 deletions

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp::PartialEq; use std::cmp::PartialEq;
use std::collections::{BTreeMap, HashSet, HashMap}; use std::collections::{BTreeMap, HashSet};
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
@ -241,7 +241,7 @@ impl FullDependencies {
($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => {
{ {
let deps = &$deps; let deps = &$deps;
let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool(), $nonces); let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces);
if deps.signer_service.is_enabled() { if deps.signer_service.is_enabled() {
$handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store)))
} else { } else {
@ -251,11 +251,10 @@ impl FullDependencies {
} }
} }
let nonces = Arc::new(Mutex::new(HashMap::new())); let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool())));
let dispatcher = FullDispatcher::new( let dispatcher = FullDispatcher::new(
self.client.clone(), self.client.clone(),
self.miner.clone(), self.miner.clone(),
self.fetch.pool(),
nonces.clone(), nonces.clone(),
); );
for api in apis { for api in apis {
@ -440,8 +439,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
self.on_demand.clone(), self.on_demand.clone(),
self.cache.clone(), self.cache.clone(),
self.transaction_queue.clone(), self.transaction_queue.clone(),
self.fetch.pool(), Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))),
Arc::new(Mutex::new(HashMap::new())),
); );
macro_rules! add_signing_methods { macro_rules! add_signing_methods {

View File

@ -19,7 +19,6 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap;
use light::cache::Cache as LightDataCache; use light::cache::Cache as LightDataCache;
use light::client::LightChainClient; use light::client::LightChainClient;
@ -33,7 +32,6 @@ use util::Address;
use bytes::Bytes; use bytes::Bytes;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use stats::Corpus; use stats::Corpus;
use futures_cpupool::CpuPool;
use ethkey::Signature; use ethkey::Signature;
use ethsync::LightSync; use ethsync::LightSync;
@ -89,8 +87,7 @@ pub trait Dispatcher: Send + Sync + Clone {
pub struct FullDispatcher<C, M> { pub struct FullDispatcher<C, M> {
client: Arc<C>, client: Arc<C>,
miner: Arc<M>, miner: Arc<M>,
nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>, nonces: Arc<Mutex<nonce::Reservations>>,
pool: CpuPool,
} }
impl<C, M> FullDispatcher<C, M> { impl<C, M> FullDispatcher<C, M> {
@ -98,14 +95,12 @@ impl<C, M> FullDispatcher<C, M> {
pub fn new( pub fn new(
client: Arc<C>, client: Arc<C>,
miner: Arc<M>, miner: Arc<M>,
pool: CpuPool, nonces: Arc<Mutex<nonce::Reservations>>,
nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>,
) -> Self { ) -> Self {
FullDispatcher { FullDispatcher {
client, client,
miner, miner,
nonces, nonces,
pool,
} }
} }
} }
@ -116,7 +111,6 @@ impl<C, M> Clone for FullDispatcher<C, M> {
client: self.client.clone(), client: self.client.clone(),
miner: self.miner.clone(), miner: self.miner.clone(),
nonces: self.nonces.clone(), nonces: self.nonces.clone(),
pool: self.pool.clone(),
} }
} }
} }
@ -172,9 +166,7 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
} }
let state = self.state_nonce(&filled.from); let state = self.state_nonce(&filled.from);
let reserved = self.nonces.lock().entry(filled.from) let reserved = self.nonces.lock().reserve(filled.from, state);
.or_insert(nonce::Reservations::with_pool(self.pool.clone()))
.reserve_nonce(state);
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)) Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password))
} }
@ -265,9 +257,7 @@ pub struct LightDispatcher {
/// Transaction queue. /// Transaction queue.
pub transaction_queue: Arc<RwLock<LightTransactionQueue>>, pub transaction_queue: Arc<RwLock<LightTransactionQueue>>,
/// Nonce reservations /// Nonce reservations
pub nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>, pub nonces: Arc<Mutex<nonce::Reservations>>,
/// Cpu pool
pub pool: CpuPool,
} }
impl LightDispatcher { impl LightDispatcher {
@ -280,8 +270,7 @@ impl LightDispatcher {
on_demand: Arc<OnDemand>, on_demand: Arc<OnDemand>,
cache: Arc<Mutex<LightDataCache>>, cache: Arc<Mutex<LightDataCache>>,
transaction_queue: Arc<RwLock<LightTransactionQueue>>, transaction_queue: Arc<RwLock<LightTransactionQueue>>,
pool: CpuPool, nonces: Arc<Mutex<nonce::Reservations>>,
nonces: Arc<Mutex<HashMap<Address, nonce::Reservations>>>,
) -> Self { ) -> Self {
LightDispatcher { LightDispatcher {
sync, sync,
@ -290,7 +279,6 @@ impl LightDispatcher {
cache, cache,
transaction_queue, transaction_queue,
nonces, nonces,
pool,
} }
} }
@ -396,13 +384,11 @@ impl Dispatcher for LightDispatcher {
} }
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
let pool = self.pool.clone();
Box::new(self.next_nonce(filled.from) Box::new(self.next_nonce(filled.from)
.map_err(|_| errors::no_light_peers()) .map_err(|_| errors::no_light_peers())
.and_then(move |nonce| { .and_then(move |nonce| {
let reserved = nonces.lock().entry(filled.from) let reserved = nonces.lock().reserve(filled.from, nonce);
.or_insert(nonce::Reservations::with_pool(pool))
.reserve_nonce(nonce);
ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)
})) }))
} }

View File

@ -15,25 +15,29 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{cmp, mem}; use std::{cmp, mem};
use std::collections::HashMap;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, AtomicUsize};
use bigint::prelude::U256; use bigint::prelude::U256;
use futures::{Future, future, Poll, Async}; use futures::{Future, future, Poll, Async};
use futures::future::Either; use futures::future::Either;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use util::Address;
/// Manages currently reserved and prospective nonces. /// Manages currently reserved and prospective nonces
/// for multiple senders.
#[derive(Debug)] #[derive(Debug)]
pub struct Reservations { pub struct Reservations {
previous: Option<oneshot::Receiver<U256>>, nonces: HashMap<Address, SenderReservations>,
pool: CpuPool, pool: CpuPool,
prospective_value: U256,
dropped: Arc<AtomicUsize>,
} }
impl Reservations { impl Reservations {
/// A maximal number of reserved nonces in the hashmap
/// before we start clearing the unused ones.
const CLEAN_AT: usize = 512;
/// Create new nonces manager and spawn a single-threaded cpu pool /// Create new nonces manager and spawn a single-threaded cpu pool
/// for progressing execution of dropped nonces. /// for progressing execution of dropped nonces.
pub fn new() -> Self { pub fn new() -> Self {
@ -43,7 +47,52 @@ impl Reservations {
/// Create new nonces manager with given cpupool. /// Create new nonces manager with given cpupool.
pub fn with_pool(pool: CpuPool) -> Self { pub fn with_pool(pool: CpuPool) -> Self {
Reservations { Reservations {
nonces: Default::default(),
pool,
}
}
/// Reserve a nonce for particular address.
///
/// The reserved nonce cannot be smaller than the minimal nonce.
pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved {
if self.nonces.len() + 1 > Self::CLEAN_AT {
let to_remove = self.nonces.iter().filter(|&(_, v)| v.is_empty()).map(|(k, _)| *k).collect::<Vec<_>>();
for address in to_remove {
self.nonces.remove(&address);
}
}
let pool = &self.pool;
self.nonces.entry(sender)
.or_insert_with(move || SenderReservations::with_pool(pool.clone()))
.reserve_nonce(minimal)
}
}
/// Manages currently reserved and prospective nonces.
#[derive(Debug)]
pub struct SenderReservations {
previous: Option<oneshot::Receiver<U256>>,
previous_ready: Arc<AtomicBool>,
pool: CpuPool,
prospective_value: U256,
dropped: Arc<AtomicUsize>,
}
impl SenderReservations {
/// Create new nonces manager and spawn a single-threaded cpu pool
/// for progressing execution of dropped nonces.
#[cfg(test)]
pub fn new() -> Self {
Self::with_pool(CpuPool::new(1))
}
/// Create new nonces manager with given cpu pool.
pub fn with_pool(pool: CpuPool) -> Self {
SenderReservations {
previous: None, previous: None,
previous_ready: Arc::new(AtomicBool::new(true)),
pool, pool,
prospective_value: Default::default(), prospective_value: Default::default(),
dropped: Default::default(), dropped: Default::default(),
@ -64,12 +113,15 @@ impl Reservations {
let (next, rx) = oneshot::channel(); let (next, rx) = oneshot::channel();
let next = Some(next); let next = Some(next);
let next_sent = Arc::new(AtomicBool::default());
let pool = self.pool.clone(); let pool = self.pool.clone();
let dropped = self.dropped.clone(); let dropped = self.dropped.clone();
self.previous_ready = next_sent.clone();
match mem::replace(&mut self.previous, Some(rx)) { match mem::replace(&mut self.previous, Some(rx)) {
Some(previous) => Reserved { Some(previous) => Reserved {
previous: Either::A(previous), previous: Either::A(previous),
next, next,
next_sent,
minimal, minimal,
prospective_value, prospective_value,
pool, pool,
@ -78,6 +130,7 @@ impl Reservations {
None => Reserved { None => Reserved {
previous: Either::B(future::ok(minimal)), previous: Either::B(future::ok(minimal)),
next, next,
next_sent,
minimal, minimal,
prospective_value, prospective_value,
pool, pool,
@ -85,6 +138,11 @@ impl Reservations {
}, },
} }
} }
/// Returns true if there are no reserved nonces.
pub fn is_empty(&self) -> bool {
self.previous_ready.load(atomic::Ordering::SeqCst)
}
} }
/// Represents a future nonce. /// Represents a future nonce.
@ -92,9 +150,10 @@ impl Reservations {
pub struct Reserved { pub struct Reserved {
previous: Either< previous: Either<
oneshot::Receiver<U256>, oneshot::Receiver<U256>,
future::FutureResult<U256, oneshot::Canceled> future::FutureResult<U256, oneshot::Canceled>,
>, >,
next: Option<oneshot::Sender<U256>>, next: Option<oneshot::Sender<U256>>,
next_sent: Arc<AtomicBool>,
minimal: U256, minimal: U256,
prospective_value: U256, prospective_value: U256,
pool: CpuPool, pool: CpuPool,
@ -128,6 +187,7 @@ impl Future for Reserved {
value, value,
matches_prospective, matches_prospective,
next: self.next.take(), next: self.next.take(),
next_sent: self.next_sent.clone(),
dropped: self.dropped.clone(), dropped: self.dropped.clone(),
})) }))
} }
@ -136,10 +196,12 @@ impl Future for Reserved {
impl Drop for Reserved { impl Drop for Reserved {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(next) = self.next.take() { if let Some(next) = self.next.take() {
let next_sent = self.next_sent.clone();
self.dropped.fetch_add(1, atomic::Ordering::SeqCst); self.dropped.fetch_add(1, atomic::Ordering::SeqCst);
// If Reserved is dropped just pipe previous and next together. // If Reserved is dropped just pipe previous and next together.
let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default()))); let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default())));
self.pool.spawn(previous.map(|nonce| { self.pool.spawn(previous.map(move |nonce| {
next_sent.store(true, atomic::Ordering::SeqCst);
next.send(nonce).expect(Ready::RECV_PROOF) next.send(nonce).expect(Ready::RECV_PROOF)
})).forget() })).forget()
} }
@ -156,6 +218,7 @@ pub struct Ready {
value: U256, value: U256,
matches_prospective: bool, matches_prospective: bool,
next: Option<oneshot::Sender<U256>>, next: Option<oneshot::Sender<U256>>,
next_sent: Arc<AtomicBool>,
dropped: Arc<AtomicUsize>, dropped: Arc<AtomicUsize>,
} }
@ -176,15 +239,17 @@ impl Ready {
/// Make sure to call that method after this nonce has been consumed. /// Make sure to call that method after this nonce has been consumed.
pub fn mark_used(mut self) { pub fn mark_used(mut self) {
let next = self.next.take().expect("Nonce can be marked as used only once; qed"); let next = self.next.take().expect("Nonce can be marked as used only once; qed");
self.next_sent.store(true, atomic::Ordering::SeqCst);
next.send(self.value + 1.into()).expect(Self::RECV_PROOF); next.send(self.value + 1.into()).expect(Self::RECV_PROOF);
} }
} }
impl Drop for Ready { impl Drop for Ready {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(send) = self.next.take() { if let Some(next) = self.next.take() {
self.dropped.fetch_add(1, atomic::Ordering::SeqCst); self.dropped.fetch_add(1, atomic::Ordering::SeqCst);
send.send(self.value).expect(Self::RECV_PROOF); self.next_sent.store(true, atomic::Ordering::SeqCst);
next.send(self.value).expect(Self::RECV_PROOF);
} }
} }
} }
@ -195,12 +260,14 @@ mod tests {
#[test] #[test]
fn should_reserve_a_set_of_nonces_and_resolve_them() { fn should_reserve_a_set_of_nonces_and_resolve_them() {
let mut nonces = Reservations::new(); let mut nonces = SenderReservations::new();
assert!(nonces.is_empty());
let n1 = nonces.reserve_nonce(5.into()); let n1 = nonces.reserve_nonce(5.into());
let n2 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into());
let n3 = nonces.reserve_nonce(5.into()); let n3 = nonces.reserve_nonce(5.into());
let n4 = nonces.reserve_nonce(5.into()); let n4 = nonces.reserve_nonce(5.into());
assert!(!nonces.is_empty());
// Check first nonce // Check first nonce
let r = n1.wait().unwrap(); let r = n1.wait().unwrap();
@ -234,11 +301,13 @@ mod tests {
assert_eq!(r.value(), &U256::from(10)); assert_eq!(r.value(), &U256::from(10));
assert!(r.matches_prospective()); assert!(r.matches_prospective());
r.mark_used(); r.mark_used();
assert!(nonces.is_empty());
} }
#[test] #[test]
fn should_return_prospective_nonce() { fn should_return_prospective_nonce() {
let mut nonces = Reservations::new(); let mut nonces = SenderReservations::new();
let n1 = nonces.reserve_nonce(5.into()); let n1 = nonces.reserve_nonce(5.into());
let n2 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into());