Limit the number of transactions in pending set (#8777)

* Unordered iterator.

* Use unordered and limited set if full not required.

* Split timeout work into smaller timers.

* Avoid collecting all pending transactions when mining

* Remove println.

* Use priority ordering in eth-filter.

* Fix ethcore-miner tests and tx propagation.

* Review grumbles addressed.

* Add test for unordered not populating the cache.

* Fix ethcore tests.

* Fix light tests.

* Fix ethcore-sync tests.

* Fix RPC tests.
This commit is contained in:
Tomasz Drwięga
2018-06-12 08:22:54 +02:00
committed by Marek Kotewicz
parent 4817b94d0b
commit 4938d5dde5
29 changed files with 415 additions and 115 deletions

View File

@@ -30,13 +30,14 @@ extern crate linked_hash_map;
extern crate parking_lot;
extern crate price_info;
extern crate rlp;
extern crate trace_time;
extern crate transaction_pool as txpool;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate trace_time;
#[cfg(test)]
extern crate rustc_hex;

View File

@@ -16,7 +16,7 @@
//! Transaction Pool
use ethereum_types::{H256, Address};
use ethereum_types::{U256, H256, Address};
use heapsize::HeapSizeOf;
use transaction;
use txpool;
@@ -45,6 +45,43 @@ pub enum PrioritizationStrategy {
GasPriceOnly,
}
/// Transaction ordering when requesting pending set.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PendingOrdering {
/// Get pending transactions ordered by their priority (potentially expensive)
Priority,
/// Get pending transactions without any care of particular ordering (cheaper).
Unordered,
}
/// Pending set query settings
#[derive(Debug, Clone)]
pub struct PendingSettings {
/// Current block number (affects readiness of some transactions).
pub block_number: u64,
/// Current timestamp (affects readiness of some transactions).
pub current_timestamp: u64,
/// Nonce cap (for dust protection; EIP-168)
pub nonce_cap: Option<U256>,
/// Maximal number of transactions in pending the set.
pub max_len: usize,
/// Ordering of transactions.
pub ordering: PendingOrdering,
}
impl PendingSettings {
/// Get all transactions (no cap or len limit) prioritized.
pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self {
PendingSettings {
block_number,
current_timestamp,
nonce_cap: None,
max_len: usize::max_value(),
ordering: PendingOrdering::Priority,
}
}
}
/// Transaction priority.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum Priority {

View File

