Limit the number of transactions in pending set (#8777)

* Unordered iterator.

* Use unordered and limited set if full not required.

* Split timeout work into smaller timers.

* Avoid collecting all pending transactions when mining

* Remove println.

* Use priority ordering in eth-filter.

* Fix ethcore-miner tests and tx propagation.

* Review grumbles addressed.

* Add test for unordered not populating the cache.

* Fix ethcore tests.

* Fix light tests.

* Fix ethcore-sync tests.

* Fix RPC tests.
This commit is contained in:
Tomasz Drwięga 2018-06-12 08:22:54 +02:00 committed by Tomasz Drwięga
parent 485acc5229
commit 54af59de02
No known key found for this signature in database
GPG Key ID: D066F497E62CAF66
25 changed files with 405 additions and 108 deletions

View File

@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Max number of transactions in a single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// minimum interval between updates. // minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
@ -648,7 +651,7 @@ impl LightProtocol {
fn propagate_transactions(&self, io: &IoContext) { fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return } if self.capabilities.read().tx_relay { return }
let ready_transactions = self.provider.ready_transactions(); let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
if ready_transactions.is_empty() { return } if ready_transactions.is_empty() { return }
trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());

View File

@ -173,8 +173,8 @@ impl Provider for TestProvider {
}) })
} }
fn ready_transactions(&self) -> Vec<PendingTransaction> { fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
self.0.client.ready_transactions() self.0.client.ready_transactions(max_len)
} }
} }

View File

@ -128,7 +128,7 @@ pub trait Provider: Send + Sync {
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>; fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;
/// Provide pending transactions. /// Provide pending transactions.
fn ready_transactions(&self) -> Vec<PendingTransaction>; fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>;
/// Provide a proof-of-execution for the given transaction proof request. /// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction. /// Returns a vector of all state items necessary to execute the transaction.
@ -283,8 +283,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
.map(|(_, proof)| ::request::ExecutionResponse { items: proof }) .map(|(_, proof)| ::request::ExecutionResponse { items: proof })
} }
fn ready_transactions(&self) -> Vec<PendingTransaction> { fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self) BlockChainClient::ready_transactions(self, max_len)
.into_iter() .into_iter()
.map(|tx| tx.pending().clone()) .map(|tx| tx.pending().clone())
.collect() .collect()
@ -370,9 +370,12 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None None
} }
fn ready_transactions(&self) -> Vec<PendingTransaction> { fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
let chain_info = self.chain_info(); let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) let mut transactions = self.txqueue.read()
.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp);
transactions.truncate(max_len);
transactions
} }
} }

View File

@ -1893,8 +1893,8 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
} }
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> { fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self) self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
} }
fn signing_chain_id(&self) -> Option<u64> { fn signing_chain_id(&self) -> Option<u64> {

View File

@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry;
use receipt::{Receipt, LocalizedReceipt, TransactionOutcome}; use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
use error::ImportResult; use error::ImportResult;
use vm::Schedule; use vm::Schedule;
use miner::{Miner, MinerService}; use miner::{self, Miner, MinerService};
use spec::Spec; use spec::Spec;
use types::basic_account::BasicAccount; use types::basic_account::BasicAccount;
use types::mode::Mode; use types::mode::Mode;
@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone() self.traces.read().clone()
} }
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> { fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self) self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority)
} }
fn signing_chain_id(&self) -> Option<u64> { None } fn signing_chain_id(&self) -> Option<u64> { None }

View File

@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
fn last_hashes(&self) -> LastHashes; fn last_hashes(&self) -> LastHashes;
/// List all transactions that are allowed into the next block. /// List all transactions that are allowed into the next block.
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>; fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>;
/// Sorted list of transaction gas prices from at least last sample_size blocks. /// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> { fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {

View File

