Minimal effective gas price in the queue (#8934)

* Minimal effective gas price.

* Fix naming, add test

* Fix minimal entry score and add test.

* Fix worst_transaction.

* Remove effective gas price threshold.

* Don't leak gas_price decisions out of Scoring.
This commit is contained in:
Tomasz Drwięga 2018-06-30 11:11:31 +02:00 committed by Afri Schoedon
parent 47ff3a9bee
commit 1792725651
6 changed files with 176 additions and 38 deletions

View File

@ -187,7 +187,20 @@ impl TransactionQueue {
trace_time!("pool::verify_and_import"); trace_time!("pool::verify_and_import");
let options = self.options.read().clone(); let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); let transaction_to_replace = {
let pool = self.pool.read();
if pool.is_full() {
pool.worst_transaction().map(|worst| (pool.scoring().clone(), worst))
} else {
None
}
};
let verifier = verifier::Verifier::new(
client,
options,
self.insertion_id.clone(),
transaction_to_replace,
);
let results = transactions let results = transactions
.into_iter() .into_iter()
.map(|transaction| { .map(|transaction| {

View File

@ -31,19 +31,42 @@ use std::cmp;
use ethereum_types::U256; use ethereum_types::U256;
use txpool; use txpool;
use super::{PrioritizationStrategy, VerifiedTransaction}; use super::{verifier, PrioritizationStrategy, VerifiedTransaction};
/// Transaction with the same (sender, nonce) can be replaced only if /// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT` /// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25% const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25%
/// Calculate minimal gas price requirement.
#[inline]
fn bump_gas_price(old_gp: U256) -> U256 {
old_gp.saturating_add(old_gp >> GAS_PRICE_BUMP_SHIFT)
}
/// Simple, gas-price based scoring for transactions. /// Simple, gas-price based scoring for transactions.
/// ///
/// NOTE: Currently penalization does not apply to new transactions that enter the pool. /// NOTE: Currently penalization does not apply to new transactions that enter the pool.
/// We might want to store penalization status in some persistent state. /// We might want to store penalization status in some persistent state.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct NonceAndGasPrice(pub PrioritizationStrategy); pub struct NonceAndGasPrice(pub PrioritizationStrategy);
impl NonceAndGasPrice {
/// Decide if the transaction should even be considered into the pool (if the pool is full).
///
/// Used by Verifier to quickly reject transactions that don't have any chance to get into the pool later on,
/// and save time on more expensive checks like sender recovery, etc.
///
/// NOTE The method is never called for zero-gas-price transactions or local transactions
/// (such transactions are always considered to the pool and potentially rejected later on)
pub fn should_reject_early(&self, old: &VerifiedTransaction, new: &verifier::Transaction) -> bool {
if old.priority().is_local() {
return true
}
&old.transaction.gas_price > new.gas_price()
}
}
impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice { impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
type Score = U256; type Score = U256;
type Event = (); type Event = ();
@ -60,7 +83,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
let old_gp = old.transaction.gas_price; let old_gp = old.transaction.gas_price;
let new_gp = new.transaction.gas_price; let new_gp = new.transaction.gas_price;
let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT); let min_required_gp = bump_gas_price(old_gp);
match min_required_gp.cmp(&new_gp) { match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew,

View File

@ -875,8 +875,8 @@ fn should_avoid_verifying_transaction_already_in_pool() {
}, },
PrioritizationStrategy::GasPriceOnly, PrioritizationStrategy::GasPriceOnly,
); );
let client = TestClient::new(); let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::default().signed().unverified(); let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1.clone()]); let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
@ -892,3 +892,41 @@ fn should_avoid_verifying_transaction_already_in_pool() {
// then // then
assert_eq!(txq.status().status.transaction_count, 1); assert_eq!(txq.status().status.transaction_count, 1);
} }
#[test]
fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
assert!(client.was_verification_triggered());
// when
let client = TestClient::new();
let tx1 = Tx::default().signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientGasPrice {
minimal: 2.into(),
got: 1.into(),
})]);
assert!(!client.was_verification_triggered());
// then
assert_eq!(txq.status().status.transaction_count, 1);
}

View File

