diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 281b97589..a5bc1e154 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::cmp::PartialEq; -use std::collections::{BTreeMap, HashSet, HashMap}; +use std::collections::{BTreeMap, HashSet}; use std::str::FromStr; use std::sync::{Arc, Weak}; @@ -241,7 +241,7 @@ impl FullDependencies { ($namespace:ident, $handler:expr, $deps:expr, $nonces:expr) => { { let deps = &$deps; - let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), deps.fetch.pool(), $nonces); + let dispatcher = FullDispatcher::new(deps.client.clone(), deps.miner.clone(), $nonces); if deps.signer_service.is_enabled() { $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, deps.remote.clone(), &deps.secret_store))) } else { @@ -251,11 +251,10 @@ impl FullDependencies { } } - let nonces = Arc::new(Mutex::new(HashMap::new())); + let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))); let dispatcher = FullDispatcher::new( self.client.clone(), self.miner.clone(), - self.fetch.pool(), nonces.clone(), ); for api in apis { @@ -440,8 +439,7 @@ impl LightDependencies { self.on_demand.clone(), self.cache.clone(), self.transaction_queue.clone(), - self.fetch.pool(), - Arc::new(Mutex::new(HashMap::new())), + Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), ); macro_rules! add_signing_methods { diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index f17797619..35e16f573 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -19,7 +19,6 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; -use std::collections::HashMap; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; @@ -33,7 +32,6 @@ use util::Address; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; use stats::Corpus; -use futures_cpupool::CpuPool; use ethkey::Signature; use ethsync::LightSync; @@ -89,8 +87,7 @@ pub trait Dispatcher: Send + Sync + Clone { pub struct FullDispatcher { client: Arc, miner: Arc, - nonces: Arc>>, - pool: CpuPool, + nonces: Arc>, } impl FullDispatcher { @@ -98,14 +95,12 @@ impl FullDispatcher { pub fn new( client: Arc, miner: Arc, - pool: CpuPool, - nonces: Arc>>, + nonces: Arc>, ) -> Self { FullDispatcher { client, miner, nonces, - pool, } } } @@ -116,7 +111,6 @@ impl Clone for FullDispatcher { client: self.client.clone(), miner: self.miner.clone(), nonces: self.nonces.clone(), - pool: self.pool.clone(), } } } @@ -172,9 +166,7 @@ impl Dispatcher for FullDispatcher>, /// Nonce reservations - pub nonces: Arc>>, - /// Cpu pool - pub pool: CpuPool, + pub nonces: Arc>, } impl LightDispatcher { @@ -280,8 +270,7 @@ impl LightDispatcher { on_demand: Arc, cache: Arc>, transaction_queue: Arc>, - pool: CpuPool, - nonces: Arc>>, + nonces: Arc>, ) -> Self { LightDispatcher { sync, @@ -290,7 +279,6 @@ impl LightDispatcher { cache, transaction_queue, nonces, - pool, } } @@ -396,13 +384,11 @@ impl Dispatcher for LightDispatcher { } let nonces = self.nonces.clone(); - let pool = self.pool.clone(); Box::new(self.next_nonce(filled.from) .map_err(|_| errors::no_light_peers()) .and_then(move |nonce| { - let reserved = nonces.lock().entry(filled.from) - .or_insert(nonce::Reservations::with_pool(pool)) - .reserve_nonce(nonce); + let reserved = nonces.lock().reserve(filled.from, nonce); + ProspectiveSigner::new(accounts, filled, chain_id, reserved, password) })) } diff --git a/rpc/src/v1/helpers/nonce.rs b/rpc/src/v1/helpers/nonce.rs index a048d9a87..2b4df49ae 100644 --- a/rpc/src/v1/helpers/nonce.rs +++ b/rpc/src/v1/helpers/nonce.rs @@ -15,35 +15,84 @@ // along with Parity. If not, see . use std::{cmp, mem}; +use std::collections::HashMap; use std::sync::{atomic, Arc}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use bigint::prelude::U256; use futures::{Future, future, Poll, Async}; use futures::future::Either; use futures::sync::oneshot; use futures_cpupool::CpuPool; +use util::Address; -/// Manages currently reserved and prospective nonces. +/// Manages currently reserved and prospective nonces +/// for multiple senders. #[derive(Debug)] pub struct Reservations { - previous: Option>, + nonces: HashMap, pool: CpuPool, - prospective_value: U256, - dropped: Arc, } - impl Reservations { + /// A maximal number of reserved nonces in the hashmap + /// before we start clearing the unused ones. + const CLEAN_AT: usize = 512; + /// Create new nonces manager and spawn a single-threaded cpu pool /// for progressing execution of dropped nonces. pub fn new() -> Self { Self::with_pool(CpuPool::new(1)) } - /// Create new nonces manager with given cpu pool. + /// Create new nonces manager with given cpupool. pub fn with_pool(pool: CpuPool) -> Self { Reservations { + nonces: Default::default(), + pool, + } + } + + /// Reserve a nonce for particular address. + /// + /// The reserved nonce cannot be smaller than the minimal nonce. + pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved { + if self.nonces.len() + 1 > Self::CLEAN_AT { + let to_remove = self.nonces.iter().filter(|&(_, v)| v.is_empty()).map(|(k, _)| *k).collect::>(); + for address in to_remove { + self.nonces.remove(&address); + } + } + + let pool = &self.pool; + self.nonces.entry(sender) + .or_insert_with(move || SenderReservations::with_pool(pool.clone())) + .reserve_nonce(minimal) + } +} + +/// Manages currently reserved and prospective nonces. +#[derive(Debug)] +pub struct SenderReservations { + previous: Option>, + previous_ready: Arc, + pool: CpuPool, + prospective_value: U256, + dropped: Arc, +} + +impl SenderReservations { + /// Create new nonces manager and spawn a single-threaded cpu pool + /// for progressing execution of dropped nonces. + #[cfg(test)] + pub fn new() -> Self { + Self::with_pool(CpuPool::new(1)) + } + + /// Create new nonces manager with given cpu pool. + pub fn with_pool(pool: CpuPool) -> Self { + SenderReservations { previous: None, + previous_ready: Arc::new(AtomicBool::new(true)), pool, prospective_value: Default::default(), dropped: Default::default(), @@ -64,12 +113,15 @@ impl Reservations { let (next, rx) = oneshot::channel(); let next = Some(next); + let next_sent = Arc::new(AtomicBool::default()); let pool = self.pool.clone(); let dropped = self.dropped.clone(); + self.previous_ready = next_sent.clone(); match mem::replace(&mut self.previous, Some(rx)) { Some(previous) => Reserved { previous: Either::A(previous), next, + next_sent, minimal, prospective_value, pool, @@ -78,6 +130,7 @@ impl Reservations { None => Reserved { previous: Either::B(future::ok(minimal)), next, + next_sent, minimal, prospective_value, pool, @@ -85,6 +138,11 @@ impl Reservations { }, } } + + /// Returns true if there are no reserved nonces. + pub fn is_empty(&self) -> bool { + self.previous_ready.load(atomic::Ordering::SeqCst) + } } /// Represents a future nonce. @@ -92,9 +150,10 @@ impl Reservations { pub struct Reserved { previous: Either< oneshot::Receiver, - future::FutureResult + future::FutureResult, >, next: Option>, + next_sent: Arc, minimal: U256, prospective_value: U256, pool: CpuPool, @@ -128,6 +187,7 @@ impl Future for Reserved { value, matches_prospective, next: self.next.take(), + next_sent: self.next_sent.clone(), dropped: self.dropped.clone(), })) } @@ -136,10 +196,12 @@ impl Future for Reserved { impl Drop for Reserved { fn drop(&mut self) { if let Some(next) = self.next.take() { + let next_sent = self.next_sent.clone(); self.dropped.fetch_add(1, atomic::Ordering::SeqCst); // If Reserved is dropped just pipe previous and next together. let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default()))); - self.pool.spawn(previous.map(|nonce| { + self.pool.spawn(previous.map(move |nonce| { + next_sent.store(true, atomic::Ordering::SeqCst); next.send(nonce).expect(Ready::RECV_PROOF) })).forget() } @@ -156,6 +218,7 @@ pub struct Ready { value: U256, matches_prospective: bool, next: Option>, + next_sent: Arc, dropped: Arc, } @@ -176,15 +239,17 @@ impl Ready { /// Make sure to call that method after this nonce has been consumed. pub fn mark_used(mut self) { let next = self.next.take().expect("Nonce can be marked as used only once; qed"); + self.next_sent.store(true, atomic::Ordering::SeqCst); next.send(self.value + 1.into()).expect(Self::RECV_PROOF); } } impl Drop for Ready { fn drop(&mut self) { - if let Some(send) = self.next.take() { + if let Some(next) = self.next.take() { self.dropped.fetch_add(1, atomic::Ordering::SeqCst); - send.send(self.value).expect(Self::RECV_PROOF); + self.next_sent.store(true, atomic::Ordering::SeqCst); + next.send(self.value).expect(Self::RECV_PROOF); } } } @@ -195,12 +260,14 @@ mod tests { #[test] fn should_reserve_a_set_of_nonces_and_resolve_them() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); + assert!(nonces.is_empty()); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into()); let n3 = nonces.reserve_nonce(5.into()); let n4 = nonces.reserve_nonce(5.into()); + assert!(!nonces.is_empty()); // Check first nonce let r = n1.wait().unwrap(); @@ -234,11 +301,13 @@ mod tests { assert_eq!(r.value(), &U256::from(10)); assert!(r.matches_prospective()); r.mark_used(); + + assert!(nonces.is_empty()); } #[test] fn should_return_prospective_nonce() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into());