@ -381,18 +381,28 @@ impl Miner {
let client = self.pool_client(chain); let client = self.pool_client(chain);
let engine_params = self.engine.params(); let engine_params = self.engine.params();
let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition { let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition {
Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into()) Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into())
} else { } else {
None None
}; };
// we will never need more transactions than limit divided by min gas
let max_transactions = if min_tx_gas.is_zero() {
usize::max_value()
} else {
(*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize
};
let pending: Vec<Arc<_>> = self.transaction_queue.pending( let pending: Vec<Arc<_>> = self.transaction_queue.pending(
client.clone(), client.clone(),
chain_info.best_block_number, pool::PendingSettings {
chain_info.best_block_timestamp, block_number: chain_info.best_block_number,
nonce_cap, current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len: max_transactions,
ordering: miner::PendingOrdering::Priority,
}
); );
let took_ms = |elapsed: &Duration| { let took_ms = |elapsed: &Duration| {
@ -884,7 +894,7 @@ impl miner::MinerService for Miner {
} }
} }
fn ready_transactions<C>(&self, chain: &C) fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>> -> Vec<Arc<VerifiedTransaction>>
where where
C: ChainInfo + Nonce + Sync, C: ChainInfo + Nonce + Sync,
@ -892,14 +902,20 @@ impl miner::MinerService for Miner {
let chain_info = chain.chain_info(); let chain_info = chain.chain_info();
let from_queue = || { let from_queue = || {
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
let nonce_cap = None;
self.transaction_queue.pending( self.transaction_queue.pending(
CachedNonceClient::new(chain, &self.nonce_cache), CachedNonceClient::new(chain, &self.nonce_cache),
chain_info.best_block_number, pool::PendingSettings {
chain_info.best_block_timestamp, block_number: chain_info.best_block_number,
// We propagate transactions over the nonce cap. current_timestamp: chain_info.best_block_timestamp,
// The mechanism is only to limit number of transactions in pending block nonce_cap,
// those transactions are valid and will just be ready to be included in next block. max_len,
None, ordering,
},
) )
}; };
@ -909,6 +925,7 @@ impl miner::MinerService for Miner {
.iter() .iter()
.map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone())) .map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone()))
.map(Arc::new) .map(Arc::new)
.take(max_len)
.collect() .collect()
}, chain_info.best_block_number) }, chain_info.best_block_number)
}; };
@ -1198,7 +1215,7 @@ mod tests {
use rustc_hex::FromHex; use rustc_hex::FromHex;
use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock}; use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock};
use miner::MinerService; use miner::{MinerService, PendingOrdering};
use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts}; use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts};
use transaction::{Transaction}; use transaction::{Transaction};
@ -1297,7 +1314,7 @@ mod tests {
assert_eq!(res.unwrap(), ()); assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1); assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1);
assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1); assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1);
assert_eq!(miner.ready_transactions(&client).len(), 1); assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
// This method will let us know if pending block was created (before calling that method) // This method will let us know if pending block was created (before calling that method)
assert!(!miner.prepare_pending_block(&client)); assert!(!miner.prepare_pending_block(&client));
} }
@ -1316,7 +1333,7 @@ mod tests {
assert_eq!(res.unwrap(), ()); assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None); assert_eq!(miner.pending_receipts(best_block), None);
assert_eq!(miner.ready_transactions(&client).len(), 1); assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
} }
#[test] #[test]
@ -1335,11 +1352,11 @@ mod tests {
assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None); assert_eq!(miner.pending_receipts(best_block), None);
// By default we use PendingSet::AlwaysSealing, so no transactions yet. // By default we use PendingSet::AlwaysSealing, so no transactions yet.
assert_eq!(miner.ready_transactions(&client).len(), 0); assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0);
// This method will let us know if pending block was created (before calling that method) // This method will let us know if pending block was created (before calling that method)
assert!(miner.prepare_pending_block(&client)); assert!(miner.prepare_pending_block(&client));
// After pending block is created we should see a transaction. // After pending block is created we should see a transaction.
assert_eq!(miner.ready_transactions(&client).len(), 1); assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
} }
#[test] #[test]

View File