@ -85,6 +85,15 @@ impl Transaction {
} }
} }
/// Return transaction gas price
pub fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn gas(&self) -> &U256 { fn gas(&self) -> &U256 {
match *self { match *self {
Transaction::Unverified(ref tx) => &tx.gas, Transaction::Unverified(ref tx) => &tx.gas,
@ -93,14 +102,6 @@ impl Transaction {
} }
} }
fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn transaction(&self) -> &transaction::Transaction { fn transaction(&self) -> &transaction::Transaction {
match *self { match *self {
Transaction::Unverified(ref tx) => &*tx, Transaction::Unverified(ref tx) => &*tx,
@ -128,24 +129,31 @@ impl Transaction {
/// ///
/// Verification can be run in parallel for all incoming transactions. /// Verification can be run in parallel for all incoming transactions.
#[derive(Debug)] #[derive(Debug)]
pub struct Verifier<C> { pub struct Verifier<C, S, V> {
client: C, client: C,
options: Options, options: Options,
id: Arc<AtomicUsize>, id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
} }
impl<C> Verifier<C> { impl<C, S, V> Verifier<C, S, V> {
/// Creates new transaction verfier with specified options. /// Creates new transaction verfier with specified options.
pub fn new(client: C, options: Options, id: Arc<AtomicUsize>) -> Self { pub fn new(
client: C,
options: Options,
id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
) -> Self {
Verifier { Verifier {
client, client,
options, options,
id, id,
transaction_to_replace,
} }
} }
} }
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> { impl<C: Client> txpool::Verifier<Transaction> for Verifier<C, ::pool::scoring::NonceAndGasPrice, VerifiedTransaction> {
type Error = transaction::Error; type Error = transaction::Error;
type VerifiedTransaction = VerifiedTransaction; type VerifiedTransaction = VerifiedTransaction;
@ -164,7 +172,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
if tx.gas() > &gas_limit { if tx.gas() > &gas_limit {
debug!( debug!(
target: "txqueue", target: "txqueue",
"[{:?}] Dropping transaction above gas limit: {} > min({}, {})", "[{:?}] Rejected transaction above gas limit: {} > min({}, {})",
hash, hash,
tx.gas(), tx.gas(),
self.options.block_gas_limit, self.options.block_gas_limit,
@ -179,7 +187,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
let minimal_gas = self.client.required_gas(tx.transaction()); let minimal_gas = self.client.required_gas(tx.transaction());
if tx.gas() < &minimal_gas { if tx.gas() < &minimal_gas {
trace!(target: "txqueue", trace!(target: "txqueue",
"[{:?}] Dropping transaction with insufficient gas: {} < {}", "[{:?}] Rejected transaction with insufficient gas: {} < {}",
hash, hash,
tx.gas(), tx.gas(),
minimal_gas, minimal_gas,
@ -192,11 +200,12 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
} }
let is_own = tx.is_local(); let is_own = tx.is_local();
// Quick exit for non-service transactions // Quick exit for non-service and non-local transactions
if tx.gas_price() < &self.options.minimal_gas_price //
&& !tx.gas_price().is_zero() // We're checking if the transaction is below configured minimal gas price
&& !is_own // or the effective minimal gas price in case the pool is full.
{ if !tx.gas_price().is_zero() && !is_own {
if tx.gas_price() < &self.options.minimal_gas_price {
trace!( trace!(
target: "txqueue", target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}", "[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
@ -210,6 +219,23 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
}); });
} }
if let Some((ref scoring, ref vtx)) = self.transaction_to_replace {
if scoring.should_reject_early(vtx, &tx) {
trace!(
target: "txqueue",
"[{:?}] Rejected tx early, cause it doesn't have any chance to get to the pool: (gas price: {} < {})",
hash,
tx.gas_price(),
vtx.transaction.gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: vtx.transaction.gas_price,
got: *tx.gas_price(),
});
}
}
}
// Some more heavy checks below. // Some more heavy checks below.
// Actually recover sender and verify that transaction // Actually recover sender and verify that transaction
let is_retracted = tx.is_retracted(); let is_retracted = tx.is_retracted();

View File

@ -390,7 +390,13 @@ impl<T, S, L> Pool<T, S, L> where
/// Returns worst transaction in the queue (if any). /// Returns worst transaction in the queue (if any).
pub fn worst_transaction(&self) -> Option<Arc<T>> { pub fn worst_transaction(&self) -> Option<Arc<T>> {
self.worst_transactions.iter().next().map(|x| x.transaction.transaction.clone()) self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
}
/// Returns true if the pool is at it's capacity.
pub fn is_full(&self) -> bool {
self.by_hash.len() >= self.options.max_count
|| self.mem_usage >= self.options.max_mem_usage
} }
/// Returns an iterator of pending (ready) transactions. /// Returns an iterator of pending (ready) transactions.
@ -486,6 +492,11 @@ impl<T, S, L> Pool<T, S, L> where
&self.listener &self.listener
} }
/// Borrows scoring instance.
pub fn scoring(&self) -> &S {
&self.scoring
}
/// Borrows listener mutably. /// Borrows listener mutably.
pub fn listener_mut(&mut self) -> &mut L { pub fn listener_mut(&mut self) -> &mut L {
&mut self.listener &mut self.listener

View File

@ -48,6 +48,15 @@ pub type SharedTransaction = Arc<Transaction>;
type TestPool = Pool<Transaction, DummyScoring>; type TestPool = Pool<Transaction, DummyScoring>;
impl TestPool {
pub fn with_limit(max_count: usize) -> Self {
Self::with_options(Options {
max_count,
..Default::default()
})
}
}
#[test] #[test]
fn should_clear_queue() { fn should_clear_queue() {
// given // given
@ -505,9 +514,27 @@ fn should_return_worst_transaction() {
// when // when
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).gas_price(4).new()).unwrap();
// then // then
assert!(txq.worst_transaction().is_some()); assert_eq!(txq.worst_transaction().unwrap().gas_price, 4.into());
}
#[test]
fn should_return_is_full() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_limit(2);
assert!(!txq.is_full());
// when
txq.import(b.tx().nonce(0).gas_price(110).new()).unwrap();
assert!(!txq.is_full());
txq.import(b.tx().sender(1).nonce(0).gas_price(100).new()).unwrap();
// then
assert!(txq.is_full());
} }
mod listener { mod listener {