diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index be9dbdeb1..a5bc1e154 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -15,8 +15,7 @@ // along with Parity. If not, see . use std::cmp::PartialEq; -use std::collections::BTreeMap; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::str::FromStr; use std::sync::{Arc, Weak}; diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index c556226b5..35e16f573 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -92,7 +92,11 @@ pub struct FullDispatcher { impl FullDispatcher { /// Create a `FullDispatcher` from Arc references to a client and miner. - pub fn new(client: Arc, miner: Arc, nonces: Arc>) -> Self { + pub fn new( + client: Arc, + miner: Arc, + nonces: Arc>, + ) -> Self { FullDispatcher { client, miner, @@ -162,7 +166,8 @@ impl Dispatcher for FullDispatcher. use std::{cmp, mem}; +use std::collections::HashMap; use std::sync::{atomic, Arc}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use bigint::prelude::U256; use futures::{Future, future, Poll, Async}; use futures::future::Either; use futures::sync::oneshot; use futures_cpupool::CpuPool; +use util::Address; -/// Manages currently reserved and prospective nonces. +/// Manages currently reserved and prospective nonces +/// for multiple senders. #[derive(Debug)] pub struct Reservations { - previous: Option>, + nonces: HashMap, pool: CpuPool, - prospective_value: U256, - dropped: Arc, } - impl Reservations { + /// A maximal number of reserved nonces in the hashmap + /// before we start clearing the unused ones. + const CLEAN_AT: usize = 512; + /// Create new nonces manager and spawn a single-threaded cpu pool /// for progressing execution of dropped nonces. pub fn new() -> Self { Self::with_pool(CpuPool::new(1)) } - /// Create new nonces manager with given cpu pool. + /// Create new nonces manager with given cpupool. pub fn with_pool(pool: CpuPool) -> Self { Reservations { + nonces: Default::default(), + pool, + } + } + + /// Reserve a nonce for particular address. + /// + /// The reserved nonce cannot be smaller than the minimal nonce. + pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved { + if self.nonces.len() + 1 > Self::CLEAN_AT { + self.nonces.retain(|_, v| !v.is_empty()); + } + + let pool = &self.pool; + self.nonces.entry(sender) + .or_insert_with(move || SenderReservations::with_pool(pool.clone())) + .reserve_nonce(minimal) + } +} + +/// Manages currently reserved and prospective nonces. +#[derive(Debug)] +pub struct SenderReservations { + previous: Option>, + previous_ready: Arc, + pool: CpuPool, + prospective_value: U256, + dropped: Arc, +} + +impl SenderReservations { + /// Create new nonces manager and spawn a single-threaded cpu pool + /// for progressing execution of dropped nonces. + #[cfg(test)] + pub fn new() -> Self { + Self::with_pool(CpuPool::new(1)) + } + + /// Create new nonces manager with given cpu pool. + pub fn with_pool(pool: CpuPool) -> Self { + SenderReservations { previous: None, + previous_ready: Arc::new(AtomicBool::new(true)), pool, prospective_value: Default::default(), dropped: Default::default(), @@ -64,12 +110,15 @@ impl Reservations { let (next, rx) = oneshot::channel(); let next = Some(next); + let next_sent = Arc::new(AtomicBool::default()); let pool = self.pool.clone(); let dropped = self.dropped.clone(); + self.previous_ready = next_sent.clone(); match mem::replace(&mut self.previous, Some(rx)) { Some(previous) => Reserved { previous: Either::A(previous), next, + next_sent, minimal, prospective_value, pool, @@ -78,6 +127,7 @@ impl Reservations { None => Reserved { previous: Either::B(future::ok(minimal)), next, + next_sent, minimal, prospective_value, pool, @@ -85,6 +135,11 @@ impl Reservations { }, } } + + /// Returns true if there are no reserved nonces. + pub fn is_empty(&self) -> bool { + self.previous_ready.load(atomic::Ordering::SeqCst) + } } /// Represents a future nonce. @@ -92,9 +147,10 @@ impl Reservations { pub struct Reserved { previous: Either< oneshot::Receiver, - future::FutureResult + future::FutureResult, >, next: Option>, + next_sent: Arc, minimal: U256, prospective_value: U256, pool: CpuPool, @@ -128,6 +184,7 @@ impl Future for Reserved { value, matches_prospective, next: self.next.take(), + next_sent: self.next_sent.clone(), dropped: self.dropped.clone(), })) } @@ -136,10 +193,12 @@ impl Future for Reserved { impl Drop for Reserved { fn drop(&mut self) { if let Some(next) = self.next.take() { + let next_sent = self.next_sent.clone(); self.dropped.fetch_add(1, atomic::Ordering::SeqCst); // If Reserved is dropped just pipe previous and next together. let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default()))); - self.pool.spawn(previous.map(|nonce| { + self.pool.spawn(previous.map(move |nonce| { + next_sent.store(true, atomic::Ordering::SeqCst); next.send(nonce).expect(Ready::RECV_PROOF) })).forget() } @@ -156,6 +215,7 @@ pub struct Ready { value: U256, matches_prospective: bool, next: Option>, + next_sent: Arc, dropped: Arc, } @@ -176,15 +236,17 @@ impl Ready { /// Make sure to call that method after this nonce has been consumed. pub fn mark_used(mut self) { let next = self.next.take().expect("Nonce can be marked as used only once; qed"); + self.next_sent.store(true, atomic::Ordering::SeqCst); next.send(self.value + 1.into()).expect(Self::RECV_PROOF); } } impl Drop for Ready { fn drop(&mut self) { - if let Some(send) = self.next.take() { + if let Some(next) = self.next.take() { self.dropped.fetch_add(1, atomic::Ordering::SeqCst); - send.send(self.value).expect(Self::RECV_PROOF); + self.next_sent.store(true, atomic::Ordering::SeqCst); + next.send(self.value).expect(Self::RECV_PROOF); } } } @@ -195,12 +257,14 @@ mod tests { #[test] fn should_reserve_a_set_of_nonces_and_resolve_them() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); + assert!(nonces.is_empty()); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into()); let n3 = nonces.reserve_nonce(5.into()); let n4 = nonces.reserve_nonce(5.into()); + assert!(!nonces.is_empty()); // Check first nonce let r = n1.wait().unwrap(); @@ -234,11 +298,13 @@ mod tests { assert_eq!(r.value(), &U256::from(10)); assert!(r.matches_prospective()); r.mark_used(); + + assert!(nonces.is_empty()); } #[test] fn should_return_prospective_nonce() { - let mut nonces = Reservations::new(); + let mut nonces = SenderReservations::new(); let n1 = nonces.reserve_nonce(5.into()); let n2 = nonces.reserve_nonce(5.into());