@ -26,6 +26,7 @@ pub mod pool_client;
pub mod stratum; pub mod stratum;
pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams}; pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
pub use ethcore_miner::pool::PendingOrdering;
use std::sync::Arc; use std::sync::Arc;
use std::collections::{BTreeSet, BTreeMap}; use std::collections::{BTreeSet, BTreeMap};
@ -173,7 +174,9 @@ pub trait MinerService : Send + Sync {
/// Get a list of all ready transactions either ordered by priority or unordered (cheaper). /// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
/// ///
/// Depending on the settings may look in transaction pool or only in pending block. /// Depending on the settings may look in transaction pool or only in pending block.
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>> /// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of
/// transactions.
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec<Arc<VerifiedTransaction>>
where C: ChainInfo + Nonce + Sync; where C: ChainInfo + Nonce + Sync;
/// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet). /// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet).

View File

@ -30,7 +30,7 @@ use test_helpers::{
use types::filter::Filter; use types::filter::Filter;
use ethereum_types::{U256, Address}; use ethereum_types::{U256, Address};
use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb_rocksdb::{Database, DatabaseConfig};
use miner::Miner; use miner::{Miner, PendingOrdering};
use spec::Spec; use spec::Spec;
use views::BlockView; use views::BlockView;
use ethkey::KeyPair; use ethkey::KeyPair;
@ -345,12 +345,12 @@ fn does_not_propagate_delayed_transactions() {
client.miner().import_own_transaction(&*client, tx0).unwrap(); client.miner().import_own_transaction(&*client, tx0).unwrap();
client.miner().import_own_transaction(&*client, tx1).unwrap(); client.miner().import_own_transaction(&*client, tx1).unwrap();
assert_eq!(0, client.ready_transactions().len()); assert_eq!(0, client.ready_transactions(10).len());
assert_eq!(0, client.miner().ready_transactions(&*client).len()); assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
push_blocks_to_client(&client, 53, 2, 2); push_blocks_to_client(&client, 53, 2, 2);
client.flush_queue(); client.flush_queue();
assert_eq!(2, client.ready_transactions().len()); assert_eq!(2, client.ready_transactions(10).len());
assert_eq!(2, client.miner().ready_transactions(&*client).len()); assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
} }
#[test] #[test]

View File

@ -357,6 +357,10 @@ impl SyncProvider for EthSync {
} }
} }
const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
struct SyncProtocolHandler { struct SyncProtocolHandler {
/// Shared blockchain client. /// Shared blockchain client.
chain: Arc<BlockChainClient>, chain: Arc<BlockChainClient>,
@ -371,7 +375,9 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer"); io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
} }
} }
@ -396,12 +402,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
} }
} }
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
trace_time!("sync::timeout"); trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
self.sync.write().maintain_peers(&mut io); match timer {
self.sync.write().maintain_sync(&mut io); PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
self.sync.write().propagate_new_transactions(&mut io); SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
_ => warn!("Unknown timer {} triggered.", timer),
}
} }
} }

View File

@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20; const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions queried from miner to propagate.
// This set is used to diff with transactions known by the peer and
// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
const MAX_TRANSACTIONS_TO_QUERY: usize = 4096;
// Maximal number of transactions in sent in single packet. // Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync // Min number of blocks to be behind for a snapshot sync
@ -1144,7 +1148,7 @@ pub mod tests {
use super::{PeerInfo, PeerAsking}; use super::{PeerInfo, PeerAsking};
use ethcore::header::*; use ethcore::header::*;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::MinerService; use ethcore::miner::{MinerService, PendingOrdering};
use private_tx::NoopPrivateTxHandler; use private_tx::NoopPrivateTxHandler;
pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
@ -1357,7 +1361,7 @@ pub mod tests {
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1); assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1);
} }
// We need to update nonce status (because we say that the block has been imported) // We need to update nonce status (because we say that the block has been imported)
for h in &[good_blocks[0]] { for h in &[good_blocks[0]] {
@ -1373,7 +1377,7 @@ pub mod tests {
} }
// then // then
assert_eq!(client.miner.ready_transactions(&client).len(), 1); assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
} }
#[test] #[test]

View File

@ -33,6 +33,7 @@ use super::{
MAX_PEERS_PROPAGATION, MAX_PEERS_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE, MAX_TRANSACTION_PACKET_SIZE,
MAX_TRANSACTIONS_TO_PROPAGATE, MAX_TRANSACTIONS_TO_PROPAGATE,
MAX_TRANSACTIONS_TO_QUERY,
MIN_PEERS_PROPAGATION, MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET, CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET, NEW_BLOCK_HASHES_PACKET,
@ -114,7 +115,7 @@ impl SyncPropagator {
return 0; return 0;
} }
let transactions = io.chain().ready_transactions(); let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY);
if transactions.is_empty() { if transactions.is_empty() {
return 0; return 0;
} }

View File

@ -35,9 +35,9 @@ extern crate transaction_pool as txpool;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate trace_time;
#[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate trace_time;
#[cfg(test)] #[cfg(test)]
extern crate rustc_hex; extern crate rustc_hex;

View File

