Recently rejected cache for transaction queue (#9005)

* Store recently rejected transactions.

* Don't cache AlreadyImported rejections.

* Make the size of transaction verification queue dependent on pool size.

* Add a test for recently rejected.

* Fix logging for recently rejected.

* Make rejection cache smaller.

* obsolete test removed

* obsolete test removed

* Construct cache with_capacity.
This commit is contained in:
Tomasz Drwięga 2018-07-02 19:00:06 +02:00 committed by André Silva
parent a057532ee3
commit 9438afde32
11 changed files with 168 additions and 59 deletions

View File

@ -86,7 +86,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;
use_contract!(registry, "Registry", "res/contracts/registrar.json"); use_contract!(registry, "Registry", "res/contracts/registrar.json");
const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once. // Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4; const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
@ -710,13 +709,12 @@ impl Client {
tracedb: tracedb, tracedb: tracedb,
engine: engine, engine: engine,
pruning: config.pruning.clone(), pruning: config.pruning.clone(),
config: config, db: RwLock::new(db.clone()),
db: RwLock::new(db),
state_db: RwLock::new(state_db), state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel), io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()), notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queued_ancient_blocks: Default::default(), queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(), ancient_blocks_import_lock: Default::default(),
@ -729,6 +727,7 @@ impl Client {
registrar_address, registrar_address,
exit_handler: Mutex::new(None), exit_handler: Mutex::new(None),
importer, importer,
config,
}); });
// prune old states. // prune old states.

View File

