Transaction queue improvements
This commit is contained in:
@@ -564,7 +564,7 @@ impl Miner {
|
||||
prepare_new
|
||||
}
|
||||
|
||||
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) ->
|
||||
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, default_origin: TransactionOrigin, transaction_queue: &mut BanningTransactionQueue) ->
|
||||
Vec<Result<TransactionImportResult, Error>> {
|
||||
|
||||
let fetch_account = |a: &Address| AccountDetails {
|
||||
@@ -572,15 +572,28 @@ impl Miner {
|
||||
balance: chain.latest_balance(a),
|
||||
};
|
||||
|
||||
let accounts = self.accounts.as_ref()
|
||||
.and_then(|provider| provider.accounts().ok())
|
||||
.map(|accounts| accounts.into_iter().collect::<HashSet<_>>());
|
||||
|
||||
let schedule = chain.latest_schedule();
|
||||
let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into();
|
||||
transactions.into_iter()
|
||||
.map(|tx| match origin {
|
||||
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
|
||||
transaction_queue.add(tx, origin, &fetch_account, &gas_required)
|
||||
},
|
||||
TransactionOrigin::External => {
|
||||
transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required)
|
||||
.map(|tx| {
|
||||
let origin = accounts.as_ref().and_then(|accounts| {
|
||||
tx.sender().ok().and_then(|sender| match accounts.contains(&sender) {
|
||||
true => Some(TransactionOrigin::Local),
|
||||
false => None,
|
||||
})
|
||||
}).unwrap_or(default_origin);
|
||||
|
||||
match origin {
|
||||
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
|
||||
transaction_queue.add(tx, origin, &fetch_account, &gas_required)
|
||||
},
|
||||
TransactionOrigin::External => {
|
||||
transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required)
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
|
||||
@@ -250,6 +250,7 @@ impl Ord for TransactionOrder {
|
||||
}
|
||||
|
||||
/// Verified transaction (with sender)
|
||||
#[derive(Debug)]
|
||||
struct VerifiedTransaction {
|
||||
/// Transaction
|
||||
transaction: SignedTransaction,
|
||||
@@ -637,7 +638,11 @@ impl TransactionQueue {
|
||||
self.local_transactions.mark_future(hash);
|
||||
},
|
||||
Err(Error::Transaction(ref err)) => {
|
||||
self.local_transactions.mark_rejected(cloned_tx, err.clone());
|
||||
// Sometimes transactions are re-imported, so
|
||||
// don't overwrite transactions if they are already on the list
|
||||
if !self.local_transactions.contains(&cloned_tx.hash()) {
|
||||
self.local_transactions.mark_rejected(cloned_tx, err.clone());
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
self.local_transactions.mark_invalid(cloned_tx);
|
||||
@@ -772,6 +777,12 @@ impl TransactionQueue {
|
||||
None => return,
|
||||
Some(t) => t,
|
||||
};
|
||||
|
||||
// Never penalize local transactions
|
||||
if transaction.origin.is_local() {
|
||||
return;
|
||||
}
|
||||
|
||||
let sender = transaction.sender();
|
||||
|
||||
// Penalize all transactions from this sender
|
||||
@@ -843,6 +854,33 @@ impl TransactionQueue {
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks all transactions from particular sender as local transactions
|
||||
fn mark_transactions_local(&mut self, sender: &Address) {
|
||||
fn mark_local<F: FnMut(H256)>(sender: &Address, set: &mut TransactionSet, mut mark: F) {
|
||||
// Mark all transactions from this sender as local
|
||||
let nonces_from_sender = set.by_address.row(sender)
|
||||
.map(|row_map| {
|
||||
row_map.iter().filter_map(|(nonce, order)| if order.origin.is_local() {
|
||||
None
|
||||
} else {
|
||||
Some(*nonce)
|
||||
}).collect::<Vec<U256>>()
|
||||
})
|
||||
.unwrap_or_else(Vec::new);
|
||||
|
||||
for k in nonces_from_sender {
|
||||
let mut order = set.drop(sender, &k).expect("transaction known to be in self.current/self.future; qed");
|
||||
order.origin = TransactionOrigin::Local;
|
||||
mark(order.hash);
|
||||
set.insert(*sender, k, order);
|
||||
}
|
||||
}
|
||||
|
||||
let local = &mut self.local_transactions;
|
||||
mark_local(sender, &mut self.current, |hash| local.mark_pending(hash));
|
||||
mark_local(sender, &mut self.future, |hash| local.mark_future(hash));
|
||||
}
|
||||
|
||||
/// Update height of all transactions in future transactions set.
|
||||
fn update_future(&mut self, sender: &Address, current_nonce: U256) {
|
||||
// We need to drain all transactions for current sender from future and reinsert them with updated height
|
||||
@@ -1022,6 +1060,10 @@ impl TransactionQueue {
|
||||
.cloned()
|
||||
.map_or(state_nonce, |n| n + U256::one());
|
||||
|
||||
if tx.origin.is_local() {
|
||||
self.mark_transactions_local(&address);
|
||||
}
|
||||
|
||||
// Future transaction
|
||||
if nonce > next_nonce {
|
||||
// We have a gap - put to future.
|
||||
@@ -1099,6 +1141,7 @@ impl TransactionQueue {
|
||||
let old_hash = by_hash.insert(hash, tx);
|
||||
assert!(old_hash.is_none(), "Each hash has to be inserted exactly once.");
|
||||
|
||||
trace!(target: "txqueue", "Inserting: {:?}", order);
|
||||
|
||||
if let Some(old) = set.insert(address, nonce, order.clone()) {
|
||||
Self::replace_orders(address, nonce, old, order, set, by_hash, local)
|
||||
@@ -1726,6 +1769,31 @@ mod test {
|
||||
assert_eq!(top.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn when_importing_local_should_mark_others_from_the_same_sender_as_local() {
|
||||
// given
|
||||
let mut txq = TransactionQueue::default();
|
||||
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
|
||||
// the second one has same nonce but higher `gas_price`
|
||||
let (_, tx0) = new_similar_tx_pair();
|
||||
|
||||
txq.add(tx0.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||
txq.add(tx1.clone(), TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||
// the one with higher gas price is first
|
||||
assert_eq!(txq.top_transactions()[0], tx0);
|
||||
assert_eq!(txq.top_transactions()[1], tx1);
|
||||
|
||||
// when
|
||||
// insert second as local
|
||||
txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
|
||||
|
||||
// then
|
||||
// the order should be updated
|
||||
assert_eq!(txq.top_transactions()[0], tx1);
|
||||
assert_eq!(txq.top_transactions()[1], tx0);
|
||||
assert_eq!(txq.top_transactions()[2], tx2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_prioritize_reimported_transactions_within_same_nonce_height() {
|
||||
// given
|
||||
@@ -1793,6 +1861,38 @@ mod test {
|
||||
assert_eq!(top.len(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_not_penalize_local_transactions() {
|
||||
// given
|
||||
let mut txq = TransactionQueue::default();
|
||||
// txa, txb - slightly bigger gas price to have consistent ordering
|
||||
let (txa, txb) = new_tx_pair_default(1.into(), 0.into());
|
||||
let (tx1, tx2) = new_tx_pair_with_gas_price_increment(3.into());
|
||||
|
||||
// insert everything
|
||||
txq.add(txa.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
|
||||
txq.add(txb.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
|
||||
txq.add(tx1.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
|
||||
txq.add(tx2.clone(), TransactionOrigin::Local, &default_account_details, &gas_estimator).unwrap();
|
||||
|
||||
let top = txq.top_transactions();
|
||||
assert_eq!(top[0], tx1);
|
||||
assert_eq!(top[1], txa);
|
||||
assert_eq!(top[2], tx2);
|
||||
assert_eq!(top[3], txb);
|
||||
assert_eq!(top.len(), 4);
|
||||
|
||||
// when
|
||||
txq.penalize(&tx1.hash());
|
||||
|
||||
// then (order is the same)
|
||||
let top = txq.top_transactions();
|
||||
assert_eq!(top[0], tx1);
|
||||
assert_eq!(top[1], txa);
|
||||
assert_eq!(top[2], tx2);
|
||||
assert_eq!(top[3], txb);
|
||||
assert_eq!(top.len(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_penalize_transactions_from_sender() {
|
||||
|
||||
Reference in New Issue
Block a user