@ -16,7 +16,7 @@
//! Transaction Pool //! Transaction Pool
use ethereum_types::{H256, Address}; use ethereum_types::{U256, H256, Address};
use heapsize::HeapSizeOf; use heapsize::HeapSizeOf;
use transaction; use transaction;
use txpool; use txpool;
@ -45,6 +45,43 @@ pub enum PrioritizationStrategy {
GasPriceOnly, GasPriceOnly,
} }
/// Transaction ordering when requesting pending set.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PendingOrdering {
/// Get pending transactions ordered by their priority (potentially expensive)
Priority,
/// Get pending transactions without any care of particular ordering (cheaper).
Unordered,
}
/// Pending set query settings
#[derive(Debug, Clone)]
pub struct PendingSettings {
/// Current block number (affects readiness of some transactions).
pub block_number: u64,
/// Current timestamp (affects readiness of some transactions).
pub current_timestamp: u64,
/// Nonce cap (for dust protection; EIP-168)
pub nonce_cap: Option<U256>,
/// Maximal number of transactions in pending the set.
pub max_len: usize,
/// Ordering of transactions.
pub ordering: PendingOrdering,
}
impl PendingSettings {
/// Get all transactions (no cap or len limit) prioritized.
pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self {
PendingSettings {
block_number,
current_timestamp,
nonce_cap: None,
max_len: usize::max_value(),
ordering: PendingOrdering::Priority,
}
}
}
/// Transaction priority. /// Transaction priority.
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)] #[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
pub(crate) enum Priority { pub(crate) enum Priority {

View File

@ -26,7 +26,10 @@ use parking_lot::RwLock;
use transaction; use transaction;
use txpool::{self, Verifier}; use txpool::{self, Verifier};
use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy}; use pool::{
self, scoring, verifier, client, ready, listener,
PrioritizationStrategy, PendingOrdering, PendingSettings,
};
use pool::local_transactions::LocalTransactionsList; use pool::local_transactions::LocalTransactionsList;
type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger));
@ -82,6 +85,7 @@ struct CachedPending {
nonce_cap: Option<U256>, nonce_cap: Option<U256>,
has_local_pending: bool, has_local_pending: bool,
pending: Option<Vec<Arc<pool::VerifiedTransaction>>>, pending: Option<Vec<Arc<pool::VerifiedTransaction>>>,
max_len: usize,
} }
impl CachedPending { impl CachedPending {
@ -93,6 +97,7 @@ impl CachedPending {
has_local_pending: false, has_local_pending: false,
pending: None, pending: None,
nonce_cap: None, nonce_cap: None,
max_len: 0,
} }
} }
@ -107,6 +112,7 @@ impl CachedPending {
block_number: u64, block_number: u64,
current_timestamp: u64, current_timestamp: u64,
nonce_cap: Option<&U256>, nonce_cap: Option<&U256>,
max_len: usize,
) -> Option<Vec<Arc<pool::VerifiedTransaction>>> { ) -> Option<Vec<Arc<pool::VerifiedTransaction>>> {
// First check if we have anything in cache. // First check if we have anything in cache.
let pending = self.pending.as_ref()?; let pending = self.pending.as_ref()?;
@ -131,7 +137,12 @@ impl CachedPending {
return None; return None;
} }
Some(pending.clone()) // It's fine to just take a smaller subset, but not other way around.
if max_len > self.max_len {
return None;
}
Some(pending.iter().take(max_len).cloned().collect())
} }
} }
@ -228,7 +239,7 @@ impl TransactionQueue {
transactions: Vec<verifier::Transaction>, transactions: Vec<verifier::Transaction>,
) -> Vec<Result<(), transaction::Error>> { ) -> Vec<Result<(), transaction::Error>> {
// Run verification // Run verification
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import"); trace_time!("pool::verify_and_import");
let options = self.options.read().clone(); let options = self.options.read().clone();
let transaction_to_replace = { let transaction_to_replace = {
@ -287,10 +298,10 @@ impl TransactionQueue {
results results
} }
/// Returns all transactions in the queue ordered by priority. /// Returns all transactions in the queue without explicit ordering.
pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> { pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> {
let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready; let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready;
self.pool.read().pending(ready).collect() self.pool.read().unordered_pending(ready).collect()
} }
/// Computes unordered set of pending hashes. /// Computes unordered set of pending hashes.
@ -314,24 +325,31 @@ impl TransactionQueue {
pub fn pending<C>( pub fn pending<C>(
&self, &self,
client: C, client: C,
block_number: u64, settings: PendingSettings,
current_timestamp: u64,
nonce_cap: Option<U256>,
) -> Vec<Arc<pool::VerifiedTransaction>> where ) -> Vec<Arc<pool::VerifiedTransaction>> where
C: client::NonceClient, C: client::NonceClient,
{ {
let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings;
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) { if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
return pending; return pending;
} }
// Double check after acquiring write lock // Double check after acquiring write lock
let mut cached_pending = self.cached_pending.write(); let mut cached_pending = self.cached_pending.write();
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) { if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
return pending; return pending;
} }
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect()); // In case we don't have a cached set, but we don't care about order
// just return the unordered set.
if let PendingOrdering::Unordered = ordering {
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
return self.pool.read().unordered_pending(ready).take(max_len).collect();
}
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| {
i.take(max_len).collect()
});
*cached_pending = CachedPending { *cached_pending = CachedPending {
block_number, block_number,
@ -339,6 +357,7 @@ impl TransactionQueue {
nonce_cap, nonce_cap,
has_local_pending: self.has_local_pending_transactions(), has_local_pending: self.has_local_pending_transactions(),
pending: Some(pending.clone()), pending: Some(pending.clone()),
max_len,
}; };
pending pending
@ -363,15 +382,27 @@ impl TransactionQueue {
scoring::NonceAndGasPrice, scoring::NonceAndGasPrice,
Listener, Listener,
>) -> T, >) -> T,
{
debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number);
trace_time!("pool::collect_pending");
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
collect(self.pool.read().pending(ready))
}
fn ready<C>(
client: C,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
) -> (ready::Condition, ready::State<C>) where
C: client::NonceClient,
{ {
let pending_readiness = ready::Condition::new(block_number, current_timestamp); let pending_readiness = ready::Condition::new(block_number, current_timestamp);
// don't mark any transactions as stale at this point. // don't mark any transactions as stale at this point.
let stale_id = None; let stale_id = None;
let state_readiness = ready::State::new(client, stale_id, nonce_cap); let state_readiness = ready::State::new(client, stale_id, nonce_cap);
let ready = (pending_readiness, state_readiness); (pending_readiness, state_readiness)
collect(self.pool.read().pending(ready))
} }
/// Culls all stalled transactions from the pool. /// Culls all stalled transactions from the pool.
@ -523,6 +554,12 @@ impl TransactionQueue {
let mut pool = self.pool.write(); let mut pool = self.pool.write();
(pool.listener_mut().1).0.add(f); (pool.listener_mut().1).0.add(f);
} }
/// Check if pending set is cached.
#[cfg(test)]
pub fn is_pending_cached(&self) -> bool {
self.cached_pending.read().pending.is_some()
}
} }
@ -549,7 +586,7 @@ mod tests {
fn should_get_pending_transactions() { fn should_get_pending_transactions() {
let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly); let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly);
let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None); let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0));
for tx in pending { for tx in pending {
assert!(tx.signed().nonce > 0.into()); assert!(tx.signed().nonce > 0.into());

View File

@ -18,7 +18,7 @@ use ethereum_types::U256;
use transaction::{self, PendingTransaction}; use transaction::{self, PendingTransaction};
use txpool; use txpool;
use pool::{verifier, TransactionQueue, PrioritizationStrategy}; use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering};
pub mod tx; pub mod tx;
pub mod client; pub mod client;
@ -158,7 +158,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() {
// and then there should be only one transaction in current (the one with higher gas_price) // and then there should be only one transaction in current (the one with higher gas_price)
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1); assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
} }
@ -183,7 +183,7 @@ fn should_move_all_transactions_from_future() {
// then // then
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2); assert_eq!(top[1].hash, hash2);
} }
@ -257,7 +257,7 @@ fn should_import_txs_from_same_sender() {
txq.import(TestClient::new(), txs.local().into_vec()); txq.import(TestClient::new(), txs.local().into_vec());
// then // then
let top = txq.pending(TestClient::new(), 0 ,0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0));
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2); assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
@ -279,7 +279,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
// then // then
let top = txq.pending(client, 0, 0, None); let top = txq.pending(client, PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); // local should be first assert_eq!(top[0].hash, hash); // local should be first
assert_eq!(top[1].hash, hash2); assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
@ -301,7 +301,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
// then // then
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); // retracted should be first assert_eq!(top[0].hash, hash); // retracted should be first
assert_eq!(top[1].hash, hash2); assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
@ -320,7 +320,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() {
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
// then // then
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2); assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
@ -338,7 +338,7 @@ fn should_put_transaction_to_futures_if_gap_detected() {
// then // then
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 1); assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
} }
@ -358,9 +358,9 @@ fn should_handle_min_block() {
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
// then // then
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 0); assert_eq!(top.len(), 0);
let top = txq.pending(TestClient::new(), 1, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0));
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
} }
@ -391,7 +391,7 @@ fn should_move_transactions_if_gap_filled() {
let res = txq.import(TestClient::new(), vec![tx, tx2].local()); let res = txq.import(TestClient::new(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
// when // when
let res = txq.import(TestClient::new(), vec![tx1.local()]); let res = txq.import(TestClient::new(), vec![tx1.local()]);
@ -399,7 +399,7 @@ fn should_move_transactions_if_gap_filled() {
// then // then
assert_eq!(txq.status().status.transaction_count, 3); assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);
} }
#[test] #[test]
@ -411,12 +411,12 @@ fn should_remove_transaction() {
let res = txq.import(TestClient::default(), vec![tx, tx2].local()); let res = txq.import(TestClient::default(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
// when // when
txq.cull(TestClient::new().with_nonce(124)); txq.cull(TestClient::new().with_nonce(124));
assert_eq!(txq.status().status.transaction_count, 1); assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1);
txq.cull(TestClient::new().with_nonce(126)); txq.cull(TestClient::new().with_nonce(126));
// then // then
@ -434,19 +434,19 @@ fn should_move_transactions_to_future_if_gap_introduced() {
let res = txq.import(TestClient::new(), vec![tx3, tx2].local()); let res = txq.import(TestClient::new(), vec![tx3, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
let res = txq.import(TestClient::new(), vec![tx].local()); let res = txq.import(TestClient::new(), vec![tx].local());
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 3); assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);
// when // when
txq.remove(vec![&hash], true); txq.remove(vec![&hash], true);
// then // then
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
} }
#[test] #[test]
@ -497,7 +497,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() {
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1); assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None); let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(top.len(), 1); assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash); assert_eq!(top[0].hash, hash);
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into())); assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into()));
@ -545,19 +545,19 @@ fn should_accept_same_transaction_twice_if_removed() {
let res = txq.import(TestClient::new(), txs.local().into_vec()); let res = txq.import(TestClient::new(), txs.local().into_vec());
assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);
// when // when
txq.remove(vec![&hash], true); txq.remove(vec![&hash], true);
assert_eq!(txq.status().status.transaction_count, 1); assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0);
let res = txq.import(TestClient::new(), vec![tx1].local()); let res = txq.import(TestClient::new(), vec![tx1].local());
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
// then // then
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);
} }
#[test] #[test]
@ -577,8 +577,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() {
// then // then
assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]); assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2); assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20)); assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20));
assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2)); assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2));
} }
#[test] #[test]
@ -620,7 +620,7 @@ fn should_return_valid_last_nonce_after_cull() {
let client = TestClient::new().with_nonce(124); let client = TestClient::new().with_nonce(124);
txq.cull(client.clone()); txq.cull(client.clone());
// tx2 should be not be promoted to current // tx2 should be not be promoted to current
assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0); assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0);
// then // then
assert_eq!(txq.next_nonce(client.clone(), &sender), None); assert_eq!(txq.next_nonce(client.clone(), &sender), None);
@ -718,7 +718,7 @@ fn should_accept_local_transactions_below_min_gas_price() {
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
// then // then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
} }
#[test] #[test]
@ -736,7 +736,7 @@ fn should_accept_local_service_transaction() {
assert_eq!(res, vec![Ok(())]); assert_eq!(res, vec![Ok(())]);
// then // then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
} }
#[test] #[test]
@ -777,9 +777,15 @@ fn should_not_return_transactions_over_nonce_cap() {
assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]); assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]);
// when // when
let all = txq.pending(TestClient::new(), 0, 0, None); let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
// This should invalidate the cache! // This should invalidate the cache!
let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into())); let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: Some(123.into()),
max_len: usize::max_value(),
ordering: PendingOrdering::Priority,
});
// then // then
@ -787,6 +793,62 @@ fn should_not_return_transactions_over_nonce_cap() {
assert_eq!(limited.len(), 1); assert_eq!(limited.len(), 1);
} }
#[test]
fn should_return_cached_pending_even_if_unordered_is_requested() {
// given
let txq = new_queue();
let tx1 = Tx::default().signed();
let (tx2_1, tx2_2)= Tx::default().signed_pair();
let tx2_1_hash = tx2_1.hash();
let res = txq.import(TestClient::new(), vec![tx1].unverified());
assert_eq!(res, vec![Ok(())]);
let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
// when
let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
assert_eq!(all[0].hash, tx2_1_hash);
assert_eq!(all.len(), 3);
// This should not invalidate the cache!
let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: None,
max_len: 3,
ordering: PendingOrdering::Unordered,
});
// then
assert_eq!(all, limited);
}
#[test]
fn should_return_unordered_and_not_populate_the_cache() {
// given
let txq = new_queue();
let tx1 = Tx::default().signed();
let (tx2_1, tx2_2)= Tx::default().signed_pair();
let res = txq.import(TestClient::new(), vec![tx1].unverified());
assert_eq!(res, vec![Ok(())]);
let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
// when
// This should not invalidate the cache!
let limited = txq.pending(TestClient::new(), PendingSettings {
block_number: 0,
current_timestamp: 0,
nonce_cap: None,
max_len: usize::max_value(),
ordering: PendingOrdering::Unordered,
});
// then
assert_eq!(limited.len(), 3);
assert!(!txq.is_pending_cached());
}
#[test] #[test]
fn should_clear_cache_after_timeout_for_local() { fn should_clear_cache_after_timeout_for_local() {
// given // given
@ -800,12 +862,12 @@ fn should_clear_cache_after_timeout_for_local() {
// This should populate cache and set timestamp to 1 // This should populate cache and set timestamp to 1
// when // when
assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0);
assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0);
// This should invalidate the cache and trigger transaction ready. // This should invalidate the cache and trigger transaction ready.
// then // then
assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2); assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2);
} }
#[test] #[test]

