diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 27d5c12a5..34f426cb7 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); +/// Max number of transactions in a single packet. +const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; + // minimum interval between updates. const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); @@ -648,7 +651,7 @@ impl LightProtocol { fn propagate_transactions(&self, io: &IoContext) { if self.capabilities.read().tx_relay { return } - let ready_transactions = self.provider.ready_transactions(); + let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); if ready_transactions.is_empty() { return } trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 3c04c0ffb..a653a8a7a 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -173,8 +173,8 @@ impl Provider for TestProvider { }) } - fn ready_transactions(&self) -> Vec { - self.0.client.ready_transactions() + fn ready_transactions(&self, max_len: usize) -> Vec { + self.0.client.ready_transactions(max_len) } } diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index c993528e2..70a4a88b5 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -128,7 +128,7 @@ pub trait Provider: Send + Sync { fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option; /// Provide pending transactions. - fn ready_transactions(&self) -> Vec; + fn ready_transactions(&self, max_len: usize) -> Vec; /// Provide a proof-of-execution for the given transaction proof request. /// Returns a vector of all state items necessary to execute the transaction. @@ -283,8 +283,8 @@ impl Provider for T { .map(|(_, proof)| ::request::ExecutionResponse { items: proof }) } - fn ready_transactions(&self) -> Vec { - BlockChainClient::ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec { + BlockChainClient::ready_transactions(self, max_len) .into_iter() .map(|tx| tx.pending().clone()) .collect() @@ -370,9 +370,12 @@ impl Provider for LightProvider { None } - fn ready_transactions(&self) -> Vec { + fn ready_transactions(&self, max_len: usize) -> Vec { let chain_info = self.chain_info(); - self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + let mut transactions = self.txqueue.read() + .ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp); + transactions.truncate(max_len); + transactions } } diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index f703329d6..3b3407e66 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -94,6 +94,7 @@ impl ClientService { let pruning = config.pruning; let client = Client::new(config, &spec, client_db.clone(), miner.clone(), io_service.channel())?; + miner.set_io_channel(io_service.channel()); let snapshot_params = SnapServiceParams { engine: spec.engine.clone(), diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 580cabb3a..305f71209 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -200,7 +200,7 @@ pub struct Client { /// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`. liveness: AtomicBool, - io_channel: Mutex>, + io_channel: RwLock>, /// List of actors to be notified on certain chain events notify: RwLock>>, @@ -712,7 +712,7 @@ impl Client { db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), report: RwLock::new(Default::default()), - io_channel: Mutex::new(message_channel), + io_channel: RwLock::new(message_channel), notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), @@ -947,7 +947,7 @@ impl Client { /// Replace io channel. Useful for testing. pub fn set_io_channel(&self, io_channel: IoChannel) { - *self.io_channel.lock() = io_channel; + *self.io_channel.write() = io_channel; } /// Get a copy of the best block's state. @@ -1893,8 +1893,8 @@ impl BlockChainClient for Client { (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() } - fn ready_transactions(&self) -> Vec> { - self.importer.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { @@ -1952,7 +1952,7 @@ impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { trace_time!("queue_transactions"); let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { + self.queue_transactions.queue(&self.io_channel.read(), len, move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -2001,7 +2001,7 @@ impl IoClient for Client { let queued = self.queued_ancient_blocks.clone(); let lock = self.ancient_blocks_import_lock.clone(); - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| { trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. // We use separate lock, cause we don't want to block queueing. @@ -2033,7 +2033,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2142,7 +2142,14 @@ impl ImportSealedBlock for Client { route }; let route = ChainRoute::from([route].as_ref()); - self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true); + self.importer.miner.chain_new_blocks( + self, + &[h.clone()], + &[], + route.enacted(), + route.retracted(), + self.engine.seals_internally().is_some(), + ); self.notify(|notify| { notify.new_blocks( vec![h.clone()], @@ -2452,7 +2459,7 @@ impl IoChannelQueue { } } - pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where + pub fn queue(&self, channel: &IoChannel, count: usize, fun: F) -> Result<(), QueueError> where F: Fn(&Client) + Send + Sync + 'static, { let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 6a3166f7c..cc3892248 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry; use receipt::{Receipt, LocalizedReceipt, TransactionOutcome}; use error::ImportResult; use vm::Schedule; -use miner::{Miner, MinerService}; +use miner::{self, Miner, MinerService}; use spec::Spec; use types::basic_account::BasicAccount; use types::mode::Mode; @@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient { self.traces.read().clone() } - fn ready_transactions(&self) -> Vec> { - self.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { None } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 358e24fa9..cf62043c9 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra fn last_hashes(&self) -> LastHashes; /// List all transactions that are allowed into the next block. - fn ready_transactions(&self) -> Vec>; + fn ready_transactions(&self, max_len: usize) -> Vec>; /// Sorted list of transaction gas prices from at least last sample_size blocks. fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus { diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index cabce3c91..f0ed74f52 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -27,6 +27,7 @@ use ethcore_miner::gas_pricer::GasPricer; use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy}; use ethcore_miner::work_notify::NotifyWork; use ethereum_types::{H256, U256, Address}; +use io::IoChannel; use parking_lot::{Mutex, RwLock}; use rayon::prelude::*; use transaction::{ @@ -43,7 +44,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock}; use client::{ BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce }; -use client::BlockId; +use client::{BlockId, ClientIoMessage}; use executive::contract_address; use header::{Header, BlockNumber}; use miner; @@ -209,6 +210,7 @@ pub struct Miner { transaction_queue: Arc, engine: Arc, accounts: Option>, + io_channel: RwLock>>, } impl Miner { @@ -224,7 +226,12 @@ impl Miner { } /// Creates new instance of miner Arc. - pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option>) -> Self { + pub fn new( + options: MinerOptions, + gas_pricer: GasPricer, + spec: &Spec, + accounts: Option>, + ) -> Self { let limits = options.pool_limits.clone(); let verifier_options = options.pool_verification_options.clone(); let tx_queue_strategy = options.tx_queue_strategy; @@ -247,6 +254,7 @@ impl Miner { transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)), accounts, engine: spec.engine.clone(), + io_channel: RwLock::new(None), } } @@ -266,6 +274,11 @@ impl Miner { }, GasPricer::new_fixed(minimal_gas_price), spec, accounts) } + /// Sets `IoChannel` + pub fn set_io_channel(&self, io_channel: IoChannel) { + *self.io_channel.write() = Some(io_channel); + } + /// Clear all pending block states pub fn clear(&self) { self.sealing.lock().queue.reset(); @@ -368,18 +381,28 @@ impl Miner { let client = self.pool_client(chain); let engine_params = self.engine.params(); - let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); + let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); let nonce_cap: Option = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition { Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into()) } else { None }; + // we will never need more transactions than limit divided by min gas + let max_transactions = if min_tx_gas.is_zero() { + usize::max_value() + } else { + (*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize + }; let pending: Vec> = self.transaction_queue.pending( client.clone(), - chain_info.best_block_number, - chain_info.best_block_timestamp, - nonce_cap, + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len: max_transactions, + ordering: miner::PendingOrdering::Priority, + } ); let took_ms = |elapsed: &Duration| { @@ -871,7 +894,7 @@ impl miner::MinerService for Miner { } } - fn ready_transactions(&self, chain: &C) + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering) -> Vec> where C: ChainInfo + Nonce + Sync, @@ -879,14 +902,20 @@ impl miner::MinerService for Miner { let chain_info = chain.chain_info(); let from_queue = || { + // We propagate transactions over the nonce cap. + // The mechanism is only to limit number of transactions in pending block + // those transactions are valid and will just be ready to be included in next block. + let nonce_cap = None; + self.transaction_queue.pending( CachedNonceClient::new(chain, &self.nonce_cache), - chain_info.best_block_number, - chain_info.best_block_timestamp, - // We propagate transactions over the nonce cap. - // The mechanism is only to limit number of transactions in pending block - // those transactions are valid and will just be ready to be included in next block. - None, + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len, + ordering, + }, ) }; @@ -896,6 +925,7 @@ impl miner::MinerService for Miner { .iter() .map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone())) .map(Arc::new) + .take(max_len) .collect() }, chain_info.best_block_number) }; @@ -1130,7 +1160,32 @@ impl miner::MinerService for Miner { // (thanks to Ready), but culling can take significant amount of time, // so best to leave it after we create some work for miners to prevent increased // uncle rate. - self.transaction_queue.cull(client); + // If the io_channel is available attempt to offload culling to a separate task + // to avoid blocking chain_new_blocks + if let Some(ref channel) = *self.io_channel.read() { + let queue = self.transaction_queue.clone(); + let nonce_cache = self.nonce_cache.clone(); + let engine = self.engine.clone(); + let accounts = self.accounts.clone(); + let refuse_service_transactions = self.options.refuse_service_transactions; + + let cull = move |chain: &::client::Client| { + let client = PoolClient::new( + chain, + &nonce_cache, + &*engine, + accounts.as_ref().map(|x| &**x), + refuse_service_transactions, + ); + queue.cull(client); + }; + + if let Err(e) = channel.send(ClientIoMessage::execute(cull)) { + warn!(target: "miner", "Error queueing cull: {:?}", e); + } + } else { + self.transaction_queue.cull(client); + } } } @@ -1160,7 +1215,7 @@ mod tests { use rustc_hex::FromHex; use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock}; - use miner::MinerService; + use miner::{MinerService, PendingOrdering}; use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts}; use transaction::{Transaction}; @@ -1259,7 +1314,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1); assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); // This method will let us know if pending block was created (before calling that method) assert!(!miner.prepare_pending_block(&client)); } @@ -1278,7 +1333,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] @@ -1297,11 +1352,11 @@ mod tests { assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); // By default we use PendingSet::AlwaysSealing, so no transactions yet. - assert_eq!(miner.ready_transactions(&client).len(), 0); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0); // This method will let us know if pending block was created (before calling that method) assert!(miner.prepare_pending_block(&client)); // After pending block is created we should see a transaction. - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 340285b9b..cfe6946aa 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -26,6 +26,7 @@ pub mod pool_client; pub mod stratum; pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams}; +pub use ethcore_miner::pool::PendingOrdering; use std::sync::Arc; use std::collections::{BTreeSet, BTreeMap}; @@ -173,7 +174,9 @@ pub trait MinerService : Send + Sync { /// Get a list of all ready transactions either ordered by priority or unordered (cheaper). /// /// Depending on the settings may look in transaction pool or only in pending block. - fn ready_transactions(&self, chain: &C) -> Vec> + /// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of + /// transactions. + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec> where C: ChainInfo + Nonce + Sync; /// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet). diff --git a/ethcore/src/miner/pool_client.rs b/ethcore/src/miner/pool_client.rs index c49eec6ee..d8fadd89a 100644 --- a/ethcore/src/miner/pool_client.rs +++ b/ethcore/src/miner/pool_client.rs @@ -16,8 +16,11 @@ //! Blockchain access for transaction pool. -use std::fmt; -use std::collections::HashMap; +use std::{ + collections::HashMap, + fmt, + sync::Arc, +}; use ethereum_types::{H256, U256, Address}; use ethcore_miner::pool; @@ -37,9 +40,9 @@ use miner; use miner::service_transaction_checker::ServiceTransactionChecker; /// Cache for state nonces. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NonceCache { - nonces: RwLock>, + nonces: Arc>>, limit: usize } @@ -47,7 +50,7 @@ impl NonceCache { /// Create new cache with a limit of `limit` entries. pub fn new(limit: usize) -> Self { NonceCache { - nonces: RwLock::new(HashMap::with_capacity(limit / 2)), + nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))), limit, } } diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 6dcad9ba6..dbbd50041 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -30,7 +30,7 @@ use test_helpers::{ use types::filter::Filter; use ethereum_types::{U256, Address}; use kvdb_rocksdb::{Database, DatabaseConfig}; -use miner::Miner; +use miner::{Miner, PendingOrdering}; use spec::Spec; use views::BlockView; use ethkey::KeyPair; @@ -345,12 +345,12 @@ fn does_not_propagate_delayed_transactions() { client.miner().import_own_transaction(&*client, tx0).unwrap(); client.miner().import_own_transaction(&*client, tx1).unwrap(); - assert_eq!(0, client.ready_transactions().len()); - assert_eq!(0, client.miner().ready_transactions(&*client).len()); + assert_eq!(0, client.ready_transactions(10).len()); + assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); push_blocks_to_client(&client, 53, 2, 2); client.flush_queue(); - assert_eq!(2, client.ready_transactions().len()); - assert_eq!(2, client.miner().ready_transactions(&*client).len()); + assert_eq!(2, client.ready_transactions(10).len()); + assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); } #[test] diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index d5df80bb7..4f7fc8042 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -357,6 +357,10 @@ impl SyncProvider for EthSync { } } +const PEERS_TIMER: TimerToken = 0; +const SYNC_TIMER: TimerToken = 1; +const TX_TIMER: TimerToken = 2; + struct SyncProtocolHandler { /// Shared blockchain client. chain: Arc, @@ -371,7 +375,9 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer"); + io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); + io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); } } @@ -396,12 +402,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } } - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { trace_time!("sync::timeout"); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); - self.sync.write().maintain_peers(&mut io); - self.sync.write().maintain_sync(&mut io); - self.sync.write().propagate_new_transactions(&mut io); + match timer { + PEERS_TIMER => self.sync.write().maintain_peers(&mut io), + SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + TX_TIMER => { + self.sync.write().propagate_new_transactions(&mut io); + }, + _ => warn!("Unknown timer {} triggered.", timer), + } } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 7dcd7ef06..0b2efeb1a 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64; const MAX_NEW_BLOCK_AGE: BlockNumber = 20; // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; +// Maximal number of transactions queried from miner to propagate. +// This set is used to diff with transactions known by the peer and +// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`. +const MAX_TRANSACTIONS_TO_QUERY: usize = 4096; // Maximal number of transactions in sent in single packet. const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; // Min number of blocks to be behind for a snapshot sync @@ -1144,7 +1148,7 @@ pub mod tests { use super::{PeerInfo, PeerAsking}; use ethcore::header::*; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; - use ethcore::miner::MinerService; + use ethcore::miner::{MinerService, PendingOrdering}; use private_tx::NoopPrivateTxHandler; pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { @@ -1357,7 +1361,7 @@ pub mod tests { let mut io = TestIo::new(&mut client, &ss, &queue, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); - assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1); + assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1); } // We need to update nonce status (because we say that the block has been imported) for h in &[good_blocks[0]] { @@ -1373,7 +1377,7 @@ pub mod tests { } // then - assert_eq!(client.miner.ready_transactions(&client).len(), 1); + assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 4ae0518a5..75cf550f2 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -33,6 +33,7 @@ use super::{ MAX_PEERS_PROPAGATION, MAX_TRANSACTION_PACKET_SIZE, MAX_TRANSACTIONS_TO_PROPAGATE, + MAX_TRANSACTIONS_TO_QUERY, MIN_PEERS_PROPAGATION, CONSENSUS_DATA_PACKET, NEW_BLOCK_HASHES_PACKET, @@ -114,7 +115,7 @@ impl SyncPropagator { return 0; } - let transactions = io.chain().ready_transactions(); + let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY); if transactions.is_empty() { return 0; } diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 89c716064..497ae7f44 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -35,9 +35,9 @@ extern crate transaction_pool as txpool; #[macro_use] extern crate error_chain; #[macro_use] -extern crate trace_time; -#[macro_use] extern crate log; +#[macro_use] +extern crate trace_time; #[cfg(test)] extern crate rustc_hex; diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 582b99e17..bef1cb164 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -16,7 +16,7 @@ //! Transaction Pool -use ethereum_types::{H256, Address}; +use ethereum_types::{U256, H256, Address}; use heapsize::HeapSizeOf; use transaction; use txpool; @@ -45,6 +45,43 @@ pub enum PrioritizationStrategy { GasPriceOnly, } +/// Transaction ordering when requesting pending set. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PendingOrdering { + /// Get pending transactions ordered by their priority (potentially expensive) + Priority, + /// Get pending transactions without any care of particular ordering (cheaper). + Unordered, +} + +/// Pending set query settings +#[derive(Debug, Clone)] +pub struct PendingSettings { + /// Current block number (affects readiness of some transactions). + pub block_number: u64, + /// Current timestamp (affects readiness of some transactions). + pub current_timestamp: u64, + /// Nonce cap (for dust protection; EIP-168) + pub nonce_cap: Option, + /// Maximal number of transactions in pending the set. + pub max_len: usize, + /// Ordering of transactions. + pub ordering: PendingOrdering, +} + +impl PendingSettings { + /// Get all transactions (no cap or len limit) prioritized. + pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self { + PendingSettings { + block_number, + current_timestamp, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + } + } +} + /// Transaction priority. #[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)] pub(crate) enum Priority { diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 5e44b849e..f7f9a9c19 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -26,7 +26,10 @@ use parking_lot::RwLock; use transaction; use txpool::{self, Verifier}; -use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy}; +use pool::{ + self, scoring, verifier, client, ready, listener, + PrioritizationStrategy, PendingOrdering, PendingSettings, +}; use pool::local_transactions::LocalTransactionsList; type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); @@ -82,6 +85,7 @@ struct CachedPending { nonce_cap: Option, has_local_pending: bool, pending: Option>>, + max_len: usize, } impl CachedPending { @@ -93,6 +97,7 @@ impl CachedPending { has_local_pending: false, pending: None, nonce_cap: None, + max_len: 0, } } @@ -107,6 +112,7 @@ impl CachedPending { block_number: u64, current_timestamp: u64, nonce_cap: Option<&U256>, + max_len: usize, ) -> Option>> { // First check if we have anything in cache. let pending = self.pending.as_ref()?; @@ -131,7 +137,12 @@ impl CachedPending { return None; } - Some(pending.clone()) + // It's fine to just take a smaller subset, but not other way around. + if max_len > self.max_len { + return None; + } + + Some(pending.iter().take(max_len).cloned().collect()) } } @@ -228,7 +239,7 @@ impl TransactionQueue { transactions: Vec, ) -> Vec> { // Run verification - let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import"); + trace_time!("pool::verify_and_import"); let options = self.options.read().clone(); let transaction_to_replace = { @@ -287,10 +298,10 @@ impl TransactionQueue { results } - /// Returns all transactions in the queue ordered by priority. + /// Returns all transactions in the queue without explicit ordering. pub fn all_transactions(&self) -> Vec> { let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready; - self.pool.read().pending(ready).collect() + self.pool.read().unordered_pending(ready).collect() } /// Computes unordered set of pending hashes. @@ -314,24 +325,31 @@ impl TransactionQueue { pub fn pending( &self, client: C, - block_number: u64, - current_timestamp: u64, - nonce_cap: Option, + settings: PendingSettings, ) -> Vec> where C: client::NonceClient, { - - if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) { + let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings; + if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } // Double check after acquiring write lock let mut cached_pending = self.cached_pending.write(); - if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) { + if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } - let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect()); + // In case we don't have a cached set, but we don't care about order + // just return the unordered set. + if let PendingOrdering::Unordered = ordering { + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + return self.pool.read().unordered_pending(ready).take(max_len).collect(); + } + + let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| { + i.take(max_len).collect() + }); *cached_pending = CachedPending { block_number, @@ -339,6 +357,7 @@ impl TransactionQueue { nonce_cap, has_local_pending: self.has_local_pending_transactions(), pending: Some(pending.clone()), + max_len, }; pending @@ -363,15 +382,27 @@ impl TransactionQueue { scoring::NonceAndGasPrice, Listener, >) -> T, + { + debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number); + trace_time!("pool::collect_pending"); + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + collect(self.pool.read().pending(ready)) + } + + fn ready( + client: C, + block_number: u64, + current_timestamp: u64, + nonce_cap: Option, + ) -> (ready::Condition, ready::State) where + C: client::NonceClient, { let pending_readiness = ready::Condition::new(block_number, current_timestamp); // don't mark any transactions as stale at this point. let stale_id = None; let state_readiness = ready::State::new(client, stale_id, nonce_cap); - let ready = (pending_readiness, state_readiness); - - collect(self.pool.read().pending(ready)) + (pending_readiness, state_readiness) } /// Culls all stalled transactions from the pool. @@ -523,6 +554,12 @@ impl TransactionQueue { let mut pool = self.pool.write(); (pool.listener_mut().1).0.add(f); } + + /// Check if pending set is cached. + #[cfg(test)] + pub fn is_pending_cached(&self) -> bool { + self.cached_pending.read().pending.is_some() + } } @@ -549,7 +586,7 @@ mod tests { fn should_get_pending_transactions() { let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly); - let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None); + let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0)); for tx in pending { assert!(tx.signed().nonce > 0.into()); diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index 081a30b6c..c0252f862 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -18,7 +18,7 @@ use ethereum_types::U256; use transaction::{self, PendingTransaction}; use txpool; -use pool::{verifier, TransactionQueue, PrioritizationStrategy}; +use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering}; pub mod tx; pub mod client; @@ -158,7 +158,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() { // and then there should be only one transaction in current (the one with higher gas_price) assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); } @@ -183,7 +183,7 @@ fn should_move_all_transactions_from_future() { // then assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); } @@ -257,7 +257,7 @@ fn should_import_txs_from_same_sender() { txq.import(TestClient::new(), txs.local().into_vec()); // then - let top = txq.pending(TestClient::new(), 0 ,0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -279,7 +279,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(client, 0, 0, None); + let top = txq.pending(client, PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // local should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -301,7 +301,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // retracted should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -320,7 +320,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -338,7 +338,7 @@ fn should_put_transaction_to_futures_if_gap_detected() { // then assert_eq!(res, vec![Ok(()), Ok(())]); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); } @@ -358,9 +358,9 @@ fn should_handle_min_block() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 0); - let top = txq.pending(TestClient::new(), 1, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0)); assert_eq!(top.len(), 2); } @@ -391,7 +391,7 @@ fn should_move_transactions_if_gap_filled() { let res = txq.import(TestClient::new(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when let res = txq.import(TestClient::new(), vec![tx1.local()]); @@ -399,7 +399,7 @@ fn should_move_transactions_if_gap_filled() { // then assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); } #[test] @@ -411,12 +411,12 @@ fn should_remove_transaction() { let res = txq.import(TestClient::default(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when txq.cull(TestClient::new().with_nonce(124)); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1); txq.cull(TestClient::new().with_nonce(126)); // then @@ -434,19 +434,19 @@ fn should_move_transactions_to_future_if_gap_introduced() { let res = txq.import(TestClient::new(), vec![tx3, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); let res = txq.import(TestClient::new(), vec![tx].local()); assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); // when txq.remove(vec![&hash], true); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -497,7 +497,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() { assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into())); @@ -545,19 +545,19 @@ fn should_accept_same_transaction_twice_if_removed() { let res = txq.import(TestClient::new(), txs.local().into_vec()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); // when txq.remove(vec![&hash], true); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0); let res = txq.import(TestClient::new(), vec![tx1].local()); assert_eq!(res, vec![Ok(())]); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); } #[test] @@ -577,8 +577,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() { // then assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20)); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2)); } #[test] @@ -620,7 +620,7 @@ fn should_return_valid_last_nonce_after_cull() { let client = TestClient::new().with_nonce(124); txq.cull(client.clone()); // tx2 should be not be promoted to current - assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0); // then assert_eq!(txq.next_nonce(client.clone(), &sender), None); @@ -718,7 +718,7 @@ fn should_accept_local_transactions_below_min_gas_price() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -736,7 +736,7 @@ fn should_accept_local_service_transaction() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -777,9 +777,15 @@ fn should_not_return_transactions_over_nonce_cap() { assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]); // when - let all = txq.pending(TestClient::new(), 0, 0, None); + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); // This should invalidate the cache! - let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into())); + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: Some(123.into()), + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + }); // then @@ -787,6 +793,62 @@ fn should_not_return_transactions_over_nonce_cap() { assert_eq!(limited.len(), 1); } +#[test] +fn should_return_cached_pending_even_if_unordered_is_requested() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let tx2_1_hash = tx2_1.hash(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); + assert_eq!(all[0].hash, tx2_1_hash); + assert_eq!(all.len(), 3); + + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: 3, + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(all, limited); +} + +#[test] +fn should_return_unordered_and_not_populate_the_cache() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(limited.len(), 3); + assert!(!txq.is_pending_cached()); +} + #[test] fn should_clear_cache_after_timeout_for_local() { // given @@ -800,12 +862,12 @@ fn should_clear_cache_after_timeout_for_local() { // This should populate cache and set timestamp to 1 // when - assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0); - assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0); // This should invalidate the cache and trigger transaction ready. // then - assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2); } #[test] diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 7b914f286..0bcd32e22 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -306,7 +306,7 @@ impl FullDependencies { let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); let h = client.handler(); self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() { - h.new_transactions(hashes); + h.notify_new_transactions(hashes); })); if let Some(h) = client.handler().upgrade() { @@ -527,7 +527,7 @@ impl LightDependencies { let h = client.handler(); self.transaction_queue.write().add_listener(Box::new(move |transactions| { if let Some(h) = h.upgrade() { - h.new_transactions(transactions); + h.notify_new_transactions(transactions); } })); handler.extend_with(EthPubSub::to_delegate(client)); diff --git a/parity/run.rs b/parity/run.rs index d4d1ebd55..746ea09a3 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -550,7 +550,8 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: cmd.miner_options, cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()), &spec, - Some(account_provider.clone()) + Some(account_provider.clone()), + )); miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed"); miner.set_gas_range_target(cmd.miner_extras.gas_range_target); diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index c57d9acb2..d554f8ebf 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -175,7 +175,7 @@ impl ChainNotificationHandler { } /// Notify all subscribers about new transaction hashes. - pub fn new_transactions(&self, hashes: &[H256]) { + pub fn notify_new_transactions(&self, hashes: &[H256]) { for subscriber in self.transactions_subscribers.read().values() { for hash in hashes { Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into())); diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index c790d0195..d4bc9c602 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -264,12 +264,13 @@ impl Parity for ParityClient { .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let txq = self.light_dispatch.transaction_queue.read(); let chain_info = self.light_dispatch.client.chain_info(); Ok( txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) .into_iter() + .take(limit.unwrap_or_else(usize::max_value)) .map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition)) .collect::>() ) diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 37d07d8bc..209958293 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -313,15 +313,19 @@ impl Parity for ParityClient where .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let block_number = self.client.chain_info().best_block_number; - let ready_transactions = self.miner.ready_transactions(&*self.client); + let ready_transactions = self.miner.ready_transactions( + &*self.client, + limit.unwrap_or_else(usize::max_value), + miner::PendingOrdering::Priority, + ); Ok(ready_transactions .into_iter() .map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition)) .collect() - ) + ) } fn all_transactions(&self) -> Result> { diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index c53bf8085..ab6bc2fb9 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -27,7 +27,7 @@ use ethcore::engines::EthEngine; use ethcore::error::Error; use ethcore::header::{BlockNumber, Header}; use ethcore::ids::BlockId; -use ethcore::miner::{MinerService, AuthoringParams}; +use ethcore::miner::{self, MinerService, AuthoringParams}; use ethcore::receipt::{Receipt, RichReceipt}; use ethereum_types::{H256, U256, Address}; use miner::pool::local_transactions::Status as LocalTransactionStatus; @@ -215,7 +215,7 @@ impl MinerService for TestMinerService { self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect() } - fn ready_transactions(&self, _chain: &C) -> Vec> { + fn ready_transactions(&self, _chain: &C, _max_len: usize, _ordering: miner::PendingOrdering) -> Vec> { self.queued_transactions() } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index afb796c09..1f79badca 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -183,7 +183,7 @@ fn should_subscribe_to_pending_transactions() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Send new transactions - handler.new_transactions(&[5.into(), 7.into()]); + handler.notify_new_transactions(&[5.into(), 7.into()]); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#; diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index 83d8b1981..c06997b2b 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -141,7 +141,7 @@ build_rpc_trait! { /// Returns all pending transactions from transaction queue. #[rpc(name = "parity_pendingTransactions")] - fn pending_transactions(&self) -> Result>; + fn pending_transactions(&self, Trailing) -> Result>; /// Returns all transactions from transaction queue. /// diff --git a/transaction-pool/src/pool.rs b/transaction-pool/src/pool.rs index a4e30ba0b..144d1b974 100644 --- a/transaction-pool/src/pool.rs +++ b/transaction-pool/src/pool.rs @@ -15,7 +15,8 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::{HashMap, BTreeSet}; +use std::slice; +use std::collections::{hash_map, HashMap, BTreeSet}; use error; use listener::{Listener, NoopListener}; @@ -443,7 +444,16 @@ impl Pool where PendingIterator { ready, best_transactions, - pool: self + pool: self, + } + } + + /// Returns unprioritized list of ready transactions. + pub fn unordered_pending>(&self, ready: R) -> UnorderedIterator { + UnorderedIterator { + ready, + senders: self.transactions.iter(), + transactions: None, } } @@ -514,6 +524,50 @@ impl Pool where } } +/// An iterator over all pending (ready) transactions in unoredered fashion. +/// +/// NOTE: Current implementation will iterate over all transactions from particular sender +/// ordered by nonce, but that might change in the future. +/// +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +pub struct UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, +{ + ready: R, + senders: hash_map::Iter<'a, T::Sender, Transactions>, + transactions: Option>>, +} + +impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + loop { + if let Some(transactions) = self.transactions.as_mut() { + if let Some(tx) = transactions.next() { + match self.ready.is_ready(&tx) { + Readiness::Ready => { + return Some(tx.transaction.clone()); + }, + state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state), + } + } + } + + // otherwise fallback and try next sender + let next_sender = self.senders.next()?; + self.transactions = Some(next_sender.1.iter()); + } + } +} + + /// An iterator over all pending (ready) transactions. /// NOTE: the transactions are not removed from the queue. /// You might remove them later by calling `cull`. diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs index 1f766b89f..b4932e590 100644 --- a/transaction-pool/src/tests/mod.rs +++ b/transaction-pool/src/tests/mod.rs @@ -259,6 +259,66 @@ fn should_construct_pending() { assert_eq!(pending.next(), None); } +#[test] +fn should_return_unordered_iterator() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); + let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + txq.import(b.tx().nonce(5).new()).unwrap(); + + let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + // gap + txq.import(b.tx().sender(1).nonce(5).new()).unwrap(); + + let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!(txq.status(NonceReady::default()), Status { + stalled: 0, + pending: 9, + future: 2, + }); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 3, + pending: 6, + future: 2, + }); + + // when + let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect(); + + let chain1 = vec![tx0, tx1, tx2, tx3]; + let chain2 = vec![tx5, tx6, tx7, tx8]; + let chain3 = vec![tx9]; + + assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len()); + + let mut options = vec![ + vec![chain1.clone(), chain2.clone(), chain3.clone()], + vec![chain2.clone(), chain1.clone(), chain3.clone()], + vec![chain2.clone(), chain3.clone(), chain1.clone()], + vec![chain3.clone(), chain2.clone(), chain1.clone()], + vec![chain3.clone(), chain1.clone(), chain2.clone()], + vec![chain1.clone(), chain3.clone(), chain2.clone()], + ].into_iter().map(|mut v| { + let mut first = v.pop().unwrap(); + for mut x in v { + first.append(&mut x); + } + first + }); + + assert!(options.any(|opt| all == opt)); +} + #[test] fn should_update_scoring_correctly() { // given