@@ -26,7 +26,10 @@ use parking_lot::RwLock;
use transaction;
use txpool::{self, Verifier};
use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy};
use pool::{
self, scoring, verifier, client, ready, listener,
PrioritizationStrategy, PendingOrdering, PendingSettings,
};
use pool::local_transactions::LocalTransactionsList;
type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger));
@@ -74,6 +77,7 @@ struct CachedPending {
nonce_cap: Option<U256>,
has_local_pending: bool,
pending: Option<Vec<Arc<pool::VerifiedTransaction>>>,
max_len: usize,
}
impl CachedPending {
@@ -85,6 +89,7 @@ impl CachedPending {
has_local_pending: false,
pending: None,
nonce_cap: None,
max_len: 0,
}
}
@@ -99,6 +104,7 @@ impl CachedPending {
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<&U256>,
max_len: usize,
) -> Option<Vec<Arc<pool::VerifiedTransaction>>> {
// First check if we have anything in cache.
let pending = self.pending.as_ref()?;
@@ -123,7 +129,12 @@ impl CachedPending {
return None;
}
Some(pending.clone())
// It's fine to just take a smaller subset, but not other way around.
if max_len > self.max_len {
return None;
}
Some(pending.iter().take(max_len).cloned().collect())
}
}
@@ -173,7 +184,7 @@ impl TransactionQueue {
transactions: Vec<verifier::Transaction>,
) -> Vec<Result<(), transaction::Error>> {
// Run verification
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
trace_time!("pool::verify_and_import");
let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());
@@ -203,13 +214,13 @@ impl TransactionQueue {
results
}
/// Returns all transactions in the queue ordered by priority.
/// Returns all transactions in the queue without explicit ordering.
pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> {
let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready;
self.pool.read().pending(ready).collect()
self.pool.read().unordered_pending(ready).collect()
}
/// Returns current pneding transactions.
/// Returns current pending transactions ordered by priority.
///
/// NOTE: This may return a cached version of pending transaction set.
/// Re-computing the pending set is possible with `#collect_pending` method,
@@ -217,24 +228,31 @@ impl TransactionQueue {
pub fn pending<C>(
&self,
client: C,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
settings: PendingSettings,
) -> Vec<Arc<pool::VerifiedTransaction>> where
C: client::NonceClient,
{
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) {
let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings;
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
return pending;
}
// Double check after acquiring write lock
let mut cached_pending = self.cached_pending.write();
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) {
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
return pending;
}
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect());
// In case we don't have a cached set, but we don't care about order
// just return the unordered set.
if let PendingOrdering::Unordered = ordering {
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
return self.pool.read().unordered_pending(ready).take(max_len).collect();
}
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| {
i.take(max_len).collect()
});
*cached_pending = CachedPending {
block_number,
@@ -242,6 +260,7 @@ impl TransactionQueue {
nonce_cap,
has_local_pending: self.has_local_pending_transactions(),
pending: Some(pending.clone()),
max_len,
};
pending
@@ -266,15 +285,27 @@ impl TransactionQueue {
scoring::NonceAndGasPrice,
Listener,
>) -> T,
{
debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number);
trace_time!("pool::collect_pending");
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
collect(self.pool.read().pending(ready))
}
fn ready<C>(
client: C,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
) -> (ready::Condition, ready::State<C>) where
C: client::NonceClient,
{
let pending_readiness = ready::Condition::new(block_number, current_timestamp);
// don't mark any transactions as stale at this point.
let stale_id = None;
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
let ready = (pending_readiness, state_readiness);
collect(self.pool.read().pending(ready))
(pending_readiness, state_readiness)
}
/// Culls all stalled transactions from the pool.
@@ -415,6 +446,12 @@ impl TransactionQueue {
let mut pool = self.pool.write();
(pool.listener_mut().1).0.add(f);
}
/// Check if pending set is cached.
#[cfg(test)]
pub fn is_pending_cached(&self) -> bool {
self.cached_pending.read().pending.is_some()
}
}
fn convert_error(err: txpool::Error) -> transaction::Error {
@@ -440,7 +477,7 @@ mod tests {
fn should_get_pending_transactions() {
let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly);
let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None);
let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0));
for tx in pending {
assert!(tx.signed().nonce > 0.into());

View File

@@ -18,7 +18,7 @@ use ethereum_types::U256;
use transaction::{self, PendingTransaction};
use txpool;
use pool::{verifier, TransactionQueue, PrioritizationStrategy};
use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering};
pub mod tx;
pub mod client;
@@ -108,7 +108,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() {
// and then there should be only one transaction in current (the one with higher gas_price)
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash);
}
@@ -133,7 +133,7 @@ fn should_move_all_transactions_from_future() {
// then
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
}
@@ -207,7 +207,7 @@ fn should_import_txs_from_same_sender() {
txq.import(TestClient::new(), txs.local().into_vec());
// then
let top = txq.pending(TestClient::new(), 0 ,0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0));
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
@@ -229,7 +229,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(client, 0, 0, None);
let top = txq.pending(client, PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); // local should be first
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
@@ -251,7 +251,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); // retracted should be first
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
@@ -270,7 +270,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
@@ -288,7 +288,7 @@ fn should_put_transaction_to_futures_if_gap_detected() {
// then
assert_eq!(res, vec![Ok(()), Ok(())]);
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash);
}
@@ -308,9 +308,9 @@ fn should_handle_min_block() {
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 0);
let top = txq.pending(TestClient::new(), 1, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0));
assert_eq!(top.len(), 2);
}
@@ -341,7 +341,7 @@ fn should_move_transactions_if_gap_filled() {
let res = txq.import(TestClient::new(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
// when
let res = txq.import(TestClient::new(), vec![tx1.local()]);
@@ -349,7 +349,7 @@ fn should_move_transactions_if_gap_filled() {
// then
assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);
}
#[test]
@@ -361,12 +361,12 @@ fn should_remove_transaction() {
let res = txq.import(TestClient::default(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
// when
txq.cull(TestClient::new().with_nonce(124));
assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1);
txq.cull(TestClient::new().with_nonce(126));
// then
@@ -384,19 +384,19 @@ fn should_move_transactions_to_future_if_gap_introduced() {
let res = txq.import(TestClient::new(), vec![tx3, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
let res = txq.import(TestClient::new(), vec![tx].local());
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);
// when
txq.remove(vec![&hash], true);
// then
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
}
#[test]
@@ -447,7 +447,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() {
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None);
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash);
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into()));
@@ -494,19 +494,19 @@ fn should_accept_same_transaction_twice_if_removed() {
let res = txq.import(TestClient::new(), txs.local().into_vec());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);
// when
txq.remove(vec![&hash], true);
assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0);
let res = txq.import(TestClient::new(), vec![tx1].local());
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);
}
#[test]
@@ -526,8 +526,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() {
// then
assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20));
assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2));
assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20));
assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2));
}
#[test]
@@ -569,7 +569,7 @@ fn should_return_valid_last_nonce_after_cull() {
let client = TestClient::new().with_nonce(124);
txq.cull(client.clone());
// tx2 should be not be promoted to current
assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0);
assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0);
// then
assert_eq!(txq.next_nonce(client.clone(), &sender), None);
@@ -667,7 +667,7 @@ fn should_accept_local_transactions_below_min_gas_price() {
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
}
#[test]
@@ -685,7 +685,7 @@ fn should_accept_local_service_transaction() {
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
}
#[test]
@@ -726,15 +726,77 @@ fn should_not_return_transactions_over_nonce_cap() {
assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]);
// when
let all = txq.pending(TestClient::new(), 0, 0, None);
let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
// This should invalidate the cache!
let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into()));
let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: Some(123.into()),
max_len: usize::max_value(),
ordering: PendingOrdering::Priority,
});
// then
assert_eq!(all.len(), 3);
assert_eq!(limited.len(), 1);
}
#[test]
fn should_return_cached_pending_even_if_unordered_is_requested() {
// given
let txq = new_queue();
let tx1 = Tx::default().signed();
let (tx2_1, tx2_2)= Tx::default().signed_pair();
let tx2_1_hash = tx2_1.hash();
let res = txq.import(TestClient::new(), vec![tx1].unverified());
assert_eq!(res, vec![Ok(())]);
let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
// when
let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(all[0].hash, tx2_1_hash);
assert_eq!(all.len(), 3);
// This should not invalidate the cache!
let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: None,
max_len: 3,
ordering: PendingOrdering::Unordered,
});
// then
assert_eq!(all, limited);
}
#[test]
fn should_return_unordered_and_not_populate_the_cache() {
// given
let txq = new_queue();
let tx1 = Tx::default().signed();
let (tx2_1, tx2_2)= Tx::default().signed_pair();
let res = txq.import(TestClient::new(), vec![tx1].unverified());
assert_eq!(res, vec![Ok(())]);
let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
// when
// This should not invalidate the cache!
let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: None,
max_len: usize::max_value(),
ordering: PendingOrdering::Unordered,
});
// then
assert_eq!(limited.len(), 3);
assert!(!txq.is_pending_cached());
}
#[test]
fn should_clear_cache_after_timeout_for_local() {
// given
@@ -748,12 +810,12 @@ fn should_clear_cache_after_timeout_for_local() {
// This should populate cache and set timestamp to 1
// when
assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0);
assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0);
// This should invalidate the cache and trigger transaction ready.
// then
assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2);
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2);
}
#[test]