openethereum/ethcore/light/src/transaction_queue.rs

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

598 lines
19 KiB
Rust
Raw Normal View History

// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
2017-02-08 19:21:12 +01:00
// Parity Ethereum is free software: you can redistribute it and/or modify
2017-02-08 19:21:12 +01:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
2017-02-08 19:21:12 +01:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
2017-02-08 19:21:12 +01:00
//! Light Transaction Queue.
//!
//! Manages local transactions only. Every local transaction is stored and is
//! removed only once its nonce has been invalidated.
//!
//! Under the assumption that light nodes will have a relatively limited set of
//! accounts for which they create transactions, this queue is structured in an
//! address-wise manner.
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
2020-08-05 06:08:03 +02:00
fmt,
};
use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction};
use ethereum_types::{Address, H256, U256};
use fastmap::H256FastMap;
// Knowledge of an account's current nonce.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CurrentNonce {
// Assumed current nonce.
Assumed(U256),
// Known current nonce.
Known(U256),
}
impl CurrentNonce {
// whether this nonce is assumed
fn is_assumed(&self) -> bool {
match *self {
CurrentNonce::Assumed(_) => true,
CurrentNonce::Known(_) => false,
}
}
2020-08-05 06:08:03 +02:00
// whether this nonce is known for certain from an external source.
fn is_known(&self) -> bool {
!self.is_assumed()
}
2020-08-05 06:08:03 +02:00
// the current nonce's value.
fn value(&self) -> &U256 {
match *self {
CurrentNonce::Assumed(ref val) => val,
CurrentNonce::Known(ref val) => val,
}
}
}
2017-02-14 12:12:26 +01:00
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionInfo {
hash: H256,
nonce: U256,
condition: Option<Condition>,
}
impl<'a> From<&'a PendingTransaction> for TransactionInfo {
fn from(tx: &'a PendingTransaction) -> Self {
TransactionInfo {
hash: tx.hash(),
nonce: tx.nonce,
2017-02-14 12:12:26 +01:00
condition: tx.condition.clone(),
}
}
}
// transactions associated with a specific account.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AccountTransactions {
// believed current nonce (gotten from initial given TX or `cull` calls).
cur_nonce: CurrentNonce,
2017-02-14 12:12:26 +01:00
current: Vec<TransactionInfo>, // ordered "current" transactions (cur_nonce onwards)
future: BTreeMap<U256, TransactionInfo>, // "future" transactions.
}
impl AccountTransactions {
fn is_empty(&self) -> bool {
self.current.is_empty() && self.future.is_empty()
}
2020-08-05 06:08:03 +02:00
fn next_nonce(&self) -> U256 {
self.current
.last()
.map(|last| last.nonce.saturating_add(1.into()))
2017-02-09 18:10:59 +01:00
.unwrap_or_else(|| *self.cur_nonce.value())
}
2020-08-05 06:08:03 +02:00
// attempt to move transactions from the future queue into the current queue.
fn adjust_future(&mut self) -> Vec<H256> {
let mut promoted = Vec::new();
let mut next_nonce = self.next_nonce();
2020-08-05 06:08:03 +02:00
while let Some(tx) = self.future.remove(&next_nonce) {
promoted.push(tx.hash);
self.current.push(tx);
next_nonce = next_nonce.saturating_add(1.into());
}
2020-08-05 06:08:03 +02:00
promoted
}
}
2017-02-08 19:21:12 +01:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
/// Transaction import result.
pub enum ImportDestination {
/// Transaction has been imported to the current queue.
///
/// It's going to be propagated to peers.
Current,
/// Transaction has been imported to future queue.
///
/// It means it won't be propagated until the gap is filled.
Future,
}
2020-07-29 10:36:15 +02:00
type Listener = Box<dyn Fn(&[H256]) + Send + Sync>;
2017-02-08 19:21:12 +01:00
/// Light transaction queue. See module docs for more details.
#[derive(Default)]
pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>,
}
impl fmt::Debug for TransactionQueue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionQueue")
.field("by_account", &self.by_account)
.field("by_hash", &self.by_hash)
.field("listeners", &self.listeners.len())
.finish()
}
}
2017-02-08 19:21:12 +01:00
impl TransactionQueue {
2017-02-09 19:17:37 +01:00
/// Import a pending transaction to be queued.
pub fn import(
&mut self,
tx: PendingTransaction,
) -> Result<ImportDestination, transaction::Error> {
let sender = tx.sender();
let hash = tx.hash();
let nonce = tx.nonce;
2017-02-14 12:12:26 +01:00
let tx_info = TransactionInfo::from(&tx);
2020-08-05 06:08:03 +02:00
if self.by_hash.contains_key(&hash) {
return Err(transaction::Error::AlreadyImported);
}
2020-08-05 06:08:03 +02:00
let (res, promoted) = match self.by_account.entry(sender) {
Entry::Vacant(entry) => {
entry.insert(AccountTransactions {
cur_nonce: CurrentNonce::Assumed(nonce),
2017-02-14 12:12:26 +01:00
current: vec![tx_info],
future: BTreeMap::new(),
});
2020-08-05 06:08:03 +02:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
(ImportDestination::Current, vec![hash])
}
Entry::Occupied(mut entry) => {
let acct_txs = entry.get_mut();
if nonce < *acct_txs.cur_nonce.value() {
// don't accept txs from before known current nonce.
2017-02-09 19:17:37 +01:00
if acct_txs.cur_nonce.is_known() {
return Err(transaction::Error::Old);
2017-02-09 19:17:37 +01:00
}
2020-08-05 06:08:03 +02:00
// lower our assumption until corrected later.
acct_txs.cur_nonce = CurrentNonce::Assumed(nonce);
}
2020-08-05 06:08:03 +02:00
match acct_txs.current.binary_search_by(|x| x.nonce.cmp(&nonce)) {
Ok(idx) => {
trace!(target: "txqueue", "Replacing existing transaction from {} with nonce {}",
sender, nonce);
2017-02-14 12:12:26 +01:00
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info);
self.by_hash.remove(&old.hash);
2020-08-05 06:08:03 +02:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
(ImportDestination::Current, vec![hash])
}
Err(idx) => {
let cur_len = acct_txs.current.len();
let incr_nonce = nonce + 1;
2020-08-05 06:08:03 +02:00
// current is sorted with one tx per nonce,
// so if a tx with given nonce wasn't found that means it is either
// earlier in nonce than all other "current" transactions or later.
2017-02-14 12:05:24 +01:00
assert!(idx == 0 || idx == cur_len);
2020-08-05 06:08:03 +02:00
if idx == 0
&& acct_txs
.current
.first()
.map_or(false, |f| f.nonce != incr_nonce)
{
2017-02-14 12:12:26 +01:00
let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx_info]);
2020-08-05 06:08:03 +02:00
trace!(target: "txqueue", "Moving {} transactions with nonce > {} to future",
old_cur.len(), incr_nonce);
for future in old_cur {
let future_nonce = future.nonce;
acct_txs.future.insert(future_nonce, future);
}
2020-08-05 06:08:03 +02:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
(ImportDestination::Current, vec![hash])
} else if idx == cur_len
&& acct_txs
.current
.last()
.map_or(false, |f| f.nonce + 1 != nonce)
{
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce);
let future_nonce = nonce;
2017-02-14 12:12:26 +01:00
acct_txs.future.insert(future_nonce, tx_info);
2020-08-05 06:08:03 +02:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
(ImportDestination::Future, vec![])
} else {
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce);
2020-08-05 06:08:03 +02:00
// insert, then check if we've filled any gaps.
2017-02-14 12:12:26 +01:00
acct_txs.current.insert(idx, tx_info);
let mut promoted = acct_txs.adjust_future();
promoted.insert(0, hash);
2020-08-05 06:08:03 +02:00
New Transaction Queue implementation (#8074) * Implementation of Verifier, Scoring and Ready. * Queue in progress. * TransactionPool. * Prepare for txpool release. * Miner refactor [WiP] * WiP reworking miner. * Make it compile. * Add some docs. * Split blockchain access to a separate file. * Work on miner API. * Fix ethcore tests. * Refactor miner interface for sealing/work packages. * Implement next nonce. * RPC compiles. * Implement couple of missing methdods for RPC. * Add transaction queue listeners. * Compiles! * Clean-up and parallelize. * Get rid of RefCell in header. * Revert "Get rid of RefCell in header." This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb. * Override Sync requirement. * Fix status display. * Unify logging. * Extract some cheap checks. * Measurements and optimizations. * Fix scoring bug, heap size of bug and add cache * Disable tx queueing and parallel verification. * Make ethcore and ethcore-miner compile again. * Make RPC compile again. * Bunch of txpool tests. * Migrate transaction queue tests. * Nonce Cap * Nonce cap cache and tests. * Remove stale future transactions from the queue. * Optimize scoring and write some tests. * Simple penalization. * Clean up and support for different scoring algorithms. * Add CLI parameters for the new queue. * Remove banning queue. * Disable debug build. * Change per_sender limit to be 1% instead of 5% * Avoid cloning when propagating transactions. * Remove old todo. * Post-review fixes. * Fix miner options default. * Implement back ready transactions for light client. * Get rid of from_pending_block * Pass rejection reason. * Add more details to drop. * Rollback heap size of. * Avoid cloning hashes when propagating and include more details on rejection. * Fix tests. * Introduce nonces cache. * Remove uneccessary hashes allocation. * Lower the mem limit. * Re-enable parallel verification. * Add miner log. Don't check the type if not below min_gas_price. 
* Add more traces, fix disabling miner. * Fix creating pending blocks twice on AuRa authorities. * Fix tests. * re-use pending blocks in AuRa * Use reseal_min_period to prevent too frequent update_sealing. * Fix log to contain hash not sender. * Optimize local transactions. * Fix aura tests. * Update locks comments. * Get rid of unsafe Sync impl. * Review fixes. * Remove excessive matches. * Fix compilation errors. * Use new pool in private transactions. * Fix private-tx test. * Fix secret store tests. * Actually use gas_floor_target * Fix config tests. * Fix pool tests. * Address grumbles.
2018-04-13 17:34:27 +02:00
(ImportDestination::Current, promoted)
}
}
}
}
2017-02-09 19:17:37 +01:00
};
2020-08-05 06:08:03 +02:00
self.by_hash.insert(hash, tx);
self.notify(&promoted);
2017-02-09 19:17:37 +01:00
Ok(res)
}
2020-08-05 06:08:03 +02:00
/// Get pending transaction by hash.
pub fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
self.by_hash.get(hash).map(|tx| (&**tx).clone())
2017-02-08 19:21:12 +01:00
}
2020-08-05 06:08:03 +02:00
2017-02-08 19:21:12 +01:00
/// Get the next nonce for a given address based on what's within the queue.
/// If the address has no queued transactions, then `None` will be returned
/// and the next nonce will have to be deduced via other means.
pub fn next_nonce(&self, address: &Address) -> Option<U256> {
self.by_account
.get(address)
.map(AccountTransactions::next_nonce)
2017-02-08 19:21:12 +01:00
}
2020-08-05 06:08:03 +02:00
/// Get all transactions ready to be propagated.
2017-02-08 19:21:12 +01:00
/// `best_block_number` and `best_block_timestamp` are used to filter out conditionally
/// propagated transactions.
2017-02-14 12:14:02 +01:00
///
/// Returned transactions are batched by sender, in order of ascending nonce.
pub fn ready_transactions(
&self,
best_block_number: u64,
best_block_timestamp: u64,
) -> Vec<PendingTransaction> {
2017-02-14 12:12:26 +01:00
self.by_account.values()
.flat_map(|acct_txs| {
acct_txs.current.iter().take_while(|tx| match tx.condition {
None => true,
Some(Condition::Number(blk_num)) => blk_num <= best_block_number,
Some(Condition::Timestamp(time)) => time <= best_block_timestamp,
}).map(|info| info.hash)
})
2017-02-14 19:16:46 +01:00
.filter_map(|hash| match self.by_hash.get(&hash) {
Some(tx) => Some(tx.clone()),
None => {
warn!(target: "txqueue", "Inconsistency detected between `by_hash` and `by_account`: {} not stored.",
hash);
None
}
})
2017-02-14 12:12:26 +01:00
.collect()
}
2020-08-05 06:08:03 +02:00
2017-02-17 21:38:43 +01:00
/// Get all transactions not ready to be propagated.
/// `best_block_number` and `best_block_timestamp` are used to filter out conditionally
/// propagated transactions.
///
/// Returned transactions are batched by sender, in order of ascending nonce.
pub fn future_transactions(
&self,
best_block_number: u64,
best_block_timestamp: u64,
) -> Vec<PendingTransaction> {
self.by_account.values()
.flat_map(|acct_txs| {
acct_txs.current.iter().skip_while(|tx| match tx.condition {
None => true,
Some(Condition::Number(blk_num)) => blk_num <= best_block_number,
Some(Condition::Timestamp(time)) => time <= best_block_timestamp,
}).chain(acct_txs.future.values()).map(|info| info.hash)
})
.filter_map(|hash| match self.by_hash.get(&hash) {
Some(tx) => Some(tx.clone()),
None => {
warn!(target: "txqueue", "Inconsistency detected between `by_hash` and `by_account`: {} not stored.",
hash);
None
}
})
.collect()
}
2020-08-05 06:08:03 +02:00
/// Addresses for which we store transactions.
pub fn queued_senders(&self) -> Vec<Address> {
self.by_account.keys().cloned().collect()
2017-02-08 19:21:12 +01:00
}
2020-08-05 06:08:03 +02:00
2017-02-08 19:21:12 +01:00
/// Cull out all transactions by the given address which are invalidated by the given nonce.
pub fn cull(&mut self, address: Address, cur_nonce: U256) {
let mut removed_hashes = vec![];
if let Entry::Occupied(mut entry) = self.by_account.entry(address) {
{
let acct_txs = entry.get_mut();
acct_txs.cur_nonce = CurrentNonce::Known(cur_nonce);
2020-08-05 06:08:03 +02:00
// cull old "future" keys.
let old_future: Vec<_> = acct_txs
.future
.keys()
.take_while(|&&k| k < cur_nonce)
.cloned()
.collect();
2020-08-05 06:08:03 +02:00
for old in old_future {
let hash = acct_txs
.future
.remove(&old)
.expect("key extracted from keys iterator; known to exist; qed")
2017-02-14 12:12:26 +01:00
.hash;
removed_hashes.push(hash);
}
2020-08-05 06:08:03 +02:00
// then cull from "current".
let valid_pos = acct_txs.current.iter().position(|tx| tx.nonce >= cur_nonce);
match valid_pos {
2017-02-14 12:12:26 +01:00
None => removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash)),
Some(valid) => {
2017-02-14 12:12:26 +01:00
removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash))
}
2020-08-05 06:08:03 +02:00
}
// now try and move stuff out of future into current.
acct_txs.adjust_future();
}
2020-08-05 06:08:03 +02:00
if entry.get_mut().is_empty() {
trace!(target: "txqueue", "No more queued transactions for {} after nonce {}",
address, cur_nonce);
entry.remove();
}
}
trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})",
removed_hashes.len(), address, cur_nonce);
for hash in removed_hashes {
self.by_hash.remove(&hash);
}
2017-02-08 19:21:12 +01:00
}
2020-08-05 06:08:03 +02:00
2017-10-24 07:09:48 +02:00
/// Get a transaction by hash.
pub fn get(&self, hash: &H256) -> Option<&PendingTransaction> {
self.by_hash.get(&hash)
}
2020-08-05 06:08:03 +02:00
/// Add a transaction queue listener.
pub fn add_listener(&mut self, f: Listener) {
self.listeners.push(f);
}
2020-08-05 06:08:03 +02:00
/// Notifies all listeners about new pending transaction.
fn notify(&self, hashes: &[H256]) {
for listener in &self.listeners {
listener(hashes)
}
}
2017-02-08 19:21:12 +01:00
}
#[cfg(test)]
mod tests {
2017-02-09 18:10:59 +01:00
use super::TransactionQueue;
use common_types::transaction::{Condition, PendingTransaction, Transaction};
use ethereum_types::Address;
2017-02-09 18:10:59 +01:00
#[test]
fn queued_senders() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let tx = Transaction::default().fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
assert_eq!(txq.queued_senders(), vec![sender]);
txq.cull(sender, 1.into());
assert_eq!(txq.queued_senders(), vec![]);
assert!(txq.by_hash.is_empty());
}
#[test]
fn next_nonce() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in (0..5).chain(10..15) {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
}
// current: 0..5, future: 10..15
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 5.into());
txq.cull(sender, 8.into());
// current: empty, future: 10..15
assert_eq!(txq.ready_transactions(0, 0).len(), 0);
assert_eq!(txq.next_nonce(&sender).unwrap(), 8.into());
txq.cull(sender, 10.into());
// current: 10..15, future: empty
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 15.into());
}
#[test]
fn current_to_future() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in 5..10 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
}
assert_eq!(txq.ready_transactions(0, 0).len(), 5);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
for i in 0..3 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
}
assert_eq!(txq.ready_transactions(0, 0).len(), 3);
assert_eq!(txq.next_nonce(&sender).unwrap(), 3.into());
for i in 3..5 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
}
assert_eq!(txq.ready_transactions(0, 0).len(), 10);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
}
#[test]
fn conditional() {
let mut txq = TransactionQueue::default();
let sender = Address::default();
for i in 0..5 {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(match i {
2017-02-09 18:10:59 +01:00
3 => PendingTransaction::new(tx, Some(Condition::Number(100))),
4 => PendingTransaction::new(tx, Some(Condition::Timestamp(1234))),
_ => tx.into(),
2017-02-09 19:17:37 +01:00
})
.unwrap();
2017-02-09 18:10:59 +01:00
}
assert_eq!(txq.ready_transactions(0, 0).len(), 3);
assert_eq!(txq.ready_transactions(0, 1234).len(), 3);
assert_eq!(txq.ready_transactions(100, 0).len(), 4);
assert_eq!(txq.ready_transactions(100, 1234).len(), 5);
}
#[test]
fn cull_from_future() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in (0..1).chain(3..10) {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
2017-02-09 19:17:37 +01:00
txq.import(tx.into()).unwrap();
2017-02-09 18:10:59 +01:00
}
txq.cull(sender, 6.into());
assert_eq!(txq.ready_transactions(0, 0).len(), 4);
assert_eq!(txq.next_nonce(&sender).unwrap(), 10.into());
}
2017-02-09 19:17:37 +01:00
#[test]
fn import_old() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let mut tx_a = Transaction::default();
tx_a.nonce = 3.into();
let mut tx_b = Transaction::default();
tx_b.nonce = 2.into();
txq.import(tx_a.fake_sign(sender).into()).unwrap();
txq.cull(sender, 3.into());
assert!(txq.import(tx_b.fake_sign(sender).into()).is_err())
}
2017-02-14 12:05:24 +01:00
#[test]
fn replace_is_removed() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
let tx_b: PendingTransaction = Transaction::default().fake_sign(sender).into();
let tx_a: PendingTransaction = {
let mut tx_a = Transaction::default();
tx_a.gas_price = tx_b.gas_price + 1;
2017-02-14 12:05:24 +01:00
tx_a.fake_sign(sender).into()
};
let hash = tx_a.hash();
txq.import(tx_a).unwrap();
txq.import(tx_b).unwrap();
assert!(txq.transaction(&hash).is_none());
}
2017-02-17 21:38:43 +01:00
#[test]
fn future_transactions() {
let sender = Address::default();
let mut txq = TransactionQueue::default();
for i in (0..1).chain(3..10) {
let mut tx = Transaction::default();
tx.nonce = i.into();
let tx = tx.fake_sign(sender);
txq.import(tx.into()).unwrap();
}
assert_eq!(txq.future_transactions(0, 0).len(), 7);
assert_eq!(txq.next_nonce(&sender).unwrap(), 1.into());
}
}