@ -71,12 +71,6 @@ pub enum Mode {
Off, Off,
} }
impl Default for Mode {
fn default() -> Self {
Mode::Active
}
}
impl Display for Mode { impl Display for Mode {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match *self { match *self {
@ -112,7 +106,7 @@ impl From<IpcMode> for Mode {
/// Client configuration. Includes configs for all sub-systems. /// Client configuration. Includes configs for all sub-systems.
#[derive(Debug, PartialEq, Default)] #[derive(Debug, PartialEq, Clone)]
pub struct ClientConfig { pub struct ClientConfig {
/// Block queue configuration. /// Block queue configuration.
pub queue: QueueConfig, pub queue: QueueConfig,
@ -150,8 +144,36 @@ pub struct ClientConfig {
pub history_mem: usize, pub history_mem: usize,
/// Check seal valididity on block import /// Check seal valididity on block import
pub check_seal: bool, pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
} }
impl Default for ClientConfig {
fn default() -> Self {
let mb = 1024 * 1024;
ClientConfig {
queue: Default::default(),
blockchain: Default::default(),
tracing: Default::default(),
vm_type: Default::default(),
fat_db: false,
pruning: journaldb::Algorithm::OverlayRecent,
name: "default".into(),
db_cache_size: None,
db_compaction: Default::default(),
db_wal: true,
mode: Mode::Active,
spec_name: "".into(),
verifier_type: VerifierType::Canon,
state_cache_size: 1 * mb,
jump_table_size: 1 * mb,
history: 64,
history_mem: 32 * mb,
check_seal: true,
transaction_verification_queue_size: 8192,
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::{DatabaseCompactionProfile, Mode}; use super::{DatabaseCompactionProfile, Mode};
@ -167,9 +189,4 @@ mod test {
assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap()); assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap());
assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap()); assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap());
} }
#[test]
fn test_mode_default() {
assert_eq!(Mode::default(), Mode::Active);
}
} }

View File

@ -42,12 +42,6 @@ pub enum VerifierType {
Noop, Noop,
} }
impl Default for VerifierType {
fn default() -> Self {
VerifierType::Canon
}
}
/// Create a new verifier based on type. /// Create a new verifier based on type.
pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> { pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> {
match v { match v {

View File

@ -658,6 +658,18 @@ impl ChainSync {
None None
} }
).collect(); ).collect();
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)| let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id) self.active_peers.contains(&peer_id)
).map(|v| *v).collect(); ).map(|v| *v).collect();
@ -665,14 +677,11 @@ impl ChainSync {
random::new().shuffle(&mut peers); //TODO: sort by rating random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version // prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
for (peer_id, _) in peers { for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false); self.sync_peer(io, peer_id, false);
} }
}
if if
(self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) &&
@ -706,14 +715,6 @@ impl ChainSync {
trace!(target: "sync", "Skipping busy peer {}", peer_id); trace!(target: "sync", "Skipping busy peer {}", peer_id);
return; return;
} }
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
return;
}
if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned()) (peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned())
} else { } else {
return; return;

View File

@ -19,7 +19,7 @@
use std::{cmp, fmt}; use std::{cmp, fmt};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize}; use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap; use std::collections::{BTreeMap, HashMap};
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock; use parking_lot::RwLock;
@ -127,6 +127,50 @@ impl CachedPending {
} }
} }
#[derive(Debug)]
struct RecentlyRejected {
inner: RwLock<HashMap<H256, transaction::Error>>,
limit: usize,
}
impl RecentlyRejected {
fn new(limit: usize) -> Self {
RecentlyRejected {
limit,
inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)),
}
}
fn clear(&self) {
self.inner.write().clear();
}
fn get(&self, hash: &H256) -> Option<transaction::Error> {
self.inner.read().get(hash).cloned()
}
fn insert(&self, hash: H256, err: &transaction::Error) {
if self.inner.read().contains_key(&hash) {
return;
}
let mut inner = self.inner.write();
inner.insert(hash, err.clone());
// clean up
if inner.len() > self.limit {
// randomly remove half of the entries
let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect();
for key in to_remove {
inner.remove(&key);
}
}
}
}
/// Minimal size of rejection cache, by default it's equal to queue size.
const MIN_REJECTED_CACHE_SIZE: usize = 2048;
/// Ethereum Transaction Queue /// Ethereum Transaction Queue
/// ///
/// Responsible for: /// Responsible for:
@ -139,6 +183,7 @@ pub struct TransactionQueue {
pool: RwLock<Pool>, pool: RwLock<Pool>,
options: RwLock<verifier::Options>, options: RwLock<verifier::Options>,
cached_pending: RwLock<CachedPending>, cached_pending: RwLock<CachedPending>,
recently_rejected: RecentlyRejected,
} }
impl TransactionQueue { impl TransactionQueue {
@ -148,11 +193,13 @@ impl TransactionQueue {
verification_options: verifier::Options, verification_options: verifier::Options,
strategy: PrioritizationStrategy, strategy: PrioritizationStrategy,
) -> Self { ) -> Self {
let max_count = limits.max_count;
TransactionQueue { TransactionQueue {
insertion_id: Default::default(), insertion_id: Default::default(),
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)), pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
options: RwLock::new(verification_options), options: RwLock::new(verification_options),
cached_pending: RwLock::new(CachedPending::none()), cached_pending: RwLock::new(CachedPending::none()),
recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)),
} }
} }
@ -184,26 +231,42 @@ impl TransactionQueue {
None None
} }
}; };
let verifier = verifier::Verifier::new( let verifier = verifier::Verifier::new(
client, client,
options, options,
self.insertion_id.clone(), self.insertion_id.clone(),
transaction_to_replace, transaction_to_replace,
); );
let results = transactions let results = transactions
.into_iter() .into_iter()
.map(|transaction| { .map(|transaction| {
if self.pool.read().find(&transaction.hash()).is_some() { let hash = transaction.hash();
bail!(transaction::Error::AlreadyImported)
if self.pool.read().find(&hash).is_some() {
return Err(transaction::Error::AlreadyImported);
} }
verifier.verify_transaction(transaction) if let Some(err) = self.recently_rejected.get(&hash) {
trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err);
return Err(err);
}
let imported = verifier
.verify_transaction(transaction)
.and_then(|verified| {
self.pool.write().import(verified).map_err(convert_error)
});
match imported {
Ok(_) => Ok(()),
Err(err) => {
self.recently_rejected.insert(hash, &err);
Err(err)
},
}
}) })
.map(|result| result.and_then(|verified| {
self.pool.write().import(verified)
.map(|_imported| ())
.map_err(convert_error)
}))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Notify about imported transactions. // Notify about imported transactions.
@ -311,6 +374,7 @@ impl TransactionQueue {
let state_readiness = ready::State::new(client, stale_id, nonce_cap); let state_readiness = ready::State::new(client, stale_id, nonce_cap);
self.recently_rejected.clear();
let removed = self.pool.write().cull(None, state_readiness); let removed = self.pool.write().cull(None, state_readiness);
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status()); debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
} }

