diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 9d68f0841..10da80902 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -87,7 +87,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo; use_contract!(registry, "Registry", "res/contracts/registrar.json"); -const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; // Max number of blocks imported at once. const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4; @@ -760,13 +759,12 @@ impl Client { tracedb: tracedb, engine: engine, pruning: config.pruning.clone(), - config: config, db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), report: RwLock::new(Default::default()), io_channel: Mutex::new(message_channel), notify: RwLock::new(Vec::new()), - queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), + queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queued_ancient_blocks: Default::default(), ancient_blocks_import_lock: Default::default(), @@ -779,6 +777,7 @@ impl Client { registrar_address, exit_handler: Mutex::new(None), importer, + config, }); // prune old states. diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index 462a126a0..1045ea610 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -70,12 +70,6 @@ pub enum Mode { Off, } -impl Default for Mode { - fn default() -> Self { - Mode::Active - } -} - impl Display for Mode { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { match *self { @@ -88,7 +82,7 @@ impl Display for Mode { } /// Client configuration. Includes configs for all sub-systems. -#[derive(Debug, PartialEq, Default, Clone)] +#[derive(Debug, PartialEq, Clone)] pub struct ClientConfig { /// Block queue configuration. pub queue: QueueConfig, @@ -126,8 +120,36 @@ pub struct ClientConfig { pub history_mem: usize, /// Check seal valididity on block import pub check_seal: bool, + /// Maximal number of transactions queued for verification in a separate thread. + pub transaction_verification_queue_size: usize, } +impl Default for ClientConfig { + fn default() -> Self { + let mb = 1024 * 1024; + ClientConfig { + queue: Default::default(), + blockchain: Default::default(), + tracing: Default::default(), + vm_type: Default::default(), + fat_db: false, + pruning: journaldb::Algorithm::OverlayRecent, + name: "default".into(), + db_cache_size: None, + db_compaction: Default::default(), + db_wal: true, + mode: Mode::Active, + spec_name: "".into(), + verifier_type: VerifierType::Canon, + state_cache_size: 1 * mb, + jump_table_size: 1 * mb, + history: 64, + history_mem: 32 * mb, + check_seal: true, + transaction_verification_queue_size: 8192, + } + } +} #[cfg(test)] mod test { use super::{DatabaseCompactionProfile, Mode}; @@ -143,9 +165,4 @@ mod test { assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap()); assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap()); } - - #[test] - fn test_mode_default() { - assert_eq!(Mode::default(), Mode::Active); - } } diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs index ed4227ee2..56cb2ade7 100644 --- a/ethcore/src/verification/mod.rs +++ b/ethcore/src/verification/mod.rs @@ -42,12 +42,6 @@ pub enum VerifierType { Noop, } -impl Default for VerifierType { - fn default() -> Self { - VerifierType::Canon - } -} - /// Create a new verifier based on type. pub fn new(v: VerifierType) -> Box> { match v { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 84e6344e6..50f4c9428 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -662,20 +662,29 @@ impl ChainSync { None } ).collect(); - let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)| - self.active_peers.contains(&peer_id) - ).map(|v| *v).collect(); - random::new().shuffle(&mut peers); //TODO: sort by rating - // prefer peers with higher protocol version - peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); trace!( target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), confirmed_peers.len(), self.peers.len() ); - for (peer_id, _) in peers { - self.sync_peer(io, peer_id, false); + + if self.state == SyncState::Waiting { + trace!(target: "sync", "Waiting for the block queue"); + } else if self.state == SyncState::SnapshotWaiting { + trace!(target: "sync", "Waiting for the snapshot restoration"); + } else { + let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)| + self.active_peers.contains(&peer_id) + ).map(|v| *v).collect(); + + random::new().shuffle(&mut peers); //TODO: sort by rating + // prefer peers with higher protocol version + peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); + + for (peer_id, _) in peers { + self.sync_peer(io, peer_id, false); + } } if @@ -710,14 +719,6 @@ impl ChainSync { trace!(target: "sync", "Skipping busy peer {}", peer_id); return; } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for the block queue"); - return; - } - if self.state == SyncState::SnapshotWaiting { - trace!(target: "sync", "Waiting for the snapshot restoration"); - return; - } (peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned()) } else { return; diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 450075faa..284f64d31 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -19,7 +19,7 @@ use std::{cmp, fmt}; use std::sync::Arc; use std::sync::atomic::{self, AtomicUsize}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use ethereum_types::{H256, U256, Address}; use parking_lot::RwLock; @@ -138,6 +138,50 @@ impl CachedPending { } } +#[derive(Debug)] +struct RecentlyRejected { + inner: RwLock>, + limit: usize, +} + +impl RecentlyRejected { + fn new(limit: usize) -> Self { + RecentlyRejected { + limit, + inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)), + } + } + + fn clear(&self) { + self.inner.write().clear(); + } + + fn get(&self, hash: &H256) -> Option { + self.inner.read().get(hash).cloned() + } + + fn insert(&self, hash: H256, err: &transaction::Error) { + if self.inner.read().contains_key(&hash) { + return; + } + + let mut inner = self.inner.write(); + inner.insert(hash, err.clone()); + + // clean up + if inner.len() > self.limit { + // randomly remove half of the entries + let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect(); + for key in to_remove { + inner.remove(&key); + } + } + } +} + +/// Minimal size of rejection cache, by default it's equal to queue size. +const MIN_REJECTED_CACHE_SIZE: usize = 2048; + /// Ethereum Transaction Queue /// /// Responsible for: @@ -150,6 +194,7 @@ pub struct TransactionQueue { pool: RwLock, options: RwLock, cached_pending: RwLock, + recently_rejected: RecentlyRejected, } impl TransactionQueue { @@ -159,11 +204,13 @@ impl TransactionQueue { verification_options: verifier::Options, strategy: PrioritizationStrategy, ) -> Self { + let max_count = limits.max_count; TransactionQueue { insertion_id: Default::default(), pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)), options: RwLock::new(verification_options), cached_pending: RwLock::new(CachedPending::none()), + recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)), } } @@ -195,26 +242,42 @@ impl TransactionQueue { None } }; + let verifier = verifier::Verifier::new( client, options, self.insertion_id.clone(), transaction_to_replace, ); + let results = transactions .into_iter() .map(|transaction| { - if self.pool.read().find(&transaction.hash()).is_some() { - bail!(transaction::Error::AlreadyImported) + let hash = transaction.hash(); + + if self.pool.read().find(&hash).is_some() { + return Err(transaction::Error::AlreadyImported); } - verifier.verify_transaction(transaction) + if let Some(err) = self.recently_rejected.get(&hash) { + trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err); + return Err(err); + } + + let imported = verifier + .verify_transaction(transaction) + .and_then(|verified| { + self.pool.write().import(verified).map_err(convert_error) + }); + + match imported { + Ok(_) => Ok(()), + Err(err) => { + self.recently_rejected.insert(hash, &err); + Err(err) + }, + } }) - .map(|result| result.and_then(|verified| { - self.pool.write().import(verified) - .map(|_imported| ()) - .map_err(convert_error) - })) .collect::>(); // Notify about imported transactions. @@ -342,6 +405,7 @@ impl TransactionQueue { let state_readiness = ready::State::new(client, stale_id, nonce_cap); + self.recently_rejected.clear(); let removed = self.pool.write().cull(None, state_readiness); debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status()); } diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index df637a6cf..ab33c2ce3 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -894,6 +894,47 @@ fn should_avoid_verifying_transaction_already_in_pool() { } #[test] +fn should_avoid_reverifying_recently_rejected_transactions() { + // given + let txq = TransactionQueue::new( + txpool::Options { + max_count: 1, + max_per_sender: 2, + max_mem_usage: 50 + }, + verifier::Options { + minimal_gas_price: 1.into(), + block_gas_limit: 1_000_000.into(), + tx_gas_limit: 1_000_000.into(), + }, + PrioritizationStrategy::GasPriceOnly, + ); + + let client = TestClient::new(); + let tx1 = Tx::gas_price(10_000).signed().unverified(); + + let res = txq.import(client.clone(), vec![tx1.clone()]); + assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance { + balance: 0xf67c.into(), + cost: 0xc8458e4.into(), + })]); + assert_eq!(txq.status().status.transaction_count, 0); + assert!(client.was_verification_triggered()); + + // when + let client = TestClient::new(); + let res = txq.import(client.clone(), vec![tx1]); + assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance { + balance: 0xf67c.into(), + cost: 0xc8458e4.into(), + })]); + assert!(!client.was_verification_triggered()); + + // then + assert_eq!(txq.status().status.transaction_count, 0); +} + + fn should_reject_early_in_case_gas_price_is_less_than_min_effective() { // given let txq = TransactionQueue::new( diff --git a/parity/blockchain.rs b/parity/blockchain.rs index af5373b39..f2bccba4c 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -357,7 +357,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> { algorithm, cmd.pruning_history, cmd.pruning_memory, - cmd.check_seal + cmd.check_seal, ); client_config.queue.verifier_settings = cmd.verifier_settings; diff --git a/parity/run.rs b/parity/run.rs index da7a715bd..f4795f982 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -518,6 +518,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // fetch service let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; + let txpool_size = cmd.miner_options.pool_limits.max_count; // create miner let miner = Arc::new(Miner::new( cmd.miner_options, @@ -574,6 +575,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: ); client_config.queue.verifier_settings = cmd.verifier_settings; + client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4); // set up bootnodes let mut net_conf = cmd.net_conf; diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 772719250..949a685bd 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -179,7 +179,7 @@ impl SnapshotCommand { algorithm, self.pruning_history, self.pruning_memory, - true + true, ); let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); diff --git a/parity/user_defaults.rs b/parity/user_defaults.rs index cb4a0a40a..5031886f2 100644 --- a/parity/user_defaults.rs +++ b/parity/user_defaults.rs @@ -122,7 +122,7 @@ impl Default for UserDefaults { fn default() -> Self { UserDefaults { is_first_launch: true, - pruning: Algorithm::default(), + pruning: Algorithm::OverlayRecent, tracing: false, fat_db: false, mode: Mode::Active, diff --git a/util/journaldb/src/lib.rs b/util/journaldb/src/lib.rs index 652d89039..4814ac868 100644 --- a/util/journaldb/src/lib.rs +++ b/util/journaldb/src/lib.rs @@ -82,10 +82,6 @@ pub enum Algorithm { RefCounted, } -impl Default for Algorithm { - fn default() -> Algorithm { Algorithm::OverlayRecent } -} - impl str::FromStr for Algorithm { type Err = String; @@ -183,11 +179,6 @@ mod tests { assert!(!Algorithm::RefCounted.is_stable()); } - #[test] - fn test_journal_algorithm_default() { - assert_eq!(Algorithm::default(), Algorithm::OverlayRecent); - } - #[test] fn test_journal_algorithm_all_types() { // compiling should fail if some cases are not covered