View File

@ -306,7 +306,7 @@ impl FullDependencies {
let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); let client = EthPubSubClient::new(self.client.clone(), self.remote.clone());
let h = client.handler(); let h = client.handler();
self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() { self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() {
h.new_transactions(hashes); h.notify_new_transactions(hashes);
})); }));
if let Some(h) = client.handler().upgrade() { if let Some(h) = client.handler().upgrade() {
@ -527,7 +527,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
let h = client.handler(); let h = client.handler();
self.transaction_queue.write().add_listener(Box::new(move |transactions| { self.transaction_queue.write().add_listener(Box::new(move |transactions| {
if let Some(h) = h.upgrade() { if let Some(h) = h.upgrade() {
h.new_transactions(transactions); h.notify_new_transactions(transactions);
} }
})); }));
handler.extend_with(EthPubSub::to_delegate(client)); handler.extend_with(EthPubSub::to_delegate(client));

View File

@ -175,7 +175,7 @@ impl<C> ChainNotificationHandler<C> {
} }
/// Notify all subscribers about new transaction hashes. /// Notify all subscribers about new transaction hashes.
pub fn new_transactions(&self, hashes: &[H256]) { pub fn notify_new_transactions(&self, hashes: &[H256]) {
for subscriber in self.transactions_subscribers.read().values() { for subscriber in self.transactions_subscribers.read().values() {
for hash in hashes { for hash in hashes {
Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into())); Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into()));

View File

@ -264,12 +264,13 @@ impl Parity for ParityClient {
.map(Into::into) .map(Into::into)
} }
fn pending_transactions(&self) -> Result<Vec<Transaction>> { fn pending_transactions(&self, limit: Trailing<usize>) -> Result<Vec<Transaction>> {
let txq = self.light_dispatch.transaction_queue.read(); let txq = self.light_dispatch.transaction_queue.read();
let chain_info = self.light_dispatch.client.chain_info(); let chain_info = self.light_dispatch.client.chain_info();
Ok( Ok(
txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
.into_iter() .into_iter()
.take(limit.unwrap_or_else(usize::max_value))
.map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition)) .map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition))
.collect::<Vec<_>>() .collect::<Vec<_>>()
) )

