Minimal effective gas price in the queue (#8934)

* Minimal effective gas price.

* Fix naming, add test

* Fix minimal entry score and add test.

* Fix worst_transaction.

* Remove effective gas price threshold.

* Don't leak gas_price decisions out of Scoring.
This commit is contained in:
Tomasz Drwięga 2018-06-30 11:11:31 +02:00 committed by André Silva
parent 768d15c6e5
commit 8b169ecfad
6 changed files with 176 additions and 39 deletions

View File

@ -176,7 +176,20 @@ impl TransactionQueue {
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());
let transaction_to_replace = {
let pool = self.pool.read();
if pool.is_full() {
pool.worst_transaction().map(|worst| (pool.scoring().clone(), worst))
} else {
None
}
};
let verifier = verifier::Verifier::new(
client,
options,
self.insertion_id.clone(),
transaction_to_replace,
);
let results = transactions
.into_iter()
.map(|transaction| {

View File

@ -31,19 +31,42 @@ use std::cmp;
use ethereum_types::U256;
use txpool;
use super::{PrioritizationStrategy, VerifiedTransaction};
use super::{verifier, PrioritizationStrategy, VerifiedTransaction};
/// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25%
/// Calculate minimal gas price requirement.
#[inline]
fn bump_gas_price(old_gp: U256) -> U256 {
old_gp.saturating_add(old_gp >> GAS_PRICE_BUMP_SHIFT)
}
/// Simple, gas-price based scoring for transactions.
///
/// NOTE: Currently penalization does not apply to new transactions that enter the pool.
/// We might want to store penalization status in some persistent state.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NonceAndGasPrice(pub PrioritizationStrategy);
impl NonceAndGasPrice {
/// Decide if the transaction should even be considered into the pool (if the pool is full).
///
/// Used by Verifier to quickly reject transactions that don't have any chance to get into the pool later on,
/// and save time on more expensive checks like sender recovery, etc.
///
/// NOTE The method is never called for zero-gas-price transactions or local transactions
/// (such transactions are always considered to the pool and potentially rejected later on)
pub fn should_reject_early(&self, old: &VerifiedTransaction, new: &verifier::Transaction) -> bool {
if old.priority().is_local() {
return true
}
&old.transaction.gas_price > new.gas_price()
}
}
impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
type Score = U256;
type Event = ();
@ -60,7 +83,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
let old_gp = old.transaction.gas_price;
let new_gp = new.transaction.gas_price;
let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT);
let min_required_gp = bump_gas_price(old_gp);
match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew,

View File

@ -815,8 +815,8 @@ fn should_avoid_verifying_transaction_already_in_pool() {
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new();
let tx1 = Tx::default().signed().unverified();
let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Ok(())]);
@ -832,3 +832,41 @@ fn should_avoid_verifying_transaction_already_in_pool() {
// then
assert_eq!(txq.status().status.transaction_count, 1);
}
#[test]
fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
assert!(client.was_verification_triggered());
// when
let client = TestClient::new();
let tx1 = Tx::default().signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientGasPrice {
minimal: 2.into(),
got: 1.into(),
})]);
assert!(!client.was_verification_triggered());
// then
assert_eq!(txq.status().status.transaction_count, 1);
}

View File

