diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index 54e792af9..359220405 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -30,7 +30,7 @@ use std::cmp; use ethereum_types::U256; -use txpool; +use txpool::{self, scoring}; use super::{verifier, PrioritizationStrategy, VerifiedTransaction}; /// Transaction with the same (sender, nonce) can be replaced only if @@ -75,9 +75,9 @@ impl txpool::Scoring for NonceAndGasPrice { old.transaction.nonce.cmp(&other.transaction.nonce) } - fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> txpool::scoring::Choice { + fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { if old.transaction.nonce != new.transaction.nonce { - return txpool::scoring::Choice::InsertNew + return scoring::Choice::InsertNew } let old_gp = old.transaction.gas_price; @@ -86,13 +86,13 @@ impl txpool::Scoring for NonceAndGasPrice { let min_required_gp = bump_gas_price(old_gp); match min_required_gp.cmp(&new_gp) { - cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, - _ => txpool::scoring::Choice::ReplaceOld, + cmp::Ordering::Greater => scoring::Choice::RejectNew, + _ => scoring::Choice::ReplaceOld, } } - fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: txpool::scoring::Change) { - use self::txpool::scoring::Change; + fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: scoring::Change) { + use self::scoring::Change; match change { Change::Culled(_) => {}, @@ -122,19 +122,26 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> bool { + fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { if old.sender == new.sender { // prefer earliest transaction match new.transaction.nonce.cmp(&old.transaction.nonce) { - cmp::Ordering::Less => true, - cmp::Ordering::Greater => false, - cmp::Ordering::Equal => self.choose(old, new) == txpool::scoring::Choice::ReplaceOld, + cmp::Ordering::Less => scoring::Choice::ReplaceOld, + cmp::Ordering::Greater => scoring::Choice::RejectNew, + cmp::Ordering::Equal => self.choose(old, new), } + } else if old.priority().is_local() && new.priority().is_local() { + // accept local transactions over the limit + scoring::Choice::InsertNew } else { let old_score = (old.priority(), old.transaction.gas_price); let new_score = (new.priority(), new.transaction.gas_price); - new_score > old_score - } + if new_score > old_score { + scoring::Choice::ReplaceOld + } else { + scoring::Choice::RejectNew + } + } } } @@ -146,6 +153,7 @@ mod tests { use ethkey::{Random, Generator}; use pool::tests::tx::{Tx, TxExt}; use txpool::Scoring; + use txpool::scoring::Choice::*; #[test] fn should_replace_same_sender_by_nonce() { @@ -181,14 +189,14 @@ mod tests { } }).collect::>(); - assert!(!scoring.should_replace(&txs[0], &txs[1])); - assert!(scoring.should_replace(&txs[1], &txs[0])); + assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew); + assert_eq!(scoring.should_replace(&txs[1], &txs[0]), ReplaceOld); - assert!(!scoring.should_replace(&txs[1], &txs[2])); - assert!(!scoring.should_replace(&txs[2], &txs[1])); + assert_eq!(scoring.should_replace(&txs[1], &txs[2]), RejectNew); + assert_eq!(scoring.should_replace(&txs[2], &txs[1]), RejectNew); - assert!(scoring.should_replace(&txs[1], &txs[3])); - assert!(!scoring.should_replace(&txs[3], &txs[1])); + assert_eq!(scoring.should_replace(&txs[1], &txs[3]), ReplaceOld); + assert_eq!(scoring.should_replace(&txs[3], &txs[1]), RejectNew); } #[test] @@ -246,14 +254,14 @@ mod tests { } }; - assert!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas)); - assert!(!scoring.should_replace(&tx_regular_high_gas, &tx_regular_low_gas)); + assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld); + assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_regular_low_gas), RejectNew); - assert!(scoring.should_replace(&tx_regular_high_gas, &tx_local_low_gas)); - assert!(!scoring.should_replace(&tx_local_low_gas, &tx_regular_high_gas)); + assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_local_low_gas), ReplaceOld); + assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_regular_high_gas), RejectNew); - assert!(scoring.should_replace(&tx_local_low_gas, &tx_local_high_gas)); - assert!(!scoring.should_replace(&tx_local_high_gas, &tx_regular_low_gas)); + assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_local_high_gas), InsertNew); + assert_eq!(scoring.should_replace(&tx_local_high_gas, &tx_regular_low_gas), RejectNew); } #[test] @@ -277,35 +285,35 @@ mod tests { // No update required let mut scores = initial_scores.clone(); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(0)); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(1)); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(2)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(0)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(1)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(2)); assert_eq!(scores, initial_scores); let mut scores = initial_scores.clone(); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(0)); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(1)); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(2)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(0)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(1)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(2)); assert_eq!(scores, initial_scores); // Compute score at given index let mut scores = initial_scores.clone(); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(0)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(0)); assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(1)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(1)); assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(2)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(2)); assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]); let mut scores = initial_scores.clone(); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(0)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(0)); assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(1)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(1)); assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]); - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(2)); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(2)); assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]); // Check penalization - scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Event(())); + scoring.update_scores(&transactions, &mut *scores, scoring::Change::Event(())); assert_eq!(scores, vec![32768.into(), 128.into(), 0.into()]); } } diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index ab33c2ce3..a1a6ae461 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -41,7 +41,6 @@ fn new_queue() -> TransactionQueue { PrioritizationStrategy::GasPriceOnly, ) } - #[test] fn should_return_correct_nonces_when_dropped_because_of_limit() { // given @@ -63,8 +62,8 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() { let nonce = tx1.nonce; // when - let r1= txq.import(TestClient::new(), vec![tx1].local()); - let r2= txq.import(TestClient::new(), vec![tx2].local()); + let r1 = txq.import(TestClient::new(), vec![tx1].retracted()); + let r2 = txq.import(TestClient::new(), vec![tx2].retracted()); assert_eq!(r1, vec![Ok(())]); assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]); assert_eq!(txq.status().status.transaction_count, 1); @@ -72,6 +71,58 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() { // then assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into())); + // when + let tx1 = Tx::gas_price(2).signed(); + let tx2 = Tx::gas_price(2).signed(); + let tx3 = Tx::gas_price(1).signed(); + let tx4 = Tx::gas_price(3).signed(); + let res = txq.import(TestClient::new(), vec![tx1, tx2].retracted()); + let res2 = txq.import(TestClient::new(), vec![tx3, tx4].retracted()); + + // then + assert_eq!(res, vec![Ok(()), Ok(())]); + assert_eq!(res2, vec![ + // The error here indicates reaching the limit + // and minimal effective gas price taken into account. + Err(transaction::Error::InsufficientGasPrice { minimal: 2.into(), got: 1.into() }), + Ok(()) + ]); + assert_eq!(txq.status().status.transaction_count, 3); + // First inserted transacton got dropped because of limit + assert_eq!(txq.next_nonce(TestClient::new(), &sender), None); +} + +#[test] +fn should_never_drop_local_transactions_from_different_senders() { + // given + let txq = TransactionQueue::new( + txpool::Options { + max_count: 3, + max_per_sender: 1, + max_mem_usage: 50 + }, + verifier::Options { + minimal_gas_price: 1.into(), + block_gas_limit: 1_000_000.into(), + tx_gas_limit: 1_000_000.into(), + }, + PrioritizationStrategy::GasPriceOnly, + ); + let (tx1, tx2) = Tx::gas_price(2).signed_pair(); + let sender = tx1.sender(); + let nonce = tx1.nonce; + + // when + let r1 = txq.import(TestClient::new(), vec![tx1].local()); + let r2 = txq.import(TestClient::new(), vec![tx2].local()); + assert_eq!(r1, vec![Ok(())]); + // max-per-sender is reached, that's ok. + assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]); + assert_eq!(txq.status().status.transaction_count, 1); + + // then + assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into())); + // when let tx1 = Tx::gas_price(2).signed(); let tx2 = Tx::gas_price(2).signed(); @@ -82,10 +133,9 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() { // then assert_eq!(res, vec![Ok(()), Ok(())]); - assert_eq!(res2, vec![Err(transaction::Error::LimitReached), Ok(())]); - assert_eq!(txq.status().status.transaction_count, 3); - // First inserted transacton got dropped because of limit - assert_eq!(txq.next_nonce(TestClient::new(), &sender), None); + assert_eq!(res2, vec![Ok(()), Ok(())]); + assert_eq!(txq.status().status.transaction_count, 5); + assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into())); } #[test] diff --git a/transaction-pool/src/pool.rs b/transaction-pool/src/pool.rs index 67da1b1d4..6fa17e1b2 100644 --- a/transaction-pool/src/pool.rs +++ b/transaction-pool/src/pool.rs @@ -22,7 +22,7 @@ use error; use listener::{Listener, NoopListener}; use options::Options; use ready::{Ready, Readiness}; -use scoring::{Scoring, ScoreWithRef}; +use scoring::{self, Scoring, ScoreWithRef}; use status::{LightStatus, Status}; use transactions::{AddResult, Transactions}; @@ -139,7 +139,7 @@ impl Pool where ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash()))); self.insertion_id += 1; - let mut transaction = Transaction { + let transaction = Transaction { insertion_id: self.insertion_id, transaction: Arc::new(transaction), }; @@ -148,27 +148,32 @@ impl Pool where // Avoid using should_replace, but rather use scoring for that. { let remove_worst = |s: &mut Self, transaction| { - match s.remove_worst(&transaction) { + match s.remove_worst(transaction) { Err(err) => { - s.listener.rejected(&transaction, err.kind()); + s.listener.rejected(transaction, err.kind()); Err(err) }, - Ok(removed) => { - s.listener.dropped(&removed, Some(&transaction)); + Ok(None) => Ok(false), + Ok(Some(removed)) => { + s.listener.dropped(&removed, Some(transaction)); s.finalize_remove(removed.hash()); - Ok(transaction) + Ok(true) }, } }; while self.by_hash.len() + 1 > self.options.max_count { trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count); - transaction = remove_worst(self, transaction)?; + if !remove_worst(self, &transaction)? { + break; + } } while self.mem_usage + mem_usage > self.options.max_mem_usage { trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage); - transaction = remove_worst(self, transaction)?; + if !remove_worst(self, &transaction)? { + break; + } } } @@ -273,28 +278,38 @@ impl Pool where } /// Attempts to remove the worst transaction from the pool if it's worse than the given one. - fn remove_worst(&mut self, transaction: &Transaction) -> error::Result> { + /// + /// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not. + /// In such case we will accept the transaction even though it is going to exceed the limit. + fn remove_worst(&mut self, transaction: &Transaction) -> error::Result>> { let to_remove = match self.worst_transactions.iter().next_back() { // No elements to remove? and the pool is still full? None => { warn!("The pool is full but there are no transactions to remove."); return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into()); }, - Some(old) => if self.scoring.should_replace(&old.transaction, transaction) { + Some(old) => match self.scoring.should_replace(&old.transaction, transaction) { + // We can't decide which of them should be removed, so accept both. + scoring::Choice::InsertNew => None, // New transaction is better than the worst one so we can replace it. - old.clone() - } else { + scoring::Choice::ReplaceOld => Some(old.clone()), // otherwise fail - return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into()) + scoring::Choice::RejectNew => { + return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into()) + }, }, }; - // Remove from transaction set - self.remove_from_set(to_remove.transaction.sender(), |set, scoring| { - set.remove(&to_remove.transaction, scoring) - }); + if let Some(to_remove) = to_remove { + // Remove from transaction set + self.remove_from_set(to_remove.transaction.sender(), |set, scoring| { + set.remove(&to_remove.transaction, scoring) + }); - Ok(to_remove.transaction) + Ok(Some(to_remove.transaction)) + } else { + Ok(None) + } } /// Removes transaction from sender's transaction `HashMap`. diff --git a/transaction-pool/src/scoring.rs b/transaction-pool/src/scoring.rs index 462b70865..390e016af 100644 --- a/transaction-pool/src/scoring.rs +++ b/transaction-pool/src/scoring.rs @@ -99,7 +99,9 @@ pub trait Scoring: fmt::Debug { fn update_scores(&self, txs: &[Transaction], scores: &mut [Self::Score], change: Change); /// Decides if `new` should push out `old` transaction from the pool. - fn should_replace(&self, old: &T, new: &T) -> bool; + /// + /// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits. + fn should_replace(&self, old: &T, new: &T) -> Choice; } /// A score with a reference to the transaction. diff --git a/transaction-pool/src/tests/helpers.rs b/transaction-pool/src/tests/helpers.rs index b71959b08..76a81eb9a 100644 --- a/transaction-pool/src/tests/helpers.rs +++ b/transaction-pool/src/tests/helpers.rs @@ -22,7 +22,17 @@ use {pool, scoring, Scoring, Ready, Readiness}; use super::Transaction; #[derive(Debug, Default)] -pub struct DummyScoring; +pub struct DummyScoring { + always_insert: bool, +} + +impl DummyScoring { + pub fn always_insert() -> Self { + DummyScoring { + always_insert: true, + } + } +} impl Scoring for DummyScoring { type Score = U256; @@ -58,8 +68,14 @@ impl Scoring for DummyScoring { } } - fn should_replace(&self, old: &Transaction, new: &Transaction) -> bool { - new.gas_price > old.gas_price + fn should_replace(&self, old: &Transaction, new: &Transaction) -> scoring::Choice { + if self.always_insert { + scoring::Choice::InsertNew + } else if new.gas_price > old.gas_price { + scoring::Choice::ReplaceOld + } else { + scoring::Choice::RejectNew + } } } diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs index 808f804cc..0029d0622 100644 --- a/transaction-pool/src/tests/mod.rs +++ b/transaction-pool/src/tests/mod.rs @@ -537,6 +537,60 @@ fn should_return_is_full() { assert!(txq.is_full()); } +#[test] +fn should_import_even_if_limit_is_reached_and_should_replace_returns_insert_new() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_scoring(DummyScoring::always_insert(), Options { + max_count: 1, + ..Default::default() + }); + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + }); + + // when + txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); + + // then + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 2, + senders: 1, + mem_usage: 0, + }); +} + +#[test] +fn should_not_import_even_if_limit_is_reached_and_should_replace_returns_false() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_scoring(DummyScoring::default(), Options { + max_count: 1, + ..Default::default() + }); + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + }); + + // when + let err = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap_err(); + + // then + assert_eq!(err.kind(), + &error::ErrorKind::TooCheapToEnter("0x00000000000000000000000000000000000000000000000000000000000001f5".into(), "0x5".into())); + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 1, + senders: 1, + mem_usage: 0, + }); +} + mod listener { use std::cell::RefCell; use std::rc::Rc; @@ -577,7 +631,7 @@ mod listener { let b = TransactionBuilder::default(); let listener = MyListener::default(); let results = listener.0.clone(); - let mut txq = Pool::new(listener, DummyScoring, Options { + let mut txq = Pool::new(listener, DummyScoring::default(), Options { max_per_sender: 1, max_count: 2, ..Default::default() @@ -615,7 +669,7 @@ mod listener { let b = TransactionBuilder::default(); let listener = MyListener::default(); let results = listener.0.clone(); - let mut txq = Pool::new(listener, DummyScoring, Options::default()); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); // insert let tx1 = txq.import(b.tx().nonce(1).new()).unwrap(); @@ -634,7 +688,7 @@ mod listener { let b = TransactionBuilder::default(); let listener = MyListener::default(); let results = listener.0.clone(); - let mut txq = Pool::new(listener, DummyScoring, Options::default()); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); // insert txq.import(b.tx().nonce(1).new()).unwrap(); @@ -652,7 +706,7 @@ mod listener { let b = TransactionBuilder::default(); let listener = MyListener::default(); let results = listener.0.clone(); - let mut txq = Pool::new(listener, DummyScoring, Options::default()); + let mut txq = Pool::new(listener, DummyScoring::default(), Options::default()); // insert txq.import(b.tx().nonce(1).new()).unwrap();