Merge pull request #7025 from paritytech/fix-nonce-reservation
Fix nonce reservation
This commit is contained in:
commit
dd7177dbb2
@ -15,8 +15,7 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::cmp::PartialEq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
|
@ -92,7 +92,11 @@ pub struct FullDispatcher<C, M> {
|
||||
|
||||
impl<C, M> FullDispatcher<C, M> {
|
||||
/// Create a `FullDispatcher` from Arc references to a client and miner.
|
||||
pub fn new(client: Arc<C>, miner: Arc<M>, nonces: Arc<Mutex<nonce::Reservations>>) -> Self {
|
||||
pub fn new(
|
||||
client: Arc<C>,
|
||||
miner: Arc<M>,
|
||||
nonces: Arc<Mutex<nonce::Reservations>>,
|
||||
) -> Self {
|
||||
FullDispatcher {
|
||||
client,
|
||||
miner,
|
||||
@ -162,7 +166,8 @@ impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C
|
||||
}
|
||||
|
||||
let state = self.state_nonce(&filled.from);
|
||||
let reserved = self.nonces.lock().reserve_nonce(state);
|
||||
let reserved = self.nonces.lock().reserve(filled.from, state);
|
||||
|
||||
Box::new(ProspectiveSigner::new(accounts, filled, chain_id, reserved, password))
|
||||
}
|
||||
|
||||
@ -382,7 +387,8 @@ impl Dispatcher for LightDispatcher {
|
||||
Box::new(self.next_nonce(filled.from)
|
||||
.map_err(|_| errors::no_light_peers())
|
||||
.and_then(move |nonce| {
|
||||
let reserved = nonces.lock().reserve_nonce(nonce);
|
||||
let reserved = nonces.lock().reserve(filled.from, nonce);
|
||||
|
||||
ProspectiveSigner::new(accounts, filled, chain_id, reserved, password)
|
||||
}))
|
||||
}
|
||||
|
@ -15,35 +15,81 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{cmp, mem};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{atomic, Arc};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
|
||||
use bigint::prelude::U256;
|
||||
use futures::{Future, future, Poll, Async};
|
||||
use futures::future::Either;
|
||||
use futures::sync::oneshot;
|
||||
use futures_cpupool::CpuPool;
|
||||
use util::Address;
|
||||
|
||||
/// Manages currently reserved and prospective nonces.
|
||||
/// Manages currently reserved and prospective nonces
|
||||
/// for multiple senders.
|
||||
#[derive(Debug)]
|
||||
pub struct Reservations {
|
||||
previous: Option<oneshot::Receiver<U256>>,
|
||||
nonces: HashMap<Address, SenderReservations>,
|
||||
pool: CpuPool,
|
||||
prospective_value: U256,
|
||||
dropped: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Reservations {
|
||||
/// A maximal number of reserved nonces in the hashmap
|
||||
/// before we start clearing the unused ones.
|
||||
const CLEAN_AT: usize = 512;
|
||||
|
||||
/// Create new nonces manager and spawn a single-threaded cpu pool
|
||||
/// for progressing execution of dropped nonces.
|
||||
pub fn new() -> Self {
|
||||
Self::with_pool(CpuPool::new(1))
|
||||
}
|
||||
|
||||
/// Create new nonces manager with given cpu pool.
|
||||
/// Create new nonces manager with given cpupool.
|
||||
pub fn with_pool(pool: CpuPool) -> Self {
|
||||
Reservations {
|
||||
nonces: Default::default(),
|
||||
pool,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reserve a nonce for particular address.
|
||||
///
|
||||
/// The reserved nonce cannot be smaller than the minimal nonce.
|
||||
pub fn reserve(&mut self, sender: Address, minimal: U256) -> Reserved {
|
||||
if self.nonces.len() + 1 > Self::CLEAN_AT {
|
||||
self.nonces.retain(|_, v| !v.is_empty());
|
||||
}
|
||||
|
||||
let pool = &self.pool;
|
||||
self.nonces.entry(sender)
|
||||
.or_insert_with(move || SenderReservations::with_pool(pool.clone()))
|
||||
.reserve_nonce(minimal)
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages currently reserved and prospective nonces.
|
||||
#[derive(Debug)]
|
||||
pub struct SenderReservations {
|
||||
previous: Option<oneshot::Receiver<U256>>,
|
||||
previous_ready: Arc<AtomicBool>,
|
||||
pool: CpuPool,
|
||||
prospective_value: U256,
|
||||
dropped: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl SenderReservations {
|
||||
/// Create new nonces manager and spawn a single-threaded cpu pool
|
||||
/// for progressing execution of dropped nonces.
|
||||
#[cfg(test)]
|
||||
pub fn new() -> Self {
|
||||
Self::with_pool(CpuPool::new(1))
|
||||
}
|
||||
|
||||
/// Create new nonces manager with given cpu pool.
|
||||
pub fn with_pool(pool: CpuPool) -> Self {
|
||||
SenderReservations {
|
||||
previous: None,
|
||||
previous_ready: Arc::new(AtomicBool::new(true)),
|
||||
pool,
|
||||
prospective_value: Default::default(),
|
||||
dropped: Default::default(),
|
||||
@ -64,12 +110,15 @@ impl Reservations {
|
||||
|
||||
let (next, rx) = oneshot::channel();
|
||||
let next = Some(next);
|
||||
let next_sent = Arc::new(AtomicBool::default());
|
||||
let pool = self.pool.clone();
|
||||
let dropped = self.dropped.clone();
|
||||
self.previous_ready = next_sent.clone();
|
||||
match mem::replace(&mut self.previous, Some(rx)) {
|
||||
Some(previous) => Reserved {
|
||||
previous: Either::A(previous),
|
||||
next,
|
||||
next_sent,
|
||||
minimal,
|
||||
prospective_value,
|
||||
pool,
|
||||
@ -78,6 +127,7 @@ impl Reservations {
|
||||
None => Reserved {
|
||||
previous: Either::B(future::ok(minimal)),
|
||||
next,
|
||||
next_sent,
|
||||
minimal,
|
||||
prospective_value,
|
||||
pool,
|
||||
@ -85,6 +135,11 @@ impl Reservations {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if there are no reserved nonces.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.previous_ready.load(atomic::Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a future nonce.
|
||||
@ -92,9 +147,10 @@ impl Reservations {
|
||||
pub struct Reserved {
|
||||
previous: Either<
|
||||
oneshot::Receiver<U256>,
|
||||
future::FutureResult<U256, oneshot::Canceled>
|
||||
future::FutureResult<U256, oneshot::Canceled>,
|
||||
>,
|
||||
next: Option<oneshot::Sender<U256>>,
|
||||
next_sent: Arc<AtomicBool>,
|
||||
minimal: U256,
|
||||
prospective_value: U256,
|
||||
pool: CpuPool,
|
||||
@ -128,6 +184,7 @@ impl Future for Reserved {
|
||||
value,
|
||||
matches_prospective,
|
||||
next: self.next.take(),
|
||||
next_sent: self.next_sent.clone(),
|
||||
dropped: self.dropped.clone(),
|
||||
}))
|
||||
}
|
||||
@ -136,10 +193,12 @@ impl Future for Reserved {
|
||||
impl Drop for Reserved {
|
||||
fn drop(&mut self) {
|
||||
if let Some(next) = self.next.take() {
|
||||
let next_sent = self.next_sent.clone();
|
||||
self.dropped.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
// If Reserved is dropped just pipe previous and next together.
|
||||
let previous = mem::replace(&mut self.previous, Either::B(future::ok(U256::default())));
|
||||
self.pool.spawn(previous.map(|nonce| {
|
||||
self.pool.spawn(previous.map(move |nonce| {
|
||||
next_sent.store(true, atomic::Ordering::SeqCst);
|
||||
next.send(nonce).expect(Ready::RECV_PROOF)
|
||||
})).forget()
|
||||
}
|
||||
@ -156,6 +215,7 @@ pub struct Ready {
|
||||
value: U256,
|
||||
matches_prospective: bool,
|
||||
next: Option<oneshot::Sender<U256>>,
|
||||
next_sent: Arc<AtomicBool>,
|
||||
dropped: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
@ -176,15 +236,17 @@ impl Ready {
|
||||
/// Make sure to call that method after this nonce has been consumed.
|
||||
pub fn mark_used(mut self) {
|
||||
let next = self.next.take().expect("Nonce can be marked as used only once; qed");
|
||||
self.next_sent.store(true, atomic::Ordering::SeqCst);
|
||||
next.send(self.value + 1.into()).expect(Self::RECV_PROOF);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Ready {
|
||||
fn drop(&mut self) {
|
||||
if let Some(send) = self.next.take() {
|
||||
if let Some(next) = self.next.take() {
|
||||
self.dropped.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
send.send(self.value).expect(Self::RECV_PROOF);
|
||||
self.next_sent.store(true, atomic::Ordering::SeqCst);
|
||||
next.send(self.value).expect(Self::RECV_PROOF);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -195,12 +257,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn should_reserve_a_set_of_nonces_and_resolve_them() {
|
||||
let mut nonces = Reservations::new();
|
||||
let mut nonces = SenderReservations::new();
|
||||
|
||||
assert!(nonces.is_empty());
|
||||
let n1 = nonces.reserve_nonce(5.into());
|
||||
let n2 = nonces.reserve_nonce(5.into());
|
||||
let n3 = nonces.reserve_nonce(5.into());
|
||||
let n4 = nonces.reserve_nonce(5.into());
|
||||
assert!(!nonces.is_empty());
|
||||
|
||||
// Check first nonce
|
||||
let r = n1.wait().unwrap();
|
||||
@ -234,11 +298,13 @@ mod tests {
|
||||
assert_eq!(r.value(), &U256::from(10));
|
||||
assert!(r.matches_prospective());
|
||||
r.mark_used();
|
||||
|
||||
assert!(nonces.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_return_prospective_nonce() {
|
||||
let mut nonces = Reservations::new();
|
||||
let mut nonces = SenderReservations::new();
|
||||
|
||||
let n1 = nonces.reserve_nonce(5.into());
|
||||
let n2 = nonces.reserve_nonce(5.into());
|
||||
|
Loading…
Reference in New Issue
Block a user