@ -85,6 +85,15 @@ impl Transaction {
}
}
/// Return transaction gas price
pub fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn gas(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas,
@ -93,15 +102,6 @@ impl Transaction {
}
}
fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn transaction(&self) -> &transaction::Transaction {
match *self {
Transaction::Unverified(ref tx) => &*tx,
@ -129,24 +129,31 @@ impl Transaction {
///
/// Verification can be run in parallel for all incoming transactions.
#[derive(Debug)]
pub struct Verifier<C> {
pub struct Verifier<C, S, V> {
client: C,
options: Options,
id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
}
impl<C> Verifier<C> {
impl<C, S, V> Verifier<C, S, V> {
/// Creates new transaction verfier with specified options.
pub fn new(client: C, options: Options, id: Arc<AtomicUsize>) -> Self {
pub fn new(
client: C,
options: Options,
id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
) -> Self {
Verifier {
client,
options,
id,
transaction_to_replace,
}
}
}
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C, ::pool::scoring::NonceAndGasPrice, VerifiedTransaction> {
type Error = transaction::Error;
type VerifiedTransaction = VerifiedTransaction;
@ -165,7 +172,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
if tx.gas() > &gas_limit {
debug!(
target: "txqueue",
"[{:?}] Dropping transaction above gas limit: {} > min({}, {})",
"[{:?}] Rejected transaction above gas limit: {} > min({}, {})",
hash,
tx.gas(),
self.options.block_gas_limit,
@ -180,7 +187,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
let minimal_gas = self.client.required_gas(tx.transaction());
if tx.gas() < &minimal_gas {
trace!(target: "txqueue",
"[{:?}] Dropping transaction with insufficient gas: {} < {}",
"[{:?}] Rejected transaction with insufficient gas: {} < {}",
hash,
tx.gas(),
minimal_gas,
@ -193,11 +200,12 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
}
let is_own = tx.is_local();
// Quick exit for non-service transactions
if tx.gas_price() < &self.options.minimal_gas_price
&& !tx.gas_price().is_zero()
&& !is_own
{
// Quick exit for non-service and non-local transactions
//
// We're checking if the transaction is below configured minimal gas price
// or the effective minimal gas price in case the pool is full.
if !tx.gas_price().is_zero() && !is_own {
if tx.gas_price() < &self.options.minimal_gas_price {
trace!(
target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
@ -211,6 +219,23 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
});
}
if let Some((ref scoring, ref vtx)) = self.transaction_to_replace {
if scoring.should_reject_early(vtx, &tx) {
trace!(
target: "txqueue",
"[{:?}] Rejected tx early, cause it doesn't have any chance to get to the pool: (gas price: {} < {})",
hash,
tx.gas_price(),
vtx.transaction.gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: vtx.transaction.gas_price,
got: *tx.gas_price(),
});
}
}
}
// Some more heavy checks below.
// Actually recover sender and verify that transaction
let is_retracted = tx.is_retracted();

View File

@ -390,7 +390,13 @@ impl<T, S, L> Pool<T, S, L> where
/// Returns worst transaction in the queue (if any).
pub fn worst_transaction(&self) -> Option<Arc<T>> {
self.worst_transactions.iter().next().map(|x| x.transaction.transaction.clone())
self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
}
/// Returns true if the pool is at it's capacity.
pub fn is_full(&self) -> bool {
self.by_hash.len() >= self.options.max_count
|| self.mem_usage >= self.options.max_mem_usage
}
/// Returns an iterator of pending (ready) transactions.
@ -477,6 +483,11 @@ impl<T, S, L> Pool<T, S, L> where
&self.listener
}
/// Borrows scoring instance.
pub fn scoring(&self) -> &S {
&self.scoring
}
/// Borrows listener mutably.
pub fn listener_mut(&mut self) -> &mut L {
&mut self.listener

View File

@ -48,6 +48,15 @@ pub type SharedTransaction = Arc<Transaction>;
type TestPool = Pool<Transaction, DummyScoring>;
impl TestPool {
pub fn with_limit(max_count: usize) -> Self {
Self::with_options(Options {
max_count,
..Default::default()
})
}
}
#[test]
fn should_clear_queue() {
// given
@ -445,9 +454,27 @@ fn should_return_worst_transaction() {
// when
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).gas_price(4).new()).unwrap();
// then
assert!(txq.worst_transaction().is_some());
assert_eq!(txq.worst_transaction().unwrap().gas_price, 4.into());
}
#[test]
fn should_return_is_full() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_limit(2);
assert!(!txq.is_full());
// when
txq.import(b.tx().nonce(0).gas_price(110).new()).unwrap();
assert!(!txq.is_full());
txq.import(b.tx().sender(1).nonce(0).gas_price(100).new()).unwrap();
// then
assert!(txq.is_full());
}
mod listener {