View File

@ -884,6 +884,47 @@ fn should_avoid_verifying_transaction_already_in_pool() {
} }
#[test] #[test]
fn should_avoid_reverifying_recently_rejected_transactions() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new();
let tx1 = Tx::gas_price(10_000).signed().unverified();
let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
assert!(client.was_verification_triggered());
// when
let client = TestClient::new();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert!(!client.was_verification_triggered());
// then
assert_eq!(txq.status().status.transaction_count, 0);
}
fn should_reject_early_in_case_gas_price_is_less_than_min_effective() { fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given // given
let txq = TransactionQueue::new( let txq = TransactionQueue::new(

View File

@ -362,7 +362,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
algorithm, algorithm,
cmd.pruning_history, cmd.pruning_history,
cmd.pruning_memory, cmd.pruning_memory,
cmd.check_seal cmd.check_seal,
); );
client_config.queue.verifier_settings = cmd.verifier_settings; client_config.queue.verifier_settings = cmd.verifier_settings;

View File

@ -544,6 +544,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
// fetch service // fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner // create miner
let miner = Arc::new(Miner::new( let miner = Arc::new(Miner::new(
cmd.miner_options, cmd.miner_options,
@ -600,6 +601,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
); );
client_config.queue.verifier_settings = cmd.verifier_settings; client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);
// set up bootnodes // set up bootnodes
let mut net_conf = cmd.net_conf; let mut net_conf = cmd.net_conf;

View File

@ -182,7 +182,7 @@ impl SnapshotCommand {
algorithm, algorithm,
self.pruning_history, self.pruning_history,
self.pruning_memory, self.pruning_memory,
true true,
); );
let client_db = db::open_client_db(&client_path, &client_config)?; let client_db = db::open_client_db(&client_path, &client_config)?;

View File

@ -122,7 +122,7 @@ impl Default for UserDefaults {
fn default() -> Self { fn default() -> Self {
UserDefaults { UserDefaults {
is_first_launch: true, is_first_launch: true,
pruning: Algorithm::default(), pruning: Algorithm::OverlayRecent,
tracing: false, tracing: false,
fat_db: false, fat_db: false,
mode: Mode::Active, mode: Mode::Active,

View File

@ -80,10 +80,6 @@ pub enum Algorithm {
RefCounted, RefCounted,
} }
impl Default for Algorithm {
fn default() -> Algorithm { Algorithm::OverlayRecent }
}
impl str::FromStr for Algorithm { impl str::FromStr for Algorithm {
type Err = String; type Err = String;
@ -181,11 +177,6 @@ mod tests {
assert!(!Algorithm::RefCounted.is_stable()); assert!(!Algorithm::RefCounted.is_stable());
} }
#[test]
fn test_journal_algorithm_default() {
assert_eq!(Algorithm::default(), Algorithm::OverlayRecent);
}
#[test] #[test]
fn test_journal_algorithm_all_types() { fn test_journal_algorithm_all_types() {
// compiling should fail if some cases are not covered // compiling should fail if some cases are not covered