Recently rejected cache for transaction queue (#9005)
* Store recently rejected transactions. * Don't cache AlreadyImported rejections. * Make the size of transaction verification queue dependent on pool size. * Add a test for recently rejected. * Fix logging for recently rejected. * Make rejection cache smaller. * obsolete test removed * obsolete test removed * Construct cache with_capacity.
This commit is contained in:
parent
9caa868603
commit
78e001284f
@ -87,7 +87,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;
|
||||
|
||||
use_contract!(registry, "Registry", "res/contracts/registrar.json");
|
||||
|
||||
const MAX_TX_QUEUE_SIZE: usize = 4096;
|
||||
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
|
||||
// Max number of blocks imported at once.
|
||||
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
|
||||
@ -760,13 +759,12 @@ impl Client {
|
||||
tracedb: tracedb,
|
||||
engine: engine,
|
||||
pruning: config.pruning.clone(),
|
||||
config: config,
|
||||
db: RwLock::new(db.clone()),
|
||||
state_db: RwLock::new(state_db),
|
||||
report: RwLock::new(Default::default()),
|
||||
io_channel: Mutex::new(message_channel),
|
||||
notify: RwLock::new(Vec::new()),
|
||||
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
|
||||
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
|
||||
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
|
||||
queued_ancient_blocks: Default::default(),
|
||||
ancient_blocks_import_lock: Default::default(),
|
||||
@ -779,6 +777,7 @@ impl Client {
|
||||
registrar_address,
|
||||
exit_handler: Mutex::new(None),
|
||||
importer,
|
||||
config,
|
||||
});
|
||||
|
||||
// prune old states.
|
||||
|
@ -70,12 +70,6 @@ pub enum Mode {
|
||||
Off,
|
||||
}
|
||||
|
||||
impl Default for Mode {
|
||||
fn default() -> Self {
|
||||
Mode::Active
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Mode {
|
||||
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
|
||||
match *self {
|
||||
@ -88,7 +82,7 @@ impl Display for Mode {
|
||||
}
|
||||
|
||||
/// Client configuration. Includes configs for all sub-systems.
|
||||
#[derive(Debug, PartialEq, Default, Clone)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct ClientConfig {
|
||||
/// Block queue configuration.
|
||||
pub queue: QueueConfig,
|
||||
@ -126,8 +120,36 @@ pub struct ClientConfig {
|
||||
pub history_mem: usize,
|
||||
/// Check seal valididity on block import
|
||||
pub check_seal: bool,
|
||||
/// Maximal number of transactions queued for verification in a separate thread.
|
||||
pub transaction_verification_queue_size: usize,
|
||||
}
|
||||
|
||||
impl Default for ClientConfig {
|
||||
fn default() -> Self {
|
||||
let mb = 1024 * 1024;
|
||||
ClientConfig {
|
||||
queue: Default::default(),
|
||||
blockchain: Default::default(),
|
||||
tracing: Default::default(),
|
||||
vm_type: Default::default(),
|
||||
fat_db: false,
|
||||
pruning: journaldb::Algorithm::OverlayRecent,
|
||||
name: "default".into(),
|
||||
db_cache_size: None,
|
||||
db_compaction: Default::default(),
|
||||
db_wal: true,
|
||||
mode: Mode::Active,
|
||||
spec_name: "".into(),
|
||||
verifier_type: VerifierType::Canon,
|
||||
state_cache_size: 1 * mb,
|
||||
jump_table_size: 1 * mb,
|
||||
history: 64,
|
||||
history_mem: 32 * mb,
|
||||
check_seal: true,
|
||||
transaction_verification_queue_size: 8192,
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::{DatabaseCompactionProfile, Mode};
|
||||
@ -143,9 +165,4 @@ mod test {
|
||||
assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap());
|
||||
assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mode_default() {
|
||||
assert_eq!(Mode::default(), Mode::Active);
|
||||
}
|
||||
}
|
||||
|
@ -42,12 +42,6 @@ pub enum VerifierType {
|
||||
Noop,
|
||||
}
|
||||
|
||||
impl Default for VerifierType {
|
||||
fn default() -> Self {
|
||||
VerifierType::Canon
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new verifier based on type.
|
||||
pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> {
|
||||
match v {
|
||||
|
@ -662,6 +662,18 @@ impl ChainSync {
|
||||
None
|
||||
}
|
||||
).collect();
|
||||
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Syncing with peers: {} active, {} confirmed, {} total",
|
||||
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
|
||||
);
|
||||
|
||||
if self.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Waiting for the block queue");
|
||||
} else if self.state == SyncState::SnapshotWaiting {
|
||||
trace!(target: "sync", "Waiting for the snapshot restoration");
|
||||
} else {
|
||||
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
|
||||
self.active_peers.contains(&peer_id)
|
||||
).map(|v| *v).collect();
|
||||
@ -669,14 +681,11 @@ impl ChainSync {
|
||||
random::new().shuffle(&mut peers); //TODO: sort by rating
|
||||
// prefer peers with higher protocol version
|
||||
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Syncing with peers: {} active, {} confirmed, {} total",
|
||||
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
|
||||
);
|
||||
|
||||
for (peer_id, _) in peers {
|
||||
self.sync_peer(io, peer_id, false);
|
||||
}
|
||||
}
|
||||
|
||||
if
|
||||
(self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) &&
|
||||
@ -710,14 +719,6 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Skipping busy peer {}", peer_id);
|
||||
return;
|
||||
}
|
||||
if self.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Waiting for the block queue");
|
||||
return;
|
||||
}
|
||||
if self.state == SyncState::SnapshotWaiting {
|
||||
trace!(target: "sync", "Waiting for the snapshot restoration");
|
||||
return;
|
||||
}
|
||||
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned())
|
||||
} else {
|
||||
return;
|
||||
|
@ -19,7 +19,7 @@
|
||||
use std::{cmp, fmt};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{self, AtomicUsize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use ethereum_types::{H256, U256, Address};
|
||||
use parking_lot::RwLock;
|
||||
@ -138,6 +138,50 @@ impl CachedPending {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RecentlyRejected {
|
||||
inner: RwLock<HashMap<H256, transaction::Error>>,
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
impl RecentlyRejected {
|
||||
fn new(limit: usize) -> Self {
|
||||
RecentlyRejected {
|
||||
limit,
|
||||
inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)),
|
||||
}
|
||||
}
|
||||
|
||||
fn clear(&self) {
|
||||
self.inner.write().clear();
|
||||
}
|
||||
|
||||
fn get(&self, hash: &H256) -> Option<transaction::Error> {
|
||||
self.inner.read().get(hash).cloned()
|
||||
}
|
||||
|
||||
fn insert(&self, hash: H256, err: &transaction::Error) {
|
||||
if self.inner.read().contains_key(&hash) {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.write();
|
||||
inner.insert(hash, err.clone());
|
||||
|
||||
// clean up
|
||||
if inner.len() > self.limit {
|
||||
// randomly remove half of the entries
|
||||
let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect();
|
||||
for key in to_remove {
|
||||
inner.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal size of rejection cache, by default it's equal to queue size.
|
||||
const MIN_REJECTED_CACHE_SIZE: usize = 2048;
|
||||
|
||||
/// Ethereum Transaction Queue
|
||||
///
|
||||
/// Responsible for:
|
||||
@ -150,6 +194,7 @@ pub struct TransactionQueue {
|
||||
pool: RwLock<Pool>,
|
||||
options: RwLock<verifier::Options>,
|
||||
cached_pending: RwLock<CachedPending>,
|
||||
recently_rejected: RecentlyRejected,
|
||||
}
|
||||
|
||||
impl TransactionQueue {
|
||||
@ -159,11 +204,13 @@ impl TransactionQueue {
|
||||
verification_options: verifier::Options,
|
||||
strategy: PrioritizationStrategy,
|
||||
) -> Self {
|
||||
let max_count = limits.max_count;
|
||||
TransactionQueue {
|
||||
insertion_id: Default::default(),
|
||||
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
|
||||
options: RwLock::new(verification_options),
|
||||
cached_pending: RwLock::new(CachedPending::none()),
|
||||
recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -195,26 +242,42 @@ impl TransactionQueue {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let verifier = verifier::Verifier::new(
|
||||
client,
|
||||
options,
|
||||
self.insertion_id.clone(),
|
||||
transaction_to_replace,
|
||||
);
|
||||
|
||||
let results = transactions
|
||||
.into_iter()
|
||||
.map(|transaction| {
|
||||
if self.pool.read().find(&transaction.hash()).is_some() {
|
||||
bail!(transaction::Error::AlreadyImported)
|
||||
let hash = transaction.hash();
|
||||
|
||||
if self.pool.read().find(&hash).is_some() {
|
||||
return Err(transaction::Error::AlreadyImported);
|
||||
}
|
||||
|
||||
verifier.verify_transaction(transaction)
|
||||
if let Some(err) = self.recently_rejected.get(&hash) {
|
||||
trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err);
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let imported = verifier
|
||||
.verify_transaction(transaction)
|
||||
.and_then(|verified| {
|
||||
self.pool.write().import(verified).map_err(convert_error)
|
||||
});
|
||||
|
||||
match imported {
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
self.recently_rejected.insert(hash, &err);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
})
|
||||
.map(|result| result.and_then(|verified| {
|
||||
self.pool.write().import(verified)
|
||||
.map(|_imported| ())
|
||||
.map_err(convert_error)
|
||||
}))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Notify about imported transactions.
|
||||
@ -342,6 +405,7 @@ impl TransactionQueue {
|
||||
|
||||
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
|
||||
|
||||
self.recently_rejected.clear();
|
||||
let removed = self.pool.write().cull(None, state_readiness);
|
||||
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
|
||||
}
|
||||
|
@ -894,6 +894,47 @@ fn should_avoid_verifying_transaction_already_in_pool() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_avoid_reverifying_recently_rejected_transactions() {
|
||||
// given
|
||||
let txq = TransactionQueue::new(
|
||||
txpool::Options {
|
||||
max_count: 1,
|
||||
max_per_sender: 2,
|
||||
max_mem_usage: 50
|
||||
},
|
||||
verifier::Options {
|
||||
minimal_gas_price: 1.into(),
|
||||
block_gas_limit: 1_000_000.into(),
|
||||
tx_gas_limit: 1_000_000.into(),
|
||||
},
|
||||
PrioritizationStrategy::GasPriceOnly,
|
||||
);
|
||||
|
||||
let client = TestClient::new();
|
||||
let tx1 = Tx::gas_price(10_000).signed().unverified();
|
||||
|
||||
let res = txq.import(client.clone(), vec![tx1.clone()]);
|
||||
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
|
||||
balance: 0xf67c.into(),
|
||||
cost: 0xc8458e4.into(),
|
||||
})]);
|
||||
assert_eq!(txq.status().status.transaction_count, 0);
|
||||
assert!(client.was_verification_triggered());
|
||||
|
||||
// when
|
||||
let client = TestClient::new();
|
||||
let res = txq.import(client.clone(), vec![tx1]);
|
||||
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
|
||||
balance: 0xf67c.into(),
|
||||
cost: 0xc8458e4.into(),
|
||||
})]);
|
||||
assert!(!client.was_verification_triggered());
|
||||
|
||||
// then
|
||||
assert_eq!(txq.status().status.transaction_count, 0);
|
||||
}
|
||||
|
||||
|
||||
fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
|
||||
// given
|
||||
let txq = TransactionQueue::new(
|
||||
|
@ -357,7 +357,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
|
||||
algorithm,
|
||||
cmd.pruning_history,
|
||||
cmd.pruning_memory,
|
||||
cmd.check_seal
|
||||
cmd.check_seal,
|
||||
);
|
||||
|
||||
client_config.queue.verifier_settings = cmd.verifier_settings;
|
||||
|
@ -518,6 +518,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
|
||||
// fetch service
|
||||
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
|
||||
|
||||
let txpool_size = cmd.miner_options.pool_limits.max_count;
|
||||
// create miner
|
||||
let miner = Arc::new(Miner::new(
|
||||
cmd.miner_options,
|
||||
@ -574,6 +575,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
|
||||
);
|
||||
|
||||
client_config.queue.verifier_settings = cmd.verifier_settings;
|
||||
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);
|
||||
|
||||
// set up bootnodes
|
||||
let mut net_conf = cmd.net_conf;
|
||||
|
@ -179,7 +179,7 @@ impl SnapshotCommand {
|
||||
algorithm,
|
||||
self.pruning_history,
|
||||
self.pruning_memory,
|
||||
true
|
||||
true,
|
||||
);
|
||||
|
||||
let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config);
|
||||
|
@ -122,7 +122,7 @@ impl Default for UserDefaults {
|
||||
fn default() -> Self {
|
||||
UserDefaults {
|
||||
is_first_launch: true,
|
||||
pruning: Algorithm::default(),
|
||||
pruning: Algorithm::OverlayRecent,
|
||||
tracing: false,
|
||||
fat_db: false,
|
||||
mode: Mode::Active,
|
||||
|
@ -82,10 +82,6 @@ pub enum Algorithm {
|
||||
RefCounted,
|
||||
}
|
||||
|
||||
impl Default for Algorithm {
|
||||
fn default() -> Algorithm { Algorithm::OverlayRecent }
|
||||
}
|
||||
|
||||
impl str::FromStr for Algorithm {
|
||||
type Err = String;
|
||||
|
||||
@ -183,11 +179,6 @@ mod tests {
|
||||
assert!(!Algorithm::RefCounted.is_stable());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_journal_algorithm_default() {
|
||||
assert_eq!(Algorithm::default(), Algorithm::OverlayRecent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_journal_algorithm_all_types() {
|
||||
// compiling should fail if some cases are not covered
|
||||
|
Loading…
Reference in New Issue
Block a user