Merge pull request #3772 from ethcore/txqueue-gc
Removing all old entries from transaction queue
This commit is contained in:
commit
aa30619b6f
@ -1092,20 +1092,6 @@ impl MinerService for Miner {
|
|||||||
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
|
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
|
||||||
trace!(target: "miner", "chain_new_blocks");
|
trace!(target: "miner", "chain_new_blocks");
|
||||||
|
|
||||||
fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
|
|
||||||
let block = chain
|
|
||||||
.block(BlockId::Hash(*hash))
|
|
||||||
// Client should send message after commit to db and inserting to chain.
|
|
||||||
.expect("Expected in-chain blocks.");
|
|
||||||
let block = BlockView::new(&block);
|
|
||||||
let txs = block.transactions();
|
|
||||||
// populate sender
|
|
||||||
for tx in &txs {
|
|
||||||
let _sender = tx.sender();
|
|
||||||
}
|
|
||||||
txs
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
|
// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
|
||||||
// should be still available in the queue.
|
// should be still available in the queue.
|
||||||
// 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that
|
// 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that
|
||||||
@ -1116,10 +1102,18 @@ impl MinerService for Miner {
|
|||||||
|
|
||||||
// Then import all transactions...
|
// Then import all transactions...
|
||||||
{
|
{
|
||||||
let out_of_chain = retracted
|
retracted.par_iter()
|
||||||
.par_iter()
|
.map(|hash| {
|
||||||
.map(|h| fetch_transactions(chain, h));
|
let block = chain.block(BlockId::Hash(*hash))
|
||||||
out_of_chain.for_each(|txs| {
|
.expect("Client is sending message after commit to db and inserting to chain; the block is available; qed");
|
||||||
|
let block = BlockView::new(&block);
|
||||||
|
let txs = block.transactions();
|
||||||
|
// populate sender
|
||||||
|
for tx in &txs {
|
||||||
|
let _sender = tx.sender();
|
||||||
|
}
|
||||||
|
txs
|
||||||
|
}).for_each(|txs| {
|
||||||
let mut transaction_queue = self.transaction_queue.lock();
|
let mut transaction_queue = self.transaction_queue.lock();
|
||||||
let _ = self.add_transactions_to_queue(
|
let _ = self.add_transactions_to_queue(
|
||||||
chain, txs, TransactionOrigin::RetractedBlock, &mut transaction_queue
|
chain, txs, TransactionOrigin::RetractedBlock, &mut transaction_queue
|
||||||
@ -1127,24 +1121,10 @@ impl MinerService for Miner {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// ...and at the end remove old ones
|
// ...and at the end remove the old ones
|
||||||
{
|
{
|
||||||
let in_chain = enacted
|
|
||||||
.par_iter()
|
|
||||||
.map(|h: &H256| fetch_transactions(chain, h));
|
|
||||||
|
|
||||||
in_chain.for_each(|mut txs| {
|
|
||||||
let mut transaction_queue = self.transaction_queue.lock();
|
let mut transaction_queue = self.transaction_queue.lock();
|
||||||
|
transaction_queue.remove_old(|sender| chain.latest_nonce(sender));
|
||||||
let to_remove = txs.drain(..)
|
|
||||||
.map(|tx| {
|
|
||||||
tx.sender().expect("Transaction is in block, so sender has to be defined.")
|
|
||||||
})
|
|
||||||
.collect::<HashSet<Address>>();
|
|
||||||
for sender in to_remove {
|
|
||||||
transaction_queue.remove_all(sender, chain.latest_nonce(&sender));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if enacted.len() > 0 {
|
if enacted.len() > 0 {
|
||||||
|
@ -81,6 +81,8 @@
|
|||||||
//! 3. `remove_all` is used to inform the queue about client (state) nonce changes.
|
//! 3. `remove_all` is used to inform the queue about client (state) nonce changes.
|
||||||
//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce
|
//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce
|
||||||
//! - It moves matching `future` transactions to `current`
|
//! - It moves matching `future` transactions to `current`
|
||||||
|
//! 4. `remove_old` is used as convenient method to update the state nonce for all senders in the queue.
|
||||||
|
//! - Invokes `remove_all` with latest state nonce for all senders.
|
||||||
|
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
@ -752,6 +754,26 @@ impl TransactionQueue {
|
|||||||
/// Removes all transactions from particular sender up to (excluding) given client (state) nonce.
|
/// Removes all transactions from particular sender up to (excluding) given client (state) nonce.
|
||||||
/// Client (State) Nonce = next valid nonce for this sender.
|
/// Client (State) Nonce = next valid nonce for this sender.
|
||||||
pub fn remove_all(&mut self, sender: Address, client_nonce: U256) {
|
pub fn remove_all(&mut self, sender: Address, client_nonce: U256) {
|
||||||
|
// Check if there is anything in current...
|
||||||
|
let should_check_in_current = self.current.by_address.row(&sender)
|
||||||
|
// If nonce == client_nonce nothing is changed
|
||||||
|
.and_then(|by_nonce| by_nonce.keys().find(|nonce| *nonce < &client_nonce))
|
||||||
|
.map(|_| ());
|
||||||
|
// ... or future
|
||||||
|
let should_check_in_future = self.future.by_address.row(&sender)
|
||||||
|
// if nonce == client_nonce we need to promote to current
|
||||||
|
.and_then(|by_nonce| by_nonce.keys().find(|nonce| *nonce <= &client_nonce))
|
||||||
|
.map(|_| ());
|
||||||
|
|
||||||
|
if should_check_in_current.or(should_check_in_future).is_none() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.remove_all_internal(sender, client_nonce);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Always updates future and moves transactions from current to future.
|
||||||
|
fn remove_all_internal(&mut self, sender: Address, client_nonce: U256) {
|
||||||
// We will either move transaction to future or remove it completely
|
// We will either move transaction to future or remove it completely
|
||||||
// so there will be no transactions from this sender in current
|
// so there will be no transactions from this sender in current
|
||||||
self.last_nonces.remove(&sender);
|
self.last_nonces.remove(&sender);
|
||||||
@ -765,6 +787,20 @@ impl TransactionQueue {
|
|||||||
assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len());
|
assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks the current nonce for all transactions' senders in the queue and removes the old transactions.
|
||||||
|
pub fn remove_old<F>(&mut self, fetch_nonce: F) where
|
||||||
|
F: Fn(&Address) -> U256,
|
||||||
|
{
|
||||||
|
let senders = self.current.by_address.keys()
|
||||||
|
.chain(self.future.by_address.keys())
|
||||||
|
.cloned()
|
||||||
|
.collect::<HashSet<_>>();
|
||||||
|
|
||||||
|
for sender in senders {
|
||||||
|
self.remove_all(sender, fetch_nonce(&sender));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Penalize transactions from sender of transaction with given hash.
|
/// Penalize transactions from sender of transaction with given hash.
|
||||||
/// I.e. it should change the priority of the transaction in the queue.
|
/// I.e. it should change the priority of the transaction in the queue.
|
||||||
///
|
///
|
||||||
@ -847,7 +883,7 @@ impl TransactionQueue {
|
|||||||
if order.is_some() {
|
if order.is_some() {
|
||||||
// This will keep consistency in queue
|
// This will keep consistency in queue
|
||||||
// Moves all to future and then promotes a batch from current:
|
// Moves all to future and then promotes a batch from current:
|
||||||
self.remove_all(sender, current_nonce);
|
self.remove_all_internal(sender, current_nonce);
|
||||||
assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len());
|
assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -2438,7 +2474,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_reject_transactions_below_bas_gas() {
|
fn should_reject_transactions_below_base_gas() {
|
||||||
// given
|
// given
|
||||||
let mut txq = TransactionQueue::default();
|
let mut txq = TransactionQueue::default();
|
||||||
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
|
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
|
||||||
@ -2457,4 +2493,26 @@ mod test {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_clear_all_old_transactions() {
|
||||||
|
// given
|
||||||
|
let mut txq = TransactionQueue::default();
|
||||||
|
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
|
||||||
|
let (tx3, tx4) = new_tx_pair_default(1.into(), 0.into());
|
||||||
|
let nonce1 = tx1.nonce;
|
||||||
|
|
||||||
|
// Insert all transactions
|
||||||
|
txq.add(tx1, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||||
|
txq.add(tx2, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||||
|
txq.add(tx3, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||||
|
txq.add(tx4, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap();
|
||||||
|
assert_eq!(txq.top_transactions().len(), 4);
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.remove_old(|_| nonce1 + U256::one());
|
||||||
|
|
||||||
|
// then
|
||||||
|
assert_eq!(txq.top_transactions().len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::collections::hash_map::Keys;
|
||||||
|
|
||||||
/// Structure to hold double-indexed values
|
/// Structure to hold double-indexed values
|
||||||
///
|
///
|
||||||
@ -41,6 +42,11 @@ impl<Row, Col, Val> Table<Row, Col, Val>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns keys iterator for this Table.
|
||||||
|
pub fn keys(&self) -> Keys<Row, HashMap<Col, Val>> {
|
||||||
|
self.map.keys()
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes all elements from this Table
|
/// Removes all elements from this Table
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.map.clear();
|
self.map.clear();
|
||||||
|
Loading…
Reference in New Issue
Block a user