View File

@ -313,15 +313,19 @@ impl<C, M, U, S> Parity for ParityClient<C, M, U> where
.map(Into::into) .map(Into::into)
} }
fn pending_transactions(&self) -> Result<Vec<Transaction>> { fn pending_transactions(&self, limit: Trailing<usize>) -> Result<Vec<Transaction>> {
let block_number = self.client.chain_info().best_block_number; let block_number = self.client.chain_info().best_block_number;
let ready_transactions = self.miner.ready_transactions(&*self.client); let ready_transactions = self.miner.ready_transactions(
&*self.client,
limit.unwrap_or_else(usize::max_value),
miner::PendingOrdering::Priority,
);
Ok(ready_transactions Ok(ready_transactions
.into_iter() .into_iter()
.map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition)) .map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition))
.collect() .collect()
) )
} }
fn all_transactions(&self) -> Result<Vec<Transaction>> { fn all_transactions(&self) -> Result<Vec<Transaction>> {

View File

@ -27,7 +27,7 @@ use ethcore::engines::EthEngine;
use ethcore::error::Error; use ethcore::error::Error;
use ethcore::header::{BlockNumber, Header}; use ethcore::header::{BlockNumber, Header};
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::miner::{MinerService, AuthoringParams}; use ethcore::miner::{self, MinerService, AuthoringParams};
use ethcore::receipt::{Receipt, RichReceipt}; use ethcore::receipt::{Receipt, RichReceipt};
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use miner::pool::local_transactions::Status as LocalTransactionStatus; use miner::pool::local_transactions::Status as LocalTransactionStatus;
@ -215,7 +215,7 @@ impl MinerService for TestMinerService {
self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect() self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect()
} }
fn ready_transactions<C>(&self, _chain: &C) -> Vec<Arc<VerifiedTransaction>> { fn ready_transactions<C>(&self, _chain: &C, _max_len: usize, _ordering: miner::PendingOrdering) -> Vec<Arc<VerifiedTransaction>> {
self.queued_transactions() self.queued_transactions()
} }

View File

@ -183,7 +183,7 @@ fn should_subscribe_to_pending_transactions() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Send new transactions // Send new transactions
handler.new_transactions(&[5.into(), 7.into()]); handler.notify_new_transactions(&[5.into(), 7.into()]);
let (res, receiver) = receiver.into_future().wait().unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#; let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#;

View File

@ -141,7 +141,7 @@ build_rpc_trait! {
/// Returns all pending transactions from transaction queue. /// Returns all pending transactions from transaction queue.
#[rpc(name = "parity_pendingTransactions")] #[rpc(name = "parity_pendingTransactions")]
fn pending_transactions(&self) -> Result<Vec<Transaction>>; fn pending_transactions(&self, Trailing<usize>) -> Result<Vec<Transaction>>;
/// Returns all transactions from transaction queue. /// Returns all transactions from transaction queue.
/// ///

View File

@ -15,7 +15,8 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::sync::Arc;
use std::collections::{HashMap, BTreeSet}; use std::slice;
use std::collections::{hash_map, HashMap, BTreeSet};
use error; use error;
use listener::{Listener, NoopListener}; use listener::{Listener, NoopListener};
@ -443,7 +444,16 @@ impl<T, S, L> Pool<T, S, L> where
PendingIterator { PendingIterator {
ready, ready,
best_transactions, best_transactions,
pool: self pool: self,
}
}
/// Returns unprioritized list of ready transactions.
pub fn unordered_pending<R: Ready<T>>(&self, ready: R) -> UnorderedIterator<T, R, S> {
UnorderedIterator {
ready,
senders: self.transactions.iter(),
transactions: None,
} }
} }
@ -514,6 +524,50 @@ impl<T, S, L> Pool<T, S, L> where
} }
} }
/// An iterator over all pending (ready) transactions in unoredered fashion.
///
/// NOTE: Current implementation will iterate over all transactions from particular sender
/// ordered by nonce, but that might change in the future.
///
/// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`.
pub struct UnorderedIterator<'a, T, R, S> where
T: VerifiedTransaction + 'a,
S: Scoring<T> + 'a,
{
ready: R,
senders: hash_map::Iter<'a, T::Sender, Transactions<T, S>>,
transactions: Option<slice::Iter<'a, Transaction<T>>>,
}
impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where
T: VerifiedTransaction,
R: Ready<T>,
S: Scoring<T>,
{
type Item = Arc<T>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(transactions) = self.transactions.as_mut() {
if let Some(tx) = transactions.next() {
match self.ready.is_ready(&tx) {
Readiness::Ready => {
return Some(tx.transaction.clone());
},
state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state),
}
}
}
// otherwise fallback and try next sender
let next_sender = self.senders.next()?;
self.transactions = Some(next_sender.1.iter());
}
}
}
/// An iterator over all pending (ready) transactions. /// An iterator over all pending (ready) transactions.
/// NOTE: the transactions are not removed from the queue. /// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`. /// You might remove them later by calling `cull`.

