TransactionQueue improvements (#5917)

* Order by id instead of hash.

* Minimal gas price bump.

* Avoid to construct oversized transaction packets.

* Fix RPC.

* Never construct oversized transactions packet.

* Never construct oversized packets.
This commit is contained in:
Tomasz Drwięga 2017-06-30 11:57:48 +02:00 committed by Arkadiy Paronyan
parent a3e693d5c3
commit f22745eb0a
3 changed files with 134 additions and 44 deletions

View File

@ -165,7 +165,7 @@ struct TransactionOrder {
/// Gas usage priority factor. Usage depends on strategy. /// Gas usage priority factor. Usage depends on strategy.
/// Represents the linear increment in required gas price for heavy transactions. /// Represents the linear increment in required gas price for heavy transactions.
/// ///
/// High gas limit + Low gas price = Low priority /// High gas limit + Low gas price = Very Low priority
/// High gas limit + High gas price = High priority /// High gas limit + High gas price = High priority
gas_factor: U256, gas_factor: U256,
/// Gas (limit) of the transaction. Usage depends on strategy. /// Gas (limit) of the transaction. Usage depends on strategy.
@ -175,6 +175,8 @@ struct TransactionOrder {
strategy: PrioritizationStrategy, strategy: PrioritizationStrategy,
/// Hash to identify associated transaction /// Hash to identify associated transaction
hash: H256, hash: H256,
/// Incremental id assigned when transaction is inserted to the queue.
insertion_id: u64,
/// Origin of the transaction /// Origin of the transaction
origin: TransactionOrigin, origin: TransactionOrigin,
/// Penalties /// Penalties
@ -193,6 +195,7 @@ impl TransactionOrder {
gas_factor: factor, gas_factor: factor,
strategy: strategy, strategy: strategy,
hash: tx.hash(), hash: tx.hash(),
insertion_id: tx.insertion_id,
origin: tx.origin, origin: tx.origin,
penalties: 0, penalties: 0,
} }
@ -262,8 +265,8 @@ impl Ord for TransactionOrder {
return b.gas_price.cmp(&self.gas_price); return b.gas_price.cmp(&self.gas_price);
} }
// Compare hashes // Lastly compare insertion_id
self.hash.cmp(&b.hash) self.insertion_id.cmp(&b.insertion_id)
} }
} }
@ -274,19 +277,28 @@ struct VerifiedTransaction {
transaction: SignedTransaction, transaction: SignedTransaction,
/// Transaction origin. /// Transaction origin.
origin: TransactionOrigin, origin: TransactionOrigin,
/// Insertion time
insertion_time: QueuingInstant,
/// Delay until specified condition is met. /// Delay until specified condition is met.
condition: Option<Condition>, condition: Option<Condition>,
/// Insertion time
insertion_time: QueuingInstant,
/// ID assigned upon insertion, should be unique.
insertion_id: u64,
} }
impl VerifiedTransaction { impl VerifiedTransaction {
fn new(transaction: SignedTransaction, origin: TransactionOrigin, time: QueuingInstant, condition: Option<Condition>) -> Self { fn new(
transaction: SignedTransaction,
origin: TransactionOrigin,
condition: Option<Condition>,
insertion_time: QueuingInstant,
insertion_id: u64,
) -> Self {
VerifiedTransaction { VerifiedTransaction {
transaction: transaction, transaction,
origin: origin, origin,
insertion_time: time, condition,
condition: condition, insertion_time,
insertion_id,
} }
} }
@ -486,6 +498,9 @@ pub struct AccountDetails {
/// Transactions with `gas > (gas_limit + gas_limit * Factor(in percents))` are not imported to the queue. /// Transactions with `gas > (gas_limit + gas_limit * Factor(in percents))` are not imported to the queue.
const GAS_LIMIT_HYSTERESIS: usize = 200; // (100/GAS_LIMIT_HYSTERESIS) % const GAS_LIMIT_HYSTERESIS: usize = 200; // (100/GAS_LIMIT_HYSTERESIS) %
/// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25%
/// Describes the strategy used to prioritize transactions in the queue. /// Describes the strategy used to prioritize transactions in the queue.
#[cfg_attr(feature="dev", allow(enum_variant_names))] #[cfg_attr(feature="dev", allow(enum_variant_names))]
@ -557,6 +572,8 @@ pub struct TransactionQueue {
last_nonces: HashMap<Address, U256>, last_nonces: HashMap<Address, U256>,
/// List of local transactions and their statuses. /// List of local transactions and their statuses.
local_transactions: LocalTransactionsList, local_transactions: LocalTransactionsList,
/// Next id that should be assigned to a transaction imported to the queue.
next_transaction_id: u64,
} }
impl Default for TransactionQueue { impl Default for TransactionQueue {
@ -600,6 +617,7 @@ impl TransactionQueue {
by_hash: HashMap::new(), by_hash: HashMap::new(),
last_nonces: HashMap::new(), last_nonces: HashMap::new(),
local_transactions: LocalTransactionsList::default(), local_transactions: LocalTransactionsList::default(),
next_transaction_id: 0,
} }
} }
@ -824,7 +842,9 @@ impl TransactionQueue {
} }
tx.check_low_s()?; tx.check_low_s()?;
// No invalid transactions beyond this point. // No invalid transactions beyond this point.
let vtx = VerifiedTransaction::new(tx, origin, time, condition); let id = self.next_transaction_id;
self.next_transaction_id += 1;
let vtx = VerifiedTransaction::new(tx, origin, condition, time, id);
let r = self.import_tx(vtx, client_account.nonce).map_err(Error::Transaction); let r = self.import_tx(vtx, client_account.nonce).map_err(Error::Transaction);
assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len()); assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len());
r r
@ -1352,16 +1372,19 @@ impl TransactionQueue {
// There was already transaction in queue. Let's check which one should stay // There was already transaction in queue. Let's check which one should stay
let old_hash = old.hash; let old_hash = old.hash;
let new_hash = order.hash; let new_hash = order.hash;
let old_fee = old.gas_price;
let new_fee = order.gas_price; let old_gas_price = old.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater { let new_gas_price = order.gas_price;
let min_required_gas_price = old_gas_price + (old_gas_price >> GAS_PRICE_BUMP_SHIFT);
if min_required_gas_price > new_gas_price {
trace!(target: "txqueue", "Didn't insert transaction because gas price was too low: {:?} ({:?} stays in the queue)", order.hash, old.hash); trace!(target: "txqueue", "Didn't insert transaction because gas price was too low: {:?} ({:?} stays in the queue)", order.hash, old.hash);
// Put back old transaction since it has greater priority (higher gas_price) // Put back old transaction since it has greater priority (higher gas_price)
set.insert(address, nonce, old); set.insert(address, nonce, old);
// and remove new one // and remove new one
let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
if order.origin.is_local() { if order.origin.is_local() {
local.mark_replaced(order.transaction, old_fee, old_hash); local.mark_replaced(order.transaction, old_gas_price, old_hash);
} }
false false
} else { } else {
@ -1369,7 +1392,7 @@ impl TransactionQueue {
// Make sure we remove old transaction entirely // Make sure we remove old transaction entirely
let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
if old.origin.is_local() { if old.origin.is_local() {
local.mark_replaced(old.transaction, new_fee, new_hash); local.mark_replaced(old.transaction, new_gas_price, new_hash);
} }
true true
} }
@ -1611,12 +1634,12 @@ pub mod test {
gas_limit: !U256::zero(), gas_limit: !U256::zero(),
}; };
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into()); let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External, None, 0, 0);
let tx2 = VerifiedTransaction::new(tx2, TransactionOrigin::External, 0, None); let tx2 = VerifiedTransaction::new(tx2, TransactionOrigin::External, None, 0, 1);
let mut by_hash = { let mut by_hash = {
let mut x = HashMap::new(); let mut x = HashMap::new();
let tx1 = VerifiedTransaction::new(tx1.transaction.clone(), TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx1.transaction.clone(), TransactionOrigin::External, None, 0, 0);
let tx2 = VerifiedTransaction::new(tx2.transaction.clone(), TransactionOrigin::External, 0, None); let tx2 = VerifiedTransaction::new(tx2.transaction.clone(), TransactionOrigin::External, None, 0, 1);
x.insert(tx1.hash(), tx1); x.insert(tx1.hash(), tx1);
x.insert(tx2.hash(), tx2); x.insert(tx2.hash(), tx2);
x x
@ -1654,12 +1677,12 @@ pub mod test {
// Create two transactions with same nonce // Create two transactions with same nonce
// (same hash) // (same hash)
let (tx1, tx2) = new_tx_pair_default(0.into(), 0.into()); let (tx1, tx2) = new_tx_pair_default(0.into(), 0.into());
let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External, None, 0, 0);
let tx2 = VerifiedTransaction::new(tx2, TransactionOrigin::External, 0, None); let tx2 = VerifiedTransaction::new(tx2, TransactionOrigin::External, None, 0, 1);
let by_hash = { let by_hash = {
let mut x = HashMap::new(); let mut x = HashMap::new();
let tx1 = VerifiedTransaction::new(tx1.transaction.clone(), TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx1.transaction.clone(), TransactionOrigin::External, None, 0, 0);
let tx2 = VerifiedTransaction::new(tx2.transaction.clone(), TransactionOrigin::External, 0, None); let tx2 = VerifiedTransaction::new(tx2.transaction.clone(), TransactionOrigin::External, None, 0, 1);
x.insert(tx1.hash(), tx1); x.insert(tx1.hash(), tx1);
x.insert(tx2.hash(), tx2); x.insert(tx2.hash(), tx2);
x x
@ -1701,10 +1724,10 @@ pub mod test {
gas_limit: !U256::zero(), gas_limit: !U256::zero(),
}; };
let tx = new_tx_default(); let tx = new_tx_default();
let tx1 = VerifiedTransaction::new(tx.clone(), TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx.clone(), TransactionOrigin::External, None, 0, 0);
let order1 = TransactionOrder::for_transaction(&tx1, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly); let order1 = TransactionOrder::for_transaction(&tx1, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly);
assert!(set.insert(tx1.sender(), tx1.nonce(), order1).is_none()); assert!(set.insert(tx1.sender(), tx1.nonce(), order1).is_none());
let tx2 = VerifiedTransaction::new(tx, TransactionOrigin::External, 0, None); let tx2 = VerifiedTransaction::new(tx, TransactionOrigin::External, None, 0, 1);
let order2 = TransactionOrder::for_transaction(&tx2, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly); let order2 = TransactionOrder::for_transaction(&tx2, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly);
assert!(set.insert(tx2.sender(), tx2.nonce(), order2).is_some()); assert!(set.insert(tx2.sender(), tx2.nonce(), order2).is_some());
} }
@ -1721,7 +1744,7 @@ pub mod test {
assert_eq!(set.gas_price_entry_limit(), 0.into()); assert_eq!(set.gas_price_entry_limit(), 0.into());
let tx = new_tx_default(); let tx = new_tx_default();
let tx1 = VerifiedTransaction::new(tx.clone(), TransactionOrigin::External, 0, None); let tx1 = VerifiedTransaction::new(tx.clone(), TransactionOrigin::External, None, 0, 0);
let order1 = TransactionOrder::for_transaction(&tx1, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly); let order1 = TransactionOrder::for_transaction(&tx1, 0.into(), 1.into(), PrioritizationStrategy::GasPriceOnly);
assert!(set.insert(tx1.sender(), tx1.nonce(), order1.clone()).is_none()); assert!(set.insert(tx1.sender(), tx1.nonce(), order1.clone()).is_none());
assert_eq!(set.gas_price_entry_limit(), 2.into()); assert_eq!(set.gas_price_entry_limit(), 2.into());
@ -2473,6 +2496,32 @@ pub mod test {
assert_eq!(stats.pending, 2); assert_eq!(stats.pending, 2);
} }
#[test]
fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() {
use ethcore_logger::init_log;
init_log();
// given
let mut txq = TransactionQueue::default();
let keypair = Random.generate().unwrap();
let tx = new_unsigned_tx(123.into(), default_gas_val(), 20.into()).sign(keypair.secret(), None);
let tx2 = {
let mut tx2 = (**tx).clone();
tx2.gas_price = U256::from(21);
tx2.sign(keypair.secret(), None)
};
// when
txq.add(tx, TransactionOrigin::External, 0, None, &default_tx_provider()).unwrap();
let res = txq.add(tx2, TransactionOrigin::External, 0, None, &default_tx_provider());
// then
assert_eq!(unwrap_tx_err(res), TransactionError::TooCheapToReplace);
let stats = txq.status();
assert_eq!(stats.pending, 1);
assert_eq!(stats.future, 0);
assert_eq!(txq.top_transactions()[0].gas_price, U256::from(20));
}
#[test] #[test]
fn should_replace_same_transaction_when_has_higher_fee() { fn should_replace_same_transaction_when_has_higher_fee() {
use ethcore_logger::init_log; use ethcore_logger::init_log;
@ -2480,10 +2529,10 @@ pub mod test {
// given // given
let mut txq = TransactionQueue::default(); let mut txq = TransactionQueue::default();
let keypair = Random.generate().unwrap(); let keypair = Random.generate().unwrap();
let tx = new_unsigned_tx(123.into(), default_gas_val(), 1.into()).sign(keypair.secret(), None); let tx = new_unsigned_tx(123.into(), default_gas_val(), 10.into()).sign(keypair.secret(), None);
let tx2 = { let tx2 = {
let mut tx2 = (**tx).clone(); let mut tx2 = (**tx).clone();
tx2.gas_price = U256::from(200); tx2.gas_price = U256::from(20);
tx2.sign(keypair.secret(), None) tx2.sign(keypair.secret(), None)
}; };
@ -2495,7 +2544,7 @@ pub mod test {
let stats = txq.status(); let stats = txq.status();
assert_eq!(stats.pending, 1); assert_eq!(stats.pending, 1);
assert_eq!(stats.future, 0); assert_eq!(stats.future, 0);
assert_eq!(txq.top_transactions()[0].gas_price, U256::from(200)); assert_eq!(txq.top_transactions()[0].gas_price, U256::from(20));
} }
#[test] #[test]
@ -2815,6 +2864,24 @@ pub mod test {
assert_eq!(txq.top_transactions().len(), 1); assert_eq!(txq.top_transactions().len(), 1);
} }
#[test]
fn should_not_order_transactions_by_hash() {
// given
let secret1 = "0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap();
let secret2 = "0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap();
let tx1 = new_unsigned_tx(123.into(), default_gas_val(), 0.into()).sign(&secret1, None);
let tx2 = new_unsigned_tx(123.into(), default_gas_val(), 0.into()).sign(&secret2, None);
let mut txq = TransactionQueue::default();
// when
txq.add(tx1.clone(), TransactionOrigin::External, 0, None, &default_tx_provider()).unwrap();
txq.add(tx2, TransactionOrigin::External, 0, None, &default_tx_provider()).unwrap();
// then
assert_eq!(txq.top_transactions()[0], tx1);
assert_eq!(txq.top_transactions().len(), 2);
}
#[test] #[test]
fn should_not_return_transactions_over_nonce_cap() { fn should_not_return_transactions_over_nonce_cap() {
// given // given

View File

@ -126,6 +126,8 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_TX_TO_IMPORT: usize = 512; const MAX_TX_TO_IMPORT: usize = 512;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20; const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
const MAX_TRANSACTION_SIZE: usize = 300*1024; const MAX_TRANSACTION_SIZE: usize = 300*1024;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions in sent in single packet. // Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync // Min number of blocks to be behind for a snapshot sync
@ -2044,7 +2046,7 @@ impl ChainSync {
// update stats // update stats
for hash in &all_transactions_hashes { for hash in &all_transactions_hashes {
let id = io.peer_session_info(peer_id).and_then(|info| info.id); let id = io.peer_session_info(peer_id).and_then(|info| info.id);
stats.propagated(*hash, id, block_number); stats.propagated(hash, id, block_number);
} }
peer_info.last_sent_transactions = all_transactions_hashes.clone(); peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone())); return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
@ -2060,14 +2062,35 @@ impl ChainSync {
} }
// Construct RLP // Construct RLP
let mut packet = RlpStream::new_list(to_send.len()); let (packet, to_send) = {
let mut to_send = to_send;
let mut packet = RlpStream::new();
packet.begin_unbounded_list();
let mut pushed = 0;
for tx in &transactions { for tx in &transactions {
if to_send.contains(&tx.transaction.hash()) { let hash = tx.transaction.hash();
packet.append(&tx.transaction); if to_send.contains(&hash) {
// update stats let mut transaction = RlpStream::new();
let id = io.peer_session_info(peer_id).and_then(|info| info.id); tx.transaction.rlp_append(&mut transaction);
stats.propagated(tx.transaction.hash(), id, block_number); let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
if !appended {
// Maximal packet size reached just proceed with sending
debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
to_send = to_send.into_iter().take(pushed).collect();
break;
} }
pushed += 1;
}
}
packet.complete_unbounded_list();
(packet, to_send)
};
// Update stats
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
for hash in &to_send {
// update stats
stats.propagated(hash, id, block_number);
} }
peer_info.last_sent_transactions = all_transactions_hashes peer_info.last_sent_transactions = all_transactions_hashes

View File

@ -56,9 +56,9 @@ pub struct TransactionsStats {
impl TransactionsStats { impl TransactionsStats {
/// Increases number of propagations to given `enodeid`. /// Increases number of propagations to given `enodeid`.
pub fn propagated(&mut self, hash: H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) { pub fn propagated(&mut self, hash: &H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) {
let enode_id = enode_id.unwrap_or_default(); let enode_id = enode_id.unwrap_or_default();
let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); let mut stats = self.pending_transactions.entry(*hash).or_insert_with(|| Stats::new(current_block_num));
let mut count = stats.propagated_to.entry(enode_id).or_insert(0); let mut count = stats.propagated_to.entry(enode_id).or_insert(0);
*count = count.saturating_add(1); *count = count.saturating_add(1);
} }
@ -101,9 +101,9 @@ mod tests {
let enodeid2 = 5.into(); let enodeid2 = 5.into();
// when // when
stats.propagated(hash, Some(enodeid1), 5); stats.propagated(&hash, Some(enodeid1), 5);
stats.propagated(hash, Some(enodeid1), 10); stats.propagated(&hash, Some(enodeid1), 10);
stats.propagated(hash, Some(enodeid2), 15); stats.propagated(&hash, Some(enodeid2), 15);
// then // then
let stats = stats.get(&hash); let stats = stats.get(&hash);
@ -122,7 +122,7 @@ mod tests {
let mut stats = TransactionsStats::default(); let mut stats = TransactionsStats::default();
let hash = 5.into(); let hash = 5.into();
let enodeid1 = 5.into(); let enodeid1 = 5.into();
stats.propagated(hash, Some(enodeid1), 10); stats.propagated(&hash, Some(enodeid1), 10);
// when // when
stats.retain(&HashSet::new()); stats.retain(&HashSet::new());