Merge branch 'td-cullio-stable' into a5-stable-lize
commit 9218b33ba3
@@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
 const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
 const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
 
+/// Max number of transactions in a single packet.
+const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
+
 // minimum interval between updates.
 const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
 
@@ -648,7 +651,7 @@ impl LightProtocol {
 	fn propagate_transactions(&self, io: &IoContext) {
 		if self.capabilities.read().tx_relay { return }
 
-		let ready_transactions = self.provider.ready_transactions();
+		let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
 		if ready_transactions.is_empty() { return }
 
 		trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
@@ -173,8 +173,8 @@ impl Provider for TestProvider {
 		})
 	}
 
-	fn ready_transactions(&self) -> Vec<PendingTransaction> {
-		self.0.client.ready_transactions()
+	fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
+		self.0.client.ready_transactions(max_len)
 	}
 }
 
@@ -128,7 +128,7 @@ pub trait Provider: Send + Sync {
 	fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;
 
 	/// Provide pending transactions.
-	fn ready_transactions(&self) -> Vec<PendingTransaction>;
+	fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>;
 
 	/// Provide a proof-of-execution for the given transaction proof request.
 	/// Returns a vector of all state items necessary to execute the transaction.
@@ -283,8 +283,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
 			.map(|(_, proof)| ::request::ExecutionResponse { items: proof })
 	}
 
-	fn ready_transactions(&self) -> Vec<PendingTransaction> {
-		BlockChainClient::ready_transactions(self)
+	fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
+		BlockChainClient::ready_transactions(self, max_len)
 			.into_iter()
 			.map(|tx| tx.pending().clone())
 			.collect()
@@ -370,9 +370,12 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
 		None
 	}
 
-	fn ready_transactions(&self) -> Vec<PendingTransaction> {
+	fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
 		let chain_info = self.chain_info();
-		self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
+		let mut transactions = self.txqueue.read()
+			.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp);
+		transactions.truncate(max_len);
+		transactions
 	}
 }
 
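For context, every `ready_transactions` implementor now takes a `max_len` bound and call sites pass an explicit cap; a minimal sketch of a caller under the constant introduced above (the surrounding glue code is hypothetical, not part of this change):

	// ask the provider for at most MAX_TRANSACTIONS_TO_PROPAGATE (64) ready transactions
	let ready = provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
	for tx in &ready {
		// encode and relay to peers (omitted)
	}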
@@ -94,6 +94,7 @@ impl ClientService {
 
 		let pruning = config.pruning;
 		let client = Client::new(config, &spec, client_db.clone(), miner.clone(), io_service.channel())?;
+		miner.set_io_channel(io_service.channel());
 
 		let snapshot_params = SnapServiceParams {
 			engine: spec.engine.clone(),
@@ -200,7 +200,7 @@ pub struct Client {
 
 	/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
 	liveness: AtomicBool,
-	io_channel: Mutex<IoChannel<ClientIoMessage>>,
+	io_channel: RwLock<IoChannel<ClientIoMessage>>,
 
 	/// List of actors to be notified on certain chain events
 	notify: RwLock<Vec<Weak<ChainNotify>>>,
@@ -712,7 +712,7 @@ impl Client {
 			db: RwLock::new(db.clone()),
 			state_db: RwLock::new(state_db),
 			report: RwLock::new(Default::default()),
-			io_channel: Mutex::new(message_channel),
+			io_channel: RwLock::new(message_channel),
 			notify: RwLock::new(Vec::new()),
 			queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
 			queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
@@ -947,7 +947,7 @@ impl Client {
 
 	/// Replace io channel. Useful for testing.
 	pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
-		*self.io_channel.lock() = io_channel;
+		*self.io_channel.write() = io_channel;
 	}
 
 	/// Get a copy of the best block's state.
@@ -1893,8 +1893,8 @@ impl BlockChainClient for Client {
 		(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
 	}
 
-	fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
-		self.importer.miner.ready_transactions(self)
+	fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
+		self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
 	}
 
 	fn signing_chain_id(&self) -> Option<u64> {
@@ -1952,7 +1952,7 @@ impl IoClient for Client {
 	fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
 		trace_time!("queue_transactions");
 		let len = transactions.len();
-		self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
+		self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
 			trace_time!("import_queued_transactions");
 
 			let txs: Vec<UnverifiedTransaction> = transactions
@@ -2001,7 +2001,7 @@ impl IoClient for Client {
 
 			let queued = self.queued_ancient_blocks.clone();
 			let lock = self.ancient_blocks_import_lock.clone();
-			match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
+			match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
 				trace_time!("import_ancient_block");
 				// Make sure to hold the lock here to prevent importing out of order.
 				// We use separate lock, cause we don't want to block queueing.
@@ -2033,7 +2033,7 @@ impl IoClient for Client {
 	}
 
 	fn queue_consensus_message(&self, message: Bytes) {
-		match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
+		match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| {
 			if let Err(e) = client.engine().handle_message(&message) {
 				debug!(target: "poa", "Invalid message received: {}", e);
 			}
@@ -2142,7 +2142,14 @@ impl ImportSealedBlock for Client {
 			route
 		};
 		let route = ChainRoute::from([route].as_ref());
-		self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
+		self.importer.miner.chain_new_blocks(
+			self,
+			&[h.clone()],
+			&[],
+			route.enacted(),
+			route.retracted(),
+			self.engine.seals_internally().is_some(),
+		);
 		self.notify(|notify| {
 			notify.new_blocks(
 				vec![h.clone()],
@@ -2452,7 +2459,7 @@ impl IoChannelQueue {
 		}
 	}
 
-	pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
+	pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
 		F: Fn(&Client) + Send + Sync + 'static,
 	{
 		let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
@@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry;
 use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
 use error::ImportResult;
 use vm::Schedule;
-use miner::{Miner, MinerService};
+use miner::{self, Miner, MinerService};
 use spec::Spec;
 use types::basic_account::BasicAccount;
 use types::mode::Mode;
@@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient {
 		self.traces.read().clone()
 	}
 
-	fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
-		self.miner.ready_transactions(self)
+	fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
+		self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority)
 	}
 
 	fn signing_chain_id(&self) -> Option<u64> { None }
@@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
 	fn last_hashes(&self) -> LastHashes;
 
 	/// List all transactions that are allowed into the next block.
-	fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>;
+	fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>;
 
 	/// Sorted list of transaction gas prices from at least last sample_size blocks.
 	fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {
@@ -27,6 +27,7 @@ use ethcore_miner::gas_pricer::GasPricer;
 use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy};
 use ethcore_miner::work_notify::NotifyWork;
 use ethereum_types::{H256, U256, Address};
+use io::IoChannel;
 use parking_lot::{Mutex, RwLock};
 use rayon::prelude::*;
 use transaction::{
@@ -43,7 +44,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock};
 use client::{
 	BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce
 };
-use client::BlockId;
+use client::{BlockId, ClientIoMessage};
 use executive::contract_address;
 use header::{Header, BlockNumber};
 use miner;
@@ -209,6 +210,7 @@ pub struct Miner {
 	transaction_queue: Arc<TransactionQueue>,
 	engine: Arc<EthEngine>,
 	accounts: Option<Arc<AccountProvider>>,
+	io_channel: RwLock<Option<IoChannel<ClientIoMessage>>>,
 }
 
 impl Miner {
@@ -224,7 +226,12 @@ impl Miner {
 	}
 
 	/// Creates new instance of miner Arc.
-	pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Self {
+	pub fn new(
+		options: MinerOptions,
+		gas_pricer: GasPricer,
+		spec: &Spec,
+		accounts: Option<Arc<AccountProvider>>,
+	) -> Self {
 		let limits = options.pool_limits.clone();
 		let verifier_options = options.pool_verification_options.clone();
 		let tx_queue_strategy = options.tx_queue_strategy;
@@ -247,6 +254,7 @@ impl Miner {
 			transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
 			accounts,
 			engine: spec.engine.clone(),
+			io_channel: RwLock::new(None),
 		}
 	}
 
@@ -266,6 +274,11 @@ impl Miner {
 		}, GasPricer::new_fixed(minimal_gas_price), spec, accounts)
 	}
 
+	/// Sets `IoChannel`
+	pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
+		*self.io_channel.write() = Some(io_channel);
+	}
+
 	/// Clear all pending block states
 	pub fn clear(&self) {
 		self.sealing.lock().queue.reset();
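For context, a condensed sketch of the wiring this enables (it mirrors the `ClientService` hunk above; the `Arc` wrapping, construction arguments and error handling are assumed and elided):

	let miner = Arc::new(Miner::new(options, gas_pricer, &spec, accounts));
	let client = Client::new(config, &spec, client_db.clone(), miner.clone(), io_service.channel())?;
	// hand the miner an IO channel so it can later offload transaction-queue culling
	miner.set_io_channel(io_service.channel());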
@@ -368,18 +381,28 @@ impl Miner {
 
 		let client = self.pool_client(chain);
 		let engine_params = self.engine.params();
-		let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
+		let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
 		let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition {
 			Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into())
 		} else {
 			None
 		};
+		// we will never need more transactions than limit divided by min gas
+		let max_transactions = if min_tx_gas.is_zero() {
+			usize::max_value()
+		} else {
+			(*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize
+		};
 
 		let pending: Vec<Arc<_>> = self.transaction_queue.pending(
 			client.clone(),
-			chain_info.best_block_number,
-			chain_info.best_block_timestamp,
-			nonce_cap,
+			pool::PendingSettings {
+				block_number: chain_info.best_block_number,
+				current_timestamp: chain_info.best_block_timestamp,
+				nonce_cap,
+				max_len: max_transactions,
+				ordering: miner::PendingOrdering::Priority,
+			}
 		);
 
 		let took_ms = |elapsed: &Duration| {
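A worked example of the bound computed above (the numbers are assumed, not from this change): with an 8,000,000 block gas limit and a 21,000 minimum transaction gas, a block can never fit more than 8,000,000 / 21,000 = 380 transactions, so `max_len` caps the (potentially expensive) priority-ordered query at that size.

	// runnable illustration of the same arithmetic
	fn main() {
		let gas_limit: u64 = 8_000_000;
		let min_tx_gas: u64 = 21_000;
		let max_transactions = if min_tx_gas == 0 {
			usize::max_value()
		} else {
			(gas_limit / min_tx_gas) as usize
		};
		assert_eq!(max_transactions, 380);
	}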
@@ -871,7 +894,7 @@ impl miner::MinerService for Miner {
 		}
 	}
 
-	fn ready_transactions<C>(&self, chain: &C)
+	fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
 		-> Vec<Arc<VerifiedTransaction>>
 	where
 		C: ChainInfo + Nonce + Sync,
@@ -879,14 +902,20 @@ impl miner::MinerService for Miner {
 		let chain_info = chain.chain_info();
 
 		let from_queue = || {
+			// We propagate transactions over the nonce cap.
+			// The mechanism is only to limit number of transactions in pending block
+			// those transactions are valid and will just be ready to be included in next block.
+			let nonce_cap = None;
+
 			self.transaction_queue.pending(
 				CachedNonceClient::new(chain, &self.nonce_cache),
-				chain_info.best_block_number,
-				chain_info.best_block_timestamp,
-				// We propagate transactions over the nonce cap.
-				// The mechanism is only to limit number of transactions in pending block
-				// those transactions are valid and will just be ready to be included in next block.
-				None,
+				pool::PendingSettings {
+					block_number: chain_info.best_block_number,
+					current_timestamp: chain_info.best_block_timestamp,
+					nonce_cap,
+					max_len,
+					ordering,
+				},
 			)
 		};
 
@@ -896,6 +925,7 @@ impl miner::MinerService for Miner {
 				.iter()
 				.map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone()))
 				.map(Arc::new)
+				.take(max_len)
 				.collect()
 		}, chain_info.best_block_number)
 	};
@@ -1130,7 +1160,32 @@ impl miner::MinerService for Miner {
 		// (thanks to Ready), but culling can take significant amount of time,
 		// so best to leave it after we create some work for miners to prevent increased
 		// uncle rate.
-		self.transaction_queue.cull(client);
+		// If the io_channel is available attempt to offload culling to a separate task
+		// to avoid blocking chain_new_blocks
+		if let Some(ref channel) = *self.io_channel.read() {
+			let queue = self.transaction_queue.clone();
+			let nonce_cache = self.nonce_cache.clone();
+			let engine = self.engine.clone();
+			let accounts = self.accounts.clone();
+			let refuse_service_transactions = self.options.refuse_service_transactions;
+
+			let cull = move |chain: &::client::Client| {
+				let client = PoolClient::new(
+					chain,
+					&nonce_cache,
+					&*engine,
+					accounts.as_ref().map(|x| &**x),
+					refuse_service_transactions,
+				);
+				queue.cull(client);
+			};
+
+			if let Err(e) = channel.send(ClientIoMessage::execute(cull)) {
+				warn!(target: "miner", "Error queueing cull: {:?}", e);
+			}
+		} else {
+			self.transaction_queue.cull(client);
+		}
 	}
 }
 
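The cull closure above is shipped to the client IO service via `ClientIoMessage::execute(cull)`, so it has to own everything it touches on the other thread; that is why it captures cloned `Arc`s of the queue, nonce cache, engine and accounts instead of borrowing `self`. A toy, runnable model of that ownership pattern (the types and values are illustrative only, not parity's):

	use std::sync::Arc;
	use std::thread;

	fn main() {
		let queue = Arc::new(vec![1, 2, 3]);
		let queue_for_io = queue.clone(); // clone the Arc rather than borrowing
		let cull = move || println!("culling {} stalled entries", queue_for_io.len());
		thread::spawn(cull).join().unwrap(); // runs off the calling thread
	}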
@@ -1160,7 +1215,7 @@ mod tests {
 	use rustc_hex::FromHex;
 
 	use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock};
-	use miner::MinerService;
+	use miner::{MinerService, PendingOrdering};
 	use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts};
 	use transaction::{Transaction};
 
@@ -1259,7 +1314,7 @@ mod tests {
 		assert_eq!(res.unwrap(), ());
 		assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1);
 		assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1);
-		assert_eq!(miner.ready_transactions(&client).len(), 1);
+		assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
 		// This method will let us know if pending block was created (before calling that method)
 		assert!(!miner.prepare_pending_block(&client));
 	}
@@ -1278,7 +1333,7 @@ mod tests {
 		assert_eq!(res.unwrap(), ());
 		assert_eq!(miner.pending_transactions(best_block), None);
 		assert_eq!(miner.pending_receipts(best_block), None);
-		assert_eq!(miner.ready_transactions(&client).len(), 1);
+		assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
 	}
 
 	#[test]
@@ -1297,11 +1352,11 @@ mod tests {
 		assert_eq!(miner.pending_transactions(best_block), None);
 		assert_eq!(miner.pending_receipts(best_block), None);
 		// By default we use PendingSet::AlwaysSealing, so no transactions yet.
-		assert_eq!(miner.ready_transactions(&client).len(), 0);
+		assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0);
 		// This method will let us know if pending block was created (before calling that method)
 		assert!(miner.prepare_pending_block(&client));
 		// After pending block is created we should see a transaction.
-		assert_eq!(miner.ready_transactions(&client).len(), 1);
+		assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
 	}
 
 	#[test]
@@ -26,6 +26,7 @@ pub mod pool_client;
 pub mod stratum;
 
 pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
+pub use ethcore_miner::pool::PendingOrdering;
 
 use std::sync::Arc;
 use std::collections::{BTreeSet, BTreeMap};
@@ -173,7 +174,9 @@ pub trait MinerService : Send + Sync {
 	/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
 	///
 	/// Depending on the settings may look in transaction pool or only in pending block.
-	fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>>
+	/// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of
+	/// transactions.
+	fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec<Arc<VerifiedTransaction>>
 		where C: ChainInfo + Nonce + Sync;
 
 	/// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet).
@@ -16,8 +16,11 @@
 
 //! Blockchain access for transaction pool.
 
-use std::fmt;
-use std::collections::HashMap;
+use std::{
+	collections::HashMap,
+	fmt,
+	sync::Arc,
+};
 
 use ethereum_types::{H256, U256, Address};
 use ethcore_miner::pool;
@@ -37,9 +40,9 @@ use miner;
 use miner::service_transaction_checker::ServiceTransactionChecker;
 
 /// Cache for state nonces.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NonceCache {
-	nonces: RwLock<HashMap<Address, U256>>,
+	nonces: Arc<RwLock<HashMap<Address, U256>>>,
 	limit: usize
 }
 
@@ -47,7 +50,7 @@ impl NonceCache {
 	/// Create new cache with a limit of `limit` entries.
 	pub fn new(limit: usize) -> Self {
 		NonceCache {
-			nonces: RwLock::new(HashMap::with_capacity(limit / 2)),
+			nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))),
 			limit,
 		}
 	}
@@ -30,7 +30,7 @@ use test_helpers::{
 use types::filter::Filter;
 use ethereum_types::{U256, Address};
 use kvdb_rocksdb::{Database, DatabaseConfig};
-use miner::Miner;
+use miner::{Miner, PendingOrdering};
 use spec::Spec;
 use views::BlockView;
 use ethkey::KeyPair;
@@ -345,12 +345,12 @@ fn does_not_propagate_delayed_transactions() {
 
 	client.miner().import_own_transaction(&*client, tx0).unwrap();
 	client.miner().import_own_transaction(&*client, tx1).unwrap();
-	assert_eq!(0, client.ready_transactions().len());
-	assert_eq!(0, client.miner().ready_transactions(&*client).len());
+	assert_eq!(0, client.ready_transactions(10).len());
+	assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
 	push_blocks_to_client(&client, 53, 2, 2);
 	client.flush_queue();
-	assert_eq!(2, client.ready_transactions().len());
-	assert_eq!(2, client.miner().ready_transactions(&*client).len());
+	assert_eq!(2, client.ready_transactions(10).len());
+	assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
 }
 
 #[test]
@@ -357,6 +357,10 @@ impl SyncProvider for EthSync {
 	}
 }
 
+const PEERS_TIMER: TimerToken = 0;
+const SYNC_TIMER: TimerToken = 1;
+const TX_TIMER: TimerToken = 2;
+
 struct SyncProtocolHandler {
 	/// Shared blockchain client.
 	chain: Arc<BlockChainClient>,
@@ -371,7 +375,9 @@ struct SyncProtocolHandler {
 impl NetworkProtocolHandler for SyncProtocolHandler {
 	fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
 		if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
-			io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer");
+			io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
+			io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
+			io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
 		}
 	}
 
@@ -396,12 +402,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
 		}
 	}
 
-	fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
+	fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
		trace_time!("sync::timeout");
 		let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
-		self.sync.write().maintain_peers(&mut io);
-		self.sync.write().maintain_sync(&mut io);
-		self.sync.write().propagate_new_transactions(&mut io);
+		match timer {
+			PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
+			SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
+			TX_TIMER => {
+				self.sync.write().propagate_new_transactions(&mut io);
+			},
+			_ => warn!("Unknown timer {} triggered.", timer),
+		}
 	}
 }
 
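A note on the intervals above: splitting the single one-second timer into three timers with distinct, non-harmonic periods (700 ms, 1100 ms, 1300 ms) presumably spreads the work, since peer maintenance, sync maintenance and transaction propagation then rarely fire in the same tick; they only all coincide every lcm(700, 1100, 1300) = 100,100 ms, roughly 100 seconds. A runnable check of that arithmetic (values taken from the hunk above):

	fn gcd(a: u64, b: u64) -> u64 { if b == 0 { a } else { gcd(b, a % b) } }
	fn lcm(a: u64, b: u64) -> u64 { a / gcd(a, b) * b }

	fn main() {
		assert_eq!(lcm(lcm(700, 1100), 1300), 100_100);
	}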
@@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64;
 const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
 // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
 const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
+// Maximal number of transactions queried from miner to propagate.
+// This set is used to diff with transactions known by the peer and
+// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
+const MAX_TRANSACTIONS_TO_QUERY: usize = 4096;
 // Maximal number of transactions in sent in single packet.
 const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
 // Min number of blocks to be behind for a snapshot sync
@@ -1144,7 +1148,7 @@ pub mod tests {
 	use super::{PeerInfo, PeerAsking};
 	use ethcore::header::*;
 	use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
-	use ethcore::miner::MinerService;
+	use ethcore::miner::{MinerService, PendingOrdering};
 	use private_tx::NoopPrivateTxHandler;
 
 	pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
@@ -1357,7 +1361,7 @@ pub mod tests {
 			let mut io = TestIo::new(&mut client, &ss, &queue, None);
 			io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false);
 			sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
-			assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1);
+			assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1);
 		}
 		// We need to update nonce status (because we say that the block has been imported)
 		for h in &[good_blocks[0]] {
@@ -1373,7 +1377,7 @@ pub mod tests {
 		}
 
 		// then
-		assert_eq!(client.miner.ready_transactions(&client).len(), 1);
+		assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
 	}
 
 	#[test]
@@ -33,6 +33,7 @@ use super::{
 	MAX_PEERS_PROPAGATION,
 	MAX_TRANSACTION_PACKET_SIZE,
 	MAX_TRANSACTIONS_TO_PROPAGATE,
+	MAX_TRANSACTIONS_TO_QUERY,
 	MIN_PEERS_PROPAGATION,
 	CONSENSUS_DATA_PACKET,
 	NEW_BLOCK_HASHES_PACKET,
@@ -114,7 +115,7 @@ impl SyncPropagator {
 			return 0;
 		}
 
-		let transactions = io.chain().ready_transactions();
+		let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY);
 		if transactions.is_empty() {
 			return 0;
 		}
@@ -35,9 +35,9 @@ extern crate transaction_pool as txpool;
 #[macro_use]
 extern crate error_chain;
 #[macro_use]
-extern crate trace_time;
-#[macro_use]
 extern crate log;
+#[macro_use]
+extern crate trace_time;
 
 #[cfg(test)]
 extern crate rustc_hex;
@@ -16,7 +16,7 @@
 
 //! Transaction Pool
 
-use ethereum_types::{H256, Address};
+use ethereum_types::{U256, H256, Address};
 use heapsize::HeapSizeOf;
 use transaction;
 use txpool;
@@ -45,6 +45,43 @@ pub enum PrioritizationStrategy {
 	GasPriceOnly,
 }
 
+/// Transaction ordering when requesting pending set.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum PendingOrdering {
+	/// Get pending transactions ordered by their priority (potentially expensive)
+	Priority,
+	/// Get pending transactions without any care of particular ordering (cheaper).
+	Unordered,
+}
+
+/// Pending set query settings
+#[derive(Debug, Clone)]
+pub struct PendingSettings {
+	/// Current block number (affects readiness of some transactions).
+	pub block_number: u64,
+	/// Current timestamp (affects readiness of some transactions).
+	pub current_timestamp: u64,
+	/// Nonce cap (for dust protection; EIP-168)
+	pub nonce_cap: Option<U256>,
+	/// Maximal number of transactions in pending the set.
+	pub max_len: usize,
+	/// Ordering of transactions.
+	pub ordering: PendingOrdering,
+}
+
+impl PendingSettings {
+	/// Get all transactions (no cap or len limit) prioritized.
+	pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self {
+		PendingSettings {
+			block_number,
+			current_timestamp,
+			nonce_cap: None,
+			max_len: usize::max_value(),
+			ordering: PendingOrdering::Priority,
+		}
+	}
+}
+
 /// Transaction priority.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
 pub(crate) enum Priority {
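For illustration, a sketch of how a caller might fill in the settings added above for a bounded query that does not need priority ordering (the queue, client and chain-info values, and the 256 cap, are assumed for the example):

	let settings = PendingSettings {
		block_number: chain_info.best_block_number,
		current_timestamp: chain_info.best_block_timestamp,
		nonce_cap: None,
		max_len: 256,
		ordering: PendingOrdering::Unordered,
	};
	let pending = transaction_queue.pending(client, settings);

Callers that still want the old "everything, best first" behaviour can use `PendingSettings::all_prioritized(block_number, timestamp)`, as the updated tests further below do.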
@@ -26,7 +26,10 @@ use parking_lot::RwLock;
 use transaction;
 use txpool::{self, Verifier};
 
-use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy};
+use pool::{
+	self, scoring, verifier, client, ready, listener,
+	PrioritizationStrategy, PendingOrdering, PendingSettings,
+};
 use pool::local_transactions::LocalTransactionsList;
 
 type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger));
@@ -82,6 +85,7 @@ struct CachedPending {
 	nonce_cap: Option<U256>,
 	has_local_pending: bool,
 	pending: Option<Vec<Arc<pool::VerifiedTransaction>>>,
+	max_len: usize,
 }
 
 impl CachedPending {
@@ -93,6 +97,7 @@ impl CachedPending {
 			has_local_pending: false,
 			pending: None,
 			nonce_cap: None,
+			max_len: 0,
 		}
 	}
 
@@ -107,6 +112,7 @@ impl CachedPending {
 		block_number: u64,
 		current_timestamp: u64,
 		nonce_cap: Option<&U256>,
+		max_len: usize,
 	) -> Option<Vec<Arc<pool::VerifiedTransaction>>> {
 		// First check if we have anything in cache.
 		let pending = self.pending.as_ref()?;
@@ -131,7 +137,12 @@ impl CachedPending {
 			return None;
 		}
 
-		Some(pending.clone())
+		// It's fine to just take a smaller subset, but not other way around.
+		if max_len > self.max_len {
+			return None;
+		}
+
+		Some(pending.iter().take(max_len).cloned().collect())
 	}
 }
 
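The subset rule above works because the cached pending set is the priority-ordered result of a previous query: a cache built for a larger `max_len` can answer any smaller request by taking a prefix, while a larger request may be missing entries and must be recomputed. A runnable toy model of that rule (the numbers are illustrative only):

	fn cached_subset(cached: &[u32], cached_max_len: usize, max_len: usize) -> Option<Vec<u32>> {
		if max_len > cached_max_len {
			return None; // the cache may be missing entries for this request
		}
		Some(cached.iter().take(max_len).cloned().collect())
	}

	fn main() {
		let cached: Vec<u32> = (0..100).collect();
		assert_eq!(cached_subset(&cached, 100, 50).unwrap().len(), 50);
		assert!(cached_subset(&cached, 100, 200).is_none());
	}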
@ -228,7 +239,7 @@ impl TransactionQueue {
|
|||||||
transactions: Vec<verifier::Transaction>,
|
transactions: Vec<verifier::Transaction>,
|
||||||
) -> Vec<Result<(), transaction::Error>> {
|
) -> Vec<Result<(), transaction::Error>> {
|
||||||
// Run verification
|
// Run verification
|
||||||
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
|
trace_time!("pool::verify_and_import");
|
||||||
let options = self.options.read().clone();
|
let options = self.options.read().clone();
|
||||||
|
|
||||||
let transaction_to_replace = {
|
let transaction_to_replace = {
|
||||||
@ -287,10 +298,10 @@ impl TransactionQueue {
|
|||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns all transactions in the queue ordered by priority.
|
/// Returns all transactions in the queue without explicit ordering.
|
||||||
pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> {
|
pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> {
|
||||||
let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready;
|
let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready;
|
||||||
self.pool.read().pending(ready).collect()
|
self.pool.read().unordered_pending(ready).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Computes unordered set of pending hashes.
|
/// Computes unordered set of pending hashes.
|
||||||
@ -314,24 +325,31 @@ impl TransactionQueue {
|
|||||||
pub fn pending<C>(
|
pub fn pending<C>(
|
||||||
&self,
|
&self,
|
||||||
client: C,
|
client: C,
|
||||||
block_number: u64,
|
settings: PendingSettings,
|
||||||
current_timestamp: u64,
|
|
||||||
nonce_cap: Option<U256>,
|
|
||||||
) -> Vec<Arc<pool::VerifiedTransaction>> where
|
) -> Vec<Arc<pool::VerifiedTransaction>> where
|
||||||
C: client::NonceClient,
|
C: client::NonceClient,
|
||||||
{
|
{
|
||||||
|
let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings;
|
||||||
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) {
|
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
|
||||||
return pending;
|
return pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Double check after acquiring write lock
|
// Double check after acquiring write lock
|
||||||
let mut cached_pending = self.cached_pending.write();
|
let mut cached_pending = self.cached_pending.write();
|
||||||
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) {
|
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) {
|
||||||
return pending;
|
return pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect());
|
// In case we don't have a cached set, but we don't care about order
|
||||||
|
// just return the unordered set.
|
||||||
|
if let PendingOrdering::Unordered = ordering {
|
||||||
|
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
|
||||||
|
return self.pool.read().unordered_pending(ready).take(max_len).collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| {
|
||||||
|
i.take(max_len).collect()
|
||||||
|
});
|
||||||
|
|
||||||
*cached_pending = CachedPending {
|
*cached_pending = CachedPending {
|
||||||
block_number,
|
block_number,
|
||||||
@ -339,6 +357,7 @@ impl TransactionQueue {
|
|||||||
nonce_cap,
|
nonce_cap,
|
||||||
has_local_pending: self.has_local_pending_transactions(),
|
has_local_pending: self.has_local_pending_transactions(),
|
||||||
pending: Some(pending.clone()),
|
pending: Some(pending.clone()),
|
||||||
|
max_len,
|
||||||
};
|
};
|
||||||
|
|
||||||
pending
|
pending
|
||||||
@ -363,15 +382,27 @@ impl TransactionQueue {
|
|||||||
scoring::NonceAndGasPrice,
|
scoring::NonceAndGasPrice,
|
||||||
Listener,
|
Listener,
|
||||||
>) -> T,
|
>) -> T,
|
||||||
|
{
|
||||||
|
debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number);
|
||||||
|
trace_time!("pool::collect_pending");
|
||||||
|
let ready = Self::ready(client, block_number, current_timestamp, nonce_cap);
|
||||||
|
collect(self.pool.read().pending(ready))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ready<C>(
|
||||||
|
client: C,
|
||||||
|
block_number: u64,
|
||||||
|
current_timestamp: u64,
|
||||||
|
nonce_cap: Option<U256>,
|
||||||
|
) -> (ready::Condition, ready::State<C>) where
|
||||||
|
C: client::NonceClient,
|
||||||
{
|
{
|
||||||
let pending_readiness = ready::Condition::new(block_number, current_timestamp);
|
let pending_readiness = ready::Condition::new(block_number, current_timestamp);
|
||||||
// don't mark any transactions as stale at this point.
|
// don't mark any transactions as stale at this point.
|
||||||
let stale_id = None;
|
let stale_id = None;
|
||||||
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
|
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
|
||||||
|
|
||||||
let ready = (pending_readiness, state_readiness);
|
(pending_readiness, state_readiness)
|
||||||
|
|
||||||
collect(self.pool.read().pending(ready))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Culls all stalled transactions from the pool.
|
/// Culls all stalled transactions from the pool.
|
||||||
@ -523,6 +554,12 @@ impl TransactionQueue {
|
|||||||
let mut pool = self.pool.write();
|
let mut pool = self.pool.write();
|
||||||
(pool.listener_mut().1).0.add(f);
|
(pool.listener_mut().1).0.add(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if pending set is cached.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn is_pending_cached(&self) -> bool {
|
||||||
|
self.cached_pending.read().pending.is_some()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -549,7 +586,7 @@ mod tests {
|
|||||||
fn should_get_pending_transactions() {
|
fn should_get_pending_transactions() {
|
||||||
let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly);
|
let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly);
|
||||||
|
|
||||||
let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None);
|
let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0));
|
||||||
|
|
||||||
for tx in pending {
|
for tx in pending {
|
||||||
assert!(tx.signed().nonce > 0.into());
|
assert!(tx.signed().nonce > 0.into());
|
||||||
|
@ -18,7 +18,7 @@ use ethereum_types::U256;
|
|||||||
use transaction::{self, PendingTransaction};
|
use transaction::{self, PendingTransaction};
|
||||||
use txpool;
|
use txpool;
|
||||||
|
|
||||||
use pool::{verifier, TransactionQueue, PrioritizationStrategy};
|
use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering};
|
||||||
|
|
||||||
pub mod tx;
|
pub mod tx;
|
||||||
pub mod client;
|
pub mod client;
|
||||||
@ -158,7 +158,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() {
|
|||||||
// and then there should be only one transaction in current (the one with higher gas_price)
|
// and then there should be only one transaction in current (the one with higher gas_price)
|
||||||
assert_eq!(res, vec![Ok(())]);
|
assert_eq!(res, vec![Ok(())]);
|
||||||
assert_eq!(txq.status().status.transaction_count, 1);
|
assert_eq!(txq.status().status.transaction_count, 1);
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top[0].hash, hash);
|
assert_eq!(top[0].hash, hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,7 +183,7 @@ fn should_move_all_transactions_from_future() {
|
|||||||
// then
|
// then
|
||||||
assert_eq!(res, vec![Ok(())]);
|
assert_eq!(res, vec![Ok(())]);
|
||||||
assert_eq!(txq.status().status.transaction_count, 2);
|
assert_eq!(txq.status().status.transaction_count, 2);
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top[0].hash, hash);
|
assert_eq!(top[0].hash, hash);
|
||||||
assert_eq!(top[1].hash, hash2);
|
assert_eq!(top[1].hash, hash2);
|
||||||
}
|
}
|
||||||
@ -257,7 +257,7 @@ fn should_import_txs_from_same_sender() {
|
|||||||
txq.import(TestClient::new(), txs.local().into_vec());
|
txq.import(TestClient::new(), txs.local().into_vec());
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let top = txq.pending(TestClient::new(), 0 ,0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0));
|
||||||
assert_eq!(top[0].hash, hash);
|
assert_eq!(top[0].hash, hash);
|
||||||
assert_eq!(top[1].hash, hash2);
|
assert_eq!(top[1].hash, hash2);
|
||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
@ -279,7 +279,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() {
|
|||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let top = txq.pending(client, 0, 0, None);
|
let top = txq.pending(client, PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top[0].hash, hash); // local should be first
|
assert_eq!(top[0].hash, hash); // local should be first
|
||||||
assert_eq!(top[1].hash, hash2);
|
assert_eq!(top[1].hash, hash2);
|
||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
@ -301,7 +301,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() {
|
|||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top[0].hash, hash); // retracted should be first
|
assert_eq!(top[0].hash, hash); // retracted should be first
|
||||||
assert_eq!(top[1].hash, hash2);
|
assert_eq!(top[1].hash, hash2);
|
||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
@ -320,7 +320,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() {
|
|||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top[0].hash, hash);
|
assert_eq!(top[0].hash, hash);
|
||||||
assert_eq!(top[1].hash, hash2);
|
assert_eq!(top[1].hash, hash2);
|
||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
@ -338,7 +338,7 @@ fn should_put_transaction_to_futures_if_gap_detected() {
|
|||||||
|
|
||||||
// then
|
// then
|
||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top.len(), 1);
|
assert_eq!(top.len(), 1);
|
||||||
assert_eq!(top[0].hash, hash);
|
assert_eq!(top[0].hash, hash);
|
||||||
}
|
}
|
||||||
@ -358,9 +358,9 @@ fn should_handle_min_block() {
|
|||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let top = txq.pending(TestClient::new(), 0, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
|
||||||
assert_eq!(top.len(), 0);
|
assert_eq!(top.len(), 0);
|
||||||
let top = txq.pending(TestClient::new(), 1, 0, None);
|
let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0));
|
||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -391,7 +391,7 @@ fn should_move_transactions_if_gap_filled() {
|
|||||||
let res = txq.import(TestClient::new(), vec![tx, tx2].local());
|
let res = txq.import(TestClient::new(), vec![tx, tx2].local());
|
||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
assert_eq!(txq.status().status.transaction_count, 2);
|
assert_eq!(txq.status().status.transaction_count, 2);
|
||||||
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
|
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
let res = txq.import(TestClient::new(), vec![tx1.local()]);
|
let res = txq.import(TestClient::new(), vec![tx1.local()]);
|
||||||
@ -399,7 +399,7 @@ fn should_move_transactions_if_gap_filled() {
|
|||||||
|
|
||||||
// then
|
// then
|
||||||
assert_eq!(txq.status().status.transaction_count, 3);
|
assert_eq!(txq.status().status.transaction_count, 3);
|
||||||
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
|
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -411,12 +411,12 @@ fn should_remove_transaction() {
|
|||||||
let res = txq.import(TestClient::default(), vec![tx, tx2].local());
|
let res = txq.import(TestClient::default(), vec![tx, tx2].local());
|
||||||
assert_eq!(res, vec![Ok(()), Ok(())]);
|
assert_eq!(res, vec![Ok(()), Ok(())]);
|
||||||
assert_eq!(txq.status().status.transaction_count, 2);
|
assert_eq!(txq.status().status.transaction_count, 2);
|
||||||
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
|
assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
txq.cull(TestClient::new().with_nonce(124));
|
txq.cull(TestClient::new().with_nonce(124));
|
||||||
assert_eq!(txq.status().status.transaction_count, 1);
|
assert_eq!(txq.status().status.transaction_count, 1);
|
||||||
assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1);
|
assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1);
|
||||||
txq.cull(TestClient::new().with_nonce(126));
|
txq.cull(TestClient::new().with_nonce(126));
|
||||||
|
|
||||||
// then
|
// then
|
||||||
@@ -434,19 +434,19 @@ fn should_move_transactions_to_future_if_gap_introduced() {
 	let res = txq.import(TestClient::new(), vec![tx3, tx2].local());
 	assert_eq!(res, vec![Ok(()), Ok(())]);
 	assert_eq!(txq.status().status.transaction_count, 2);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);

 	let res = txq.import(TestClient::new(), vec![tx].local());
 	assert_eq!(res, vec![Ok(())]);
 	assert_eq!(txq.status().status.transaction_count, 3);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3);

 	// when
 	txq.remove(vec![&hash], true);

 	// then
 	assert_eq!(txq.status().status.transaction_count, 2);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
 }

 #[test]
@@ -497,7 +497,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() {
 	assert_eq!(res, vec![Ok(())]);
 	assert_eq!(txq.status().status.transaction_count, 1);

-	let top = txq.pending(TestClient::new(), 0, 0, None);
+	let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
 	assert_eq!(top.len(), 1);
 	assert_eq!(top[0].hash, hash);
 	assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into()));
@@ -545,19 +545,19 @@ fn should_accept_same_transaction_twice_if_removed() {
 	let res = txq.import(TestClient::new(), txs.local().into_vec());
 	assert_eq!(res, vec![Ok(()), Ok(())]);
 	assert_eq!(txq.status().status.transaction_count, 2);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);

 	// when
 	txq.remove(vec![&hash], true);
 	assert_eq!(txq.status().status.transaction_count, 1);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0);

 	let res = txq.import(TestClient::new(), vec![tx1].local());
 	assert_eq!(res, vec![Ok(())]);

 	// then
 	assert_eq!(txq.status().status.transaction_count, 2);
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2);
 }

 #[test]
@@ -577,8 +577,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() {
 	// then
 	assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]);
 	assert_eq!(txq.status().status.transaction_count, 2);
-	assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20));
-	assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2));
+	assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20));
+	assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2));
 }

 #[test]
@@ -620,7 +620,7 @@ fn should_return_valid_last_nonce_after_cull() {
 	let client = TestClient::new().with_nonce(124);
 	txq.cull(client.clone());
 	// tx2 should not be promoted to current
-	assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0);
+	assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0);

 	// then
 	assert_eq!(txq.next_nonce(client.clone(), &sender), None);
@@ -718,7 +718,7 @@ fn should_accept_local_transactions_below_min_gas_price() {
 	assert_eq!(res, vec![Ok(())]);

 	// then
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
 }

 #[test]
@@ -736,7 +736,7 @@ fn should_accept_local_service_transaction() {
 	assert_eq!(res, vec![Ok(())]);

 	// then
-	assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1);
 }

 #[test]
@@ -777,9 +777,15 @@ fn should_not_return_transactions_over_nonce_cap() {
 	assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]);

 	// when
-	let all = txq.pending(TestClient::new(), 0, 0, None);
+	let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
 	// This should invalidate the cache!
-	let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into()));
+	let limited = txq.pending(TestClient::new(), PendingSettings {
+		block_number: 0,
+		current_timestamp: 0,
+		nonce_cap: Some(123.into()),
+		max_len: usize::max_value(),
+		ordering: PendingOrdering::Priority,
+	});


 	// then
@@ -787,6 +793,62 @@ fn should_not_return_transactions_over_nonce_cap() {
 	assert_eq!(limited.len(), 1);
 }

+#[test]
+fn should_return_cached_pending_even_if_unordered_is_requested() {
+	// given
+	let txq = new_queue();
+	let tx1 = Tx::default().signed();
+	let (tx2_1, tx2_2) = Tx::default().signed_pair();
+	let tx2_1_hash = tx2_1.hash();
+	let res = txq.import(TestClient::new(), vec![tx1].unverified());
+	assert_eq!(res, vec![Ok(())]);
+	let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
+	assert_eq!(res, vec![Ok(()), Ok(())]);
+
+	// when
+	let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0));
+	assert_eq!(all[0].hash, tx2_1_hash);
+	assert_eq!(all.len(), 3);
+
+	// This should not invalidate the cache!
+	let limited = txq.pending(TestClient::new(), PendingSettings {
+		block_number: 0,
+		current_timestamp: 0,
+		nonce_cap: None,
+		max_len: 3,
+		ordering: PendingOrdering::Unordered,
+	});
+
+	// then
+	assert_eq!(all, limited);
+}
+
+#[test]
+fn should_return_unordered_and_not_populate_the_cache() {
+	// given
+	let txq = new_queue();
+	let tx1 = Tx::default().signed();
+	let (tx2_1, tx2_2) = Tx::default().signed_pair();
+	let res = txq.import(TestClient::new(), vec![tx1].unverified());
+	assert_eq!(res, vec![Ok(())]);
+	let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local());
+	assert_eq!(res, vec![Ok(()), Ok(())]);
+
+	// when
+	// This should not invalidate the cache!
+	let limited = txq.pending(TestClient::new(), PendingSettings {
+		block_number: 0,
+		current_timestamp: 0,
+		nonce_cap: None,
+		max_len: usize::max_value(),
+		ordering: PendingOrdering::Unordered,
+	});
+
+	// then
+	assert_eq!(limited.len(), 3);
+	assert!(!txq.is_pending_cached());
+}
+
 #[test]
 fn should_clear_cache_after_timeout_for_local() {
 	// given
@@ -800,12 +862,12 @@ fn should_clear_cache_after_timeout_for_local() {

 	// This should populate cache and set timestamp to 1
 	// when
-	assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0);
-	assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0);

 	// This should invalidate the cache and trigger transaction ready.
 	// then
-	assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2);
+	assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2);
 }

 #[test]
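The test hunks above replace the positional `(block_number, timestamp, nonce_cap)` arguments of `txq.pending` with a single `PendingSettings` value. The stand-alone sketch below is not the actual Parity type; it only reconstructs, from the struct fields and `all_prioritized` calls visible in these hunks, what the settings bundle presumably looks like, with `all_prioritized` assumed to mean "no nonce cap, no length limit, priority ordering" (the real code uses `U256` for the nonce cap).

// Minimal sketch of the assumed settings bundle; names mirror the diff, types are simplified.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PendingOrdering {
	Priority,
	Unordered,
}

struct PendingSettings {
	block_number: u64,
	current_timestamp: u64,
	nonce_cap: Option<u64>,
	max_len: usize,
	ordering: PendingOrdering,
}

impl PendingSettings {
	// Assumed expansion of the `all_prioritized(block, timestamp)` helper used throughout the tests.
	fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self {
		PendingSettings {
			block_number,
			current_timestamp,
			nonce_cap: None,
			max_len: usize::max_value(),
			ordering: PendingOrdering::Priority,
		}
	}
}

fn main() {
	let settings = PendingSettings::all_prioritized(0, 0);
	assert!(settings.nonce_cap.is_none());
	assert_eq!(settings.max_len, usize::max_value());
	assert_eq!(settings.ordering, PendingOrdering::Priority);
	println!("block {} at {}", settings.block_number, settings.current_timestamp);
}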
@@ -306,7 +306,7 @@ impl FullDependencies {
 				let client = EthPubSubClient::new(self.client.clone(), self.remote.clone());
 				let h = client.handler();
 				self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() {
-					h.new_transactions(hashes);
+					h.notify_new_transactions(hashes);
 				}));

 				if let Some(h) = client.handler().upgrade() {
@@ -527,7 +527,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
 				let h = client.handler();
 				self.transaction_queue.write().add_listener(Box::new(move |transactions| {
 					if let Some(h) = h.upgrade() {
-						h.new_transactions(transactions);
+						h.notify_new_transactions(transactions);
 					}
 				}));
 				handler.extend_with(EthPubSub::to_delegate(client));
@@ -550,7 +550,8 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 		cmd.miner_options,
 		cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()),
 		&spec,
-		Some(account_provider.clone())
+		Some(account_provider.clone()),
+
 	));
 	miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed");
 	miner.set_gas_range_target(cmd.miner_extras.gas_range_target);
@@ -175,7 +175,7 @@ impl<C> ChainNotificationHandler<C> {
 	}

 	/// Notify all subscribers about new transaction hashes.
-	pub fn new_transactions(&self, hashes: &[H256]) {
+	pub fn notify_new_transactions(&self, hashes: &[H256]) {
 		for subscriber in self.transactions_subscribers.read().values() {
 			for hash in hashes {
 				Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into()));
@@ -264,12 +264,13 @@ impl Parity for ParityClient {
 			.map(Into::into)
 	}

-	fn pending_transactions(&self) -> Result<Vec<Transaction>> {
+	fn pending_transactions(&self, limit: Trailing<usize>) -> Result<Vec<Transaction>> {
 		let txq = self.light_dispatch.transaction_queue.read();
 		let chain_info = self.light_dispatch.client.chain_info();
 		Ok(
 			txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
 				.into_iter()
+				.take(limit.unwrap_or_else(usize::max_value))
 				.map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition))
 				.collect::<Vec<_>>()
 		)
@@ -313,15 +313,19 @@ impl<C, M, U, S> Parity for ParityClient<C, M, U> where
 			.map(Into::into)
 	}

-	fn pending_transactions(&self) -> Result<Vec<Transaction>> {
+	fn pending_transactions(&self, limit: Trailing<usize>) -> Result<Vec<Transaction>> {
 		let block_number = self.client.chain_info().best_block_number;
-		let ready_transactions = self.miner.ready_transactions(&*self.client);
+		let ready_transactions = self.miner.ready_transactions(
+			&*self.client,
+			limit.unwrap_or_else(usize::max_value),
+			miner::PendingOrdering::Priority,
+		);

 		Ok(ready_transactions
 			.into_iter()
 			.map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition))
 			.collect()
 		)
 	}

 	fn all_transactions(&self) -> Result<Vec<Transaction>> {
@@ -27,7 +27,7 @@ use ethcore::engines::EthEngine;
 use ethcore::error::Error;
 use ethcore::header::{BlockNumber, Header};
 use ethcore::ids::BlockId;
-use ethcore::miner::{MinerService, AuthoringParams};
+use ethcore::miner::{self, MinerService, AuthoringParams};
 use ethcore::receipt::{Receipt, RichReceipt};
 use ethereum_types::{H256, U256, Address};
 use miner::pool::local_transactions::Status as LocalTransactionStatus;
@@ -215,7 +215,7 @@ impl MinerService for TestMinerService {
 		self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect()
 	}

-	fn ready_transactions<C>(&self, _chain: &C) -> Vec<Arc<VerifiedTransaction>> {
+	fn ready_transactions<C>(&self, _chain: &C, _max_len: usize, _ordering: miner::PendingOrdering) -> Vec<Arc<VerifiedTransaction>> {
 		self.queued_transactions()
 	}

@@ -183,7 +183,7 @@ fn should_subscribe_to_pending_transactions() {
 	assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));

 	// Send new transactions
-	handler.new_transactions(&[5.into(), 7.into()]);
+	handler.notify_new_transactions(&[5.into(), 7.into()]);

 	let (res, receiver) = receiver.into_future().wait().unwrap();
 	let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#;
@@ -141,7 +141,7 @@ build_rpc_trait! {

 		/// Returns all pending transactions from transaction queue.
 		#[rpc(name = "parity_pendingTransactions")]
-		fn pending_transactions(&self) -> Result<Vec<Transaction>>;
+		fn pending_transactions(&self, Trailing<usize>) -> Result<Vec<Transaction>>;

 		/// Returns all transactions from transaction queue.
 		///
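Since `Trailing<usize>` makes the new limit an optional trailing JSON-RPC parameter, the snippet below illustrates how a client would presumably call `parity_pendingTransactions` with and without it. The request bodies are written as Rust raw strings in the same style as the pubsub test earlier in this diff; the capping behaviour is inferred from the `.take(limit.unwrap_or_else(usize::max_value))` call above, not from separate documentation.

fn main() {
	// Ask for at most 5 pending transactions (the new optional trailing parameter).
	let capped = r#"{"jsonrpc":"2.0","method":"parity_pendingTransactions","params":[5],"id":1}"#;
	// Omitting the parameter keeps the previous behaviour and returns everything.
	let unlimited = r#"{"jsonrpc":"2.0","method":"parity_pendingTransactions","params":[],"id":2}"#;
	println!("{}\n{}", capped, unlimited);
}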
@@ -15,7 +15,8 @@
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.

 use std::sync::Arc;
-use std::collections::{HashMap, BTreeSet};
+use std::slice;
+use std::collections::{hash_map, HashMap, BTreeSet};

 use error;
 use listener::{Listener, NoopListener};
@@ -443,7 +444,16 @@ impl<T, S, L> Pool<T, S, L> where
 		PendingIterator {
 			ready,
 			best_transactions,
-			pool: self
+			pool: self,
+		}
+	}
+
+	/// Returns unprioritized list of ready transactions.
+	pub fn unordered_pending<R: Ready<T>>(&self, ready: R) -> UnorderedIterator<T, R, S> {
+		UnorderedIterator {
+			ready,
+			senders: self.transactions.iter(),
+			transactions: None,
 		}
 	}

@@ -514,6 +524,50 @@ impl<T, S, L> Pool<T, S, L> where
 		}
 	}

+/// An iterator over all pending (ready) transactions in unordered fashion.
+///
+/// NOTE: Current implementation will iterate over all transactions from particular sender
+/// ordered by nonce, but that might change in the future.
+///
+/// NOTE: the transactions are not removed from the queue.
+/// You might remove them later by calling `cull`.
+pub struct UnorderedIterator<'a, T, R, S> where
+	T: VerifiedTransaction + 'a,
+	S: Scoring<T> + 'a,
+{
+	ready: R,
+	senders: hash_map::Iter<'a, T::Sender, Transactions<T, S>>,
+	transactions: Option<slice::Iter<'a, Transaction<T>>>,
+}
+
+impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where
+	T: VerifiedTransaction,
+	R: Ready<T>,
+	S: Scoring<T>,
+{
+	type Item = Arc<T>;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		loop {
+			if let Some(transactions) = self.transactions.as_mut() {
+				if let Some(tx) = transactions.next() {
+					match self.ready.is_ready(&tx) {
+						Readiness::Ready => {
+							return Some(tx.transaction.clone());
+						},
+						state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state),
+					}
+				}
+			}
+
+			// otherwise fallback and try next sender
+			let next_sender = self.senders.next()?;
+			self.transactions = Some(next_sender.1.iter());
+		}
+	}
+}
+
+
 /// An iterator over all pending (ready) transactions.
 /// NOTE: the transactions are not removed from the queue.
 /// You might remove them later by calling `cull`.
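A self-contained sketch of the iteration pattern the new `UnorderedIterator` implements: walk each sender's nonce-ordered transactions and yield them while a readiness check accepts them, switching to the next sender as soon as one is rejected. The types here are simplified stand-ins (plain integers and a nonce threshold instead of `VerifiedTransaction` and `Ready`), not the pool's real API.

use std::collections::{hash_map, HashMap};
use std::slice;

// Simplified stand-in for the pool: sender id -> nonce-ordered transaction nonces.
struct DemoUnordered<'a> {
	ready_below: u64, // a nonce threshold stands in for the pool's `Ready` check
	senders: hash_map::Iter<'a, u32, Vec<u64>>,
	transactions: Option<slice::Iter<'a, u64>>,
}

impl<'a> Iterator for DemoUnordered<'a> {
	type Item = u64;

	fn next(&mut self) -> Option<u64> {
		loop {
			if let Some(transactions) = self.transactions.as_mut() {
				if let Some(tx) = transactions.next() {
					if *tx < self.ready_below {
						return Some(*tx);
					}
					// Not ready: later nonces from this sender cannot be ready either,
					// so fall through and move on to the next sender.
				}
			}
			// Current sender exhausted (or rejected); try the next one, or finish.
			let (_, txs) = self.senders.next()?;
			self.transactions = Some(txs.iter());
		}
	}
}

fn main() {
	let mut pool: HashMap<u32, Vec<u64>> = HashMap::new();
	pool.insert(1, vec![0, 1, 2]); // nonce 2 is rejected by the threshold below
	pool.insert(2, vec![0, 1]);

	let iter = DemoUnordered { ready_below: 2, senders: pool.iter(), transactions: None };
	// Order across senders is unspecified, matching the "unordered" contract above.
	assert_eq!(iter.count(), 4);
}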
@@ -259,6 +259,66 @@ fn should_construct_pending() {
 	assert_eq!(pending.next(), None);
 }

+#[test]
+fn should_return_unordered_iterator() {
+	// given
+	let b = TransactionBuilder::default();
+	let mut txq = TestPool::default();
+
+	let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
+	let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
+	let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
+	let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap();
+	// gap
+	txq.import(b.tx().nonce(5).new()).unwrap();
+
+	let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
+	let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
+	let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
+	let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap();
+	// gap
+	txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
+
+	let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap();
+	assert_eq!(txq.light_status().transaction_count, 11);
+	assert_eq!(txq.status(NonceReady::default()), Status {
+		stalled: 0,
+		pending: 9,
+		future: 2,
+	});
+	assert_eq!(txq.status(NonceReady::new(1)), Status {
+		stalled: 3,
+		pending: 6,
+		future: 2,
+	});
+
+	// when
+	let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect();
+
+	let chain1 = vec![tx0, tx1, tx2, tx3];
+	let chain2 = vec![tx5, tx6, tx7, tx8];
+	let chain3 = vec![tx9];
+
+	assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len());
+
+	let mut options = vec![
+		vec![chain1.clone(), chain2.clone(), chain3.clone()],
+		vec![chain2.clone(), chain1.clone(), chain3.clone()],
+		vec![chain2.clone(), chain3.clone(), chain1.clone()],
+		vec![chain3.clone(), chain2.clone(), chain1.clone()],
+		vec![chain3.clone(), chain1.clone(), chain2.clone()],
+		vec![chain1.clone(), chain3.clone(), chain2.clone()],
+	].into_iter().map(|mut v| {
+		let mut first = v.pop().unwrap();
+		for mut x in v {
+			first.append(&mut x);
+		}
+		first
+	});
+
+	assert!(options.any(|opt| all == opt));
+}
+
 #[test]
 fn should_update_scoring_correctly() {
 	// given