View File

@ -259,6 +259,66 @@ fn should_construct_pending() {
assert_eq!(pending.next(), None); assert_eq!(pending.next(), None);
} }
#[test]
fn should_return_unordered_iterator() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap();
//gap
txq.import(b.tx().nonce(5).new()).unwrap();
let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap();
// gap
txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap();
assert_eq!(txq.light_status().transaction_count, 11);
assert_eq!(txq.status(NonceReady::default()), Status {
stalled: 0,
pending: 9,
future: 2,
});
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 3,
pending: 6,
future: 2,
});
// when
let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect();
let chain1 = vec![tx0, tx1, tx2, tx3];
let chain2 = vec![tx5, tx6, tx7, tx8];
let chain3 = vec![tx9];
assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len());
let mut options = vec![
vec![chain1.clone(), chain2.clone(), chain3.clone()],
vec![chain2.clone(), chain1.clone(), chain3.clone()],
vec![chain2.clone(), chain3.clone(), chain1.clone()],
vec![chain3.clone(), chain2.clone(), chain1.clone()],
vec![chain3.clone(), chain1.clone(), chain2.clone()],
vec![chain1.clone(), chain3.clone(), chain2.clone()],
].into_iter().map(|mut v| {
let mut first = v.pop().unwrap();
for mut x in v {
first.append(&mut x);
}
first
});
assert!(options.any(|opt| all == opt));
}
#[test] #[test]
fn should_update_scoring_correctly() { fn should_update_scoring_correctly() {
// given // given