A last bunch of txqueue performance optimizations (#9024)

* Clear cache only when block is enacted.

* Add tracing for cull.

* Cull split.

* Cull after creating pending block.

* Add constant, remove sync::read tracing.

* Reset debug.

* Remove excessive tracing.

* Use struct for NonceCache.

* Fix build

* Remove warnings.

* Fix build again.
Tomasz Drwięga 2018-07-05 17:27:48 +02:00 committed by André Silva
parent 08332f1945
commit 662e76629c
7 changed files with 94 additions and 37 deletions

@@ -82,7 +82,7 @@ use ethcore::client::{
 	Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, CallContract
 };
 use ethcore::account_provider::AccountProvider;
-use ethcore::miner::{self, Miner, MinerService};
+use ethcore::miner::{self, Miner, MinerService, pool_client::NonceCache};
 use ethcore::trace::{Tracer, VMTracer};
 use rustc_hex::FromHex;
@@ -94,6 +94,9 @@ use_contract!(private, "PrivateContract", "res/private.json");
 /// Initialization vector length.
 const INIT_VEC_LEN: usize = 16;
 
+/// Size of nonce cache
+const NONCE_CACHE_SIZE: usize = 128;
+
 /// Configurtion for private transaction provider
 #[derive(Default, PartialEq, Debug, Clone)]
 pub struct ProviderConfig {
@@ -243,7 +246,7 @@ impl Provider where {
 		Ok(original_transaction)
 	}
 
-	fn pool_client<'a>(&'a self, nonce_cache: &'a RwLock<HashMap<Address, U256>>) -> miner::pool_client::PoolClient<'a, Client> {
+	fn pool_client<'a>(&'a self, nonce_cache: &'a NonceCache) -> miner::pool_client::PoolClient<'a, Client> {
 		let engine = self.client.engine();
 		let refuse_service_transactions = true;
 		miner::pool_client::PoolClient::new(
@@ -262,7 +265,7 @@ impl Provider where {
 	/// can be replaced with a single `drain()` method instead.
 	/// Thanks to this we also don't really need to lock the entire verification for the time of execution.
 	fn process_queue(&self) -> Result<(), Error> {
-		let nonce_cache = Default::default();
+		let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
 		let mut verification_queue = self.transactions_for_verification.lock();
 		let ready_transactions = verification_queue.ready_transactions(self.pool_client(&nonce_cache));
 		for transaction in ready_transactions {
@@ -583,7 +586,7 @@ impl Importer for Arc<Provider> {
 		trace!("Validating transaction: {:?}", original_tx);
 		// Verify with the first account available
 		trace!("The following account will be used for verification: {:?}", validation_account);
-		let nonce_cache = Default::default();
+		let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
 		self.transactions_for_verification.lock().add_transaction(
 			original_tx,
 			contract,

@@ -176,7 +176,7 @@ impl Default for ClientConfig {
 }
 
 #[cfg(test)]
 mod test {
-	use super::{DatabaseCompactionProfile, Mode};
+	use super::{DatabaseCompactionProfile};
 
 	#[test]
 	fn test_default_compaction_profile() {

@@ -14,8 +14,9 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
+use std::cmp;
 use std::time::{Instant, Duration};
-use std::collections::{BTreeMap, BTreeSet, HashSet, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
 use std::sync::Arc;
 
 use ansi_term::Colour;
@@ -46,7 +47,7 @@ use client::BlockId;
 use executive::contract_address;
 use header::{Header, BlockNumber};
 use miner;
-use miner::pool_client::{PoolClient, CachedNonceClient};
+use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
 use receipt::{Receipt, RichReceipt};
 use spec::Spec;
 use state::State;
@@ -201,7 +202,7 @@ pub struct Miner {
 	sealing: Mutex<SealingWork>,
 	params: RwLock<AuthoringParams>,
 	listeners: RwLock<Vec<Box<NotifyWork>>>,
-	nonce_cache: RwLock<HashMap<Address, U256>>,
+	nonce_cache: NonceCache,
 	gas_pricer: Mutex<GasPricer>,
 	options: MinerOptions,
 	// TODO [ToDr] Arc is only required because of price updater
@@ -227,6 +228,7 @@ impl Miner {
 		let limits = options.pool_limits.clone();
 		let verifier_options = options.pool_verification_options.clone();
 		let tx_queue_strategy = options.tx_queue_strategy;
+		let nonce_cache_size = cmp::max(4096, limits.max_count / 4);
 
 		Miner {
 			sealing: Mutex::new(SealingWork {
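The nonce cache is now sized from the pool limits (with a 4096-entry floor) instead of the fixed constants it replaces. A quick worked sketch of that expression, not taken from the commit; the pool sizes below are made-up inputs and `max_count` stands in for `limits.max_count`:

```rust
use std::cmp;

// Mirrors the sizing expression added above: never fewer than 4096 entries,
// and a quarter of the configured pool size for larger pools.
fn nonce_cache_size(max_count: usize) -> usize {
    cmp::max(4096, max_count / 4)
}

fn main() {
    assert_eq!(nonce_cache_size(8_192), 4_096);    // small pool: the 4096 floor applies
    assert_eq!(nonce_cache_size(100_000), 25_000); // large pool: scales with the limit
}
```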
@@ -240,7 +242,7 @@ impl Miner {
 			params: RwLock::new(AuthoringParams::default()),
 			listeners: RwLock::new(vec![]),
 			gas_pricer: Mutex::new(gas_pricer),
-			nonce_cache: RwLock::new(HashMap::with_capacity(1024)),
+			nonce_cache: NonceCache::new(nonce_cache_size),
 			options,
 			transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
 			accounts,
@@ -844,7 +846,7 @@ impl miner::MinerService for Miner {
 		let chain_info = chain.chain_info();
 
 		let from_queue = || self.transaction_queue.pending_hashes(
-			|sender| self.nonce_cache.read().get(sender).cloned(),
+			|sender| self.nonce_cache.get(sender),
 		);
 
 		let from_pending = || {
@@ -1080,14 +1082,15 @@ impl miner::MinerService for Miner {
 
 		if has_new_best_block {
 			// Clear nonce cache
-			self.nonce_cache.write().clear();
+			self.nonce_cache.clear();
 		}
 
 		// First update gas limit in transaction queue and minimal gas price.
 		let gas_limit = *chain.best_block_header().gas_limit();
 		self.update_transaction_queue_limits(gas_limit);
 
-		// Then import all transactions...
+		// Then import all transactions from retracted blocks.
 		let client = self.pool_client(chain);
 		{
 			retracted
@@ -1106,11 +1109,6 @@ impl miner::MinerService for Miner {
 				});
 		}
 
-		if has_new_best_block {
-			// ...and at the end remove the old ones
-			self.transaction_queue.cull(client);
-		}
-
 		if has_new_best_block || (imported.len() > 0 && self.options.reseal_on_uncle) {
 			// Reset `next_allowed_reseal` in case a block is imported.
 			// Even if min_period is high, we will always attempt to create
@@ -1125,6 +1123,15 @@ impl miner::MinerService for Miner {
 				self.update_sealing(chain);
 			}
 		}
+
+		if has_new_best_block {
+			// Make sure to cull transactions after we update sealing.
+			// Not culling won't lead to old transactions being added to the block
+			// (thanks to Ready), but culling can take significant amount of time,
+			// so best to leave it after we create some work for miners to prevent increased
+			// uncle rate.
+			self.transaction_queue.cull(client);
+		}
 	}
 
 	fn pending_state(&self, latest_block_number: BlockNumber) -> Option<Self::State> {
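The net effect of the `chain_new_blocks` hunks above is a reordering: the expensive cull now runs only after a new pending block has been prepared for miners. A simplified, self-contained sketch of that ordering; the stub methods and the single `has_new_best_block` flag are stand-ins, and the real method also handles retracted blocks, uncles and reseal timing:

```rust
// Illustrative outline only; method bodies are stubs, not the real miner code.
struct Miner;

impl Miner {
    fn clear_nonce_cache(&self) { /* NonceCache::clear() on a new best block */ }
    fn update_transaction_queue_limits(&self) { /* gas limit / minimal gas price */ }
    fn import_retracted(&self) { /* re-import transactions from retracted blocks */ }
    fn update_sealing(&self) { /* prepare a new pending block (work for miners) */ }
    fn cull(&self) { /* slow: drop stalled transactions from the queue */ }

    fn chain_new_blocks(&self, has_new_best_block: bool) {
        if has_new_best_block {
            self.clear_nonce_cache();
        }
        self.update_transaction_queue_limits();
        self.import_retracted();
        if has_new_best_block {
            // Work for miners is created first...
            self.update_sealing();
        }
        if has_new_best_block {
            // ...and only then do we pay for the cull, so block import latency
            // (and with it the uncle rate) is not affected by a large queue.
            self.cull();
        }
    }
}

fn main() {
    Miner.chain_new_blocks(true);
}
```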

@@ -36,10 +36,32 @@ use header::Header;
 use miner;
 use miner::service_transaction_checker::ServiceTransactionChecker;
 
-type NoncesCache = RwLock<HashMap<Address, U256>>;
+/// Cache for state nonces.
+#[derive(Debug)]
+pub struct NonceCache {
+	nonces: RwLock<HashMap<Address, U256>>,
+	limit: usize
+}
 
-const MAX_NONCE_CACHE_SIZE: usize = 4096;
-const EXPECTED_NONCE_CACHE_SIZE: usize = 2048;
+impl NonceCache {
+	/// Create new cache with a limit of `limit` entries.
+	pub fn new(limit: usize) -> Self {
+		NonceCache {
+			nonces: RwLock::new(HashMap::with_capacity(limit / 2)),
+			limit,
+		}
+	}
+
+	/// Retrieve a cached nonce for given sender.
+	pub fn get(&self, sender: &Address) -> Option<U256> {
+		self.nonces.read().get(sender).cloned()
+	}
+
+	/// Clear all entries from the cache.
+	pub fn clear(&self) {
+		self.nonces.write().clear();
+	}
+}
 
 /// Blockchain accesss for transaction pool.
 pub struct PoolClient<'a, C: 'a> {
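For readers outside this module, a minimal usage sketch of the public surface added above. The `ethcore::miner::pool_client` path matches the import added in the private-transactions hunk earlier in this diff, and `ethereum_types::Address` is assumed to be the same type used elsewhere in the codebase; cache population itself happens inside `CachedNonceClient`, shown in the following hunks:

```rust
use ethcore::miner::pool_client::NonceCache;
use ethereum_types::Address;

fn main() {
    // The Miner now holds one long-lived cache, sized from its pool limits;
    // the private-tx provider creates a small per-call cache (128 entries).
    let cache = NonceCache::new(4096);

    // Read path, as used by the `pending_hashes` closure in the miner hunk above.
    let sender = Address::zero();
    assert_eq!(cache.get(&sender), None);

    // Dropped wholesale when a new best block is enacted.
    cache.clear();
}
```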
@@ -70,7 +92,7 @@ C: BlockInfo + CallContract,
 	/// Creates new client given chain, nonce cache, accounts and service transaction verifier.
 	pub fn new(
 		chain: &'a C,
-		cache: &'a NoncesCache,
+		cache: &'a NonceCache,
 		engine: &'a EthEngine,
 		accounts: Option<&'a AccountProvider>,
 		refuse_service_transactions: bool,
@@ -161,7 +183,7 @@ impl<'a, C: 'a> NonceClient for PoolClient<'a, C> where
 
 pub(crate) struct CachedNonceClient<'a, C: 'a> {
 	client: &'a C,
-	cache: &'a NoncesCache,
+	cache: &'a NonceCache,
 }
 
 impl<'a, C: 'a> Clone for CachedNonceClient<'a, C> {
@@ -176,13 +198,14 @@ impl<'a, C: 'a> Clone for CachedNonceClient<'a, C> {
 impl<'a, C: 'a> fmt::Debug for CachedNonceClient<'a, C> {
 	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
 		fmt.debug_struct("CachedNonceClient")
-			.field("cache", &self.cache.read().len())
+			.field("cache", &self.cache.nonces.read().len())
+			.field("limit", &self.cache.limit)
 			.finish()
 	}
 }
 
 impl<'a, C: 'a> CachedNonceClient<'a, C> {
-	pub fn new(client: &'a C, cache: &'a NoncesCache) -> Self {
+	pub fn new(client: &'a C, cache: &'a NonceCache) -> Self {
 		CachedNonceClient {
 			client,
 			cache,
@@ -194,27 +217,29 @@ impl<'a, C: 'a> NonceClient for CachedNonceClient<'a, C> where
   C: Nonce + Sync,
 {
 	fn account_nonce(&self, address: &Address) -> U256 {
-		if let Some(nonce) = self.cache.read().get(address) {
+		if let Some(nonce) = self.cache.nonces.read().get(address) {
 			return *nonce;
 		}
 
 		// We don't check again if cache has been populated.
 		// It's not THAT expensive to fetch the nonce from state.
-		let mut cache = self.cache.write();
+		let mut cache = self.cache.nonces.write();
 		let nonce = self.client.latest_nonce(address);
 		cache.insert(*address, nonce);
 
-		if cache.len() < MAX_NONCE_CACHE_SIZE {
+		if cache.len() < self.cache.limit {
 			return nonce
 		}
 
+		debug!(target: "txpool", "NonceCache: reached limit.");
+		trace_time!("nonce_cache:clear");
+
 		// Remove excessive amount of entries from the cache
-		while cache.len() > EXPECTED_NONCE_CACHE_SIZE {
-			// Just remove random entry
-			if let Some(key) = cache.keys().next().cloned() {
-				cache.remove(&key);
-			}
-		}
+		let to_remove: Vec<_> = cache.keys().take(self.cache.limit / 2).cloned().collect();
+		for x in to_remove {
+			cache.remove(&x);
+		}
+
 		nonce
 	}
 }
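The eviction policy changes here: the old loop removed one arbitrary key at a time until the map shrank back to `EXPECTED_NONCE_CACHE_SIZE`, while the new code drops `limit / 2` keys in a single pass once the limit is reached. A standalone sketch of that pattern over a plain `HashMap`, with toy key/value types rather than the real `Address`/`U256`:

```rust
use std::collections::HashMap;

/// One-pass eviction of half of the entries once `limit` is reached,
/// mirroring the logic added above. Which keys go first is arbitrary,
/// since HashMap iteration order is unspecified.
fn evict_half(cache: &mut HashMap<u64, u64>, limit: usize) {
    if cache.len() < limit {
        return;
    }
    let to_remove: Vec<_> = cache.keys().take(limit / 2).cloned().collect();
    for key in to_remove {
        cache.remove(&key);
    }
}

fn main() {
    let mut cache: HashMap<u64, u64> = (0..8).map(|i| (i, i)).collect();
    evict_half(&mut cache, 8);
    assert_eq!(cache.len(), 4); // half the entries gone in one sweep
}
```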

@@ -376,7 +376,6 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
 	}
 
 	fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
-		trace_time!("sync::read");
 		ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
 	}

@@ -40,6 +40,14 @@ type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, L
 /// since it only affects transaction Condition.
 const TIMESTAMP_CACHE: u64 = 1000;
 
+/// How many senders at once do we attempt to process while culling.
+///
+/// When running with huge transaction pools, culling can take significant amount of time.
+/// To prevent holding `write()` lock on the pool for this long period, we split the work into
+/// chunks and allow other threads to utilize the pool in the meantime.
+/// This parameter controls how many (best) senders at once will be processed.
+const CULL_SENDERS_CHUNK: usize = 1024;
+
 /// Transaction queue status.
 #[derive(Debug, Clone, PartialEq)]
 pub struct Status {
@@ -367,10 +375,11 @@ impl TransactionQueue {
 	}
 
 	/// Culls all stalled transactions from the pool.
-	pub fn cull<C: client::NonceClient>(
+	pub fn cull<C: client::NonceClient + Clone>(
 		&self,
 		client: C,
 	) {
+		trace_time!("pool::cull");
 		// We don't care about future transactions, so nonce_cap is not important.
 		let nonce_cap = None;
 		// We want to clear stale transactions from the queue as well.
// We want to clear stale transactions from the queue as well. // We want to clear stale transactions from the queue as well.
@@ -385,10 +394,19 @@ impl TransactionQueue {
 			current_id.checked_sub(gap)
 		};
 
-		let state_readiness = ready::State::new(client, stale_id, nonce_cap);
-
 		self.recently_rejected.clear();
-		let removed = self.pool.write().cull(None, state_readiness);
+
+		let mut removed = 0;
+		let senders: Vec<_> = {
+			let pool = self.pool.read();
+			let senders = pool.senders().cloned().collect();
+			senders
+		};
+		for chunk in senders.chunks(CULL_SENDERS_CHUNK) {
+			trace_time!("pool::cull::chunk");
+			let state_readiness = ready::State::new(client.clone(), stale_id, nonce_cap);
+			removed += self.pool.write().cull(Some(chunk), state_readiness);
+		}
 		debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
 	}
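The locking pattern is the interesting part: the senders are snapshotted under the read lock, and the write lock is re-acquired once per chunk so other threads can use the pool between chunks. A simplified, self-contained sketch of that pattern, with a Vec-backed stand-in pool and `parking_lot` assumed as elsewhere in the codebase:

```rust
use parking_lot::RwLock;

const CULL_SENDERS_CHUNK: usize = 1024;

/// Toy stand-in for the transaction pool.
struct Pool { senders: Vec<u64> }

impl Pool {
    fn cull_senders(&mut self, chunk: &[u64]) -> usize {
        // The real pool removes stalled transactions for these senders;
        // here we just pretend each sender had exactly one stalled transaction.
        chunk.len()
    }
}

fn cull(pool: &RwLock<Pool>) -> usize {
    // 1. Cheap snapshot of the senders under the read lock only.
    let senders: Vec<u64> = pool.read().senders.clone();

    // 2. Re-acquire the write lock once per chunk, so other threads can
    //    interleave with the cull instead of waiting for the whole thing.
    let mut removed = 0;
    for chunk in senders.chunks(CULL_SENDERS_CHUNK) {
        removed += pool.write().cull_senders(chunk);
    }
    removed
}

fn main() {
    let pool = RwLock::new(Pool { senders: (0..3000u64).collect() });
    assert_eq!(cull(&pool), 3000); // handled in 1024-sender chunks
}
```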

@@ -414,6 +414,11 @@ impl<T, S, L> Pool<T, S, L> where
 		|| self.mem_usage >= self.options.max_mem_usage
 	}
 
+	/// Returns senders ordered by priority of their transactions.
+	pub fn senders(&self) -> impl Iterator<Item=&T::Sender> {
+		self.best_transactions.iter().map(|tx| tx.transaction.sender())
+	}
+
 	/// Returns an iterator of pending (ready) transactions.
 	pub fn pending<R: Ready<T>>(&self, ready: R) -> PendingIterator<T, R, S, L> {
 		PendingIterator {
PendingIterator { PendingIterator {