From badb3729c9040a8f632ab4ddf528dc138e3d7195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 20 Mar 2017 19:15:02 +0100 Subject: [PATCH] Changing Mutex into RwLock for transaction queue (#4951) * Changing Mutex into RwLock for transaction queue * Fixing merge * little fix --- Cargo.lock | 8 ++--- ethcore/Cargo.toml | 2 +- ethcore/src/miner/banning_queue.rs | 33 +++++++++--------- ethcore/src/miner/miner.rs | 56 +++++++++++++++--------------- rpc/Cargo.toml | 2 +- rpc/src/v1/helpers/poll_manager.rs | 2 +- rpc/src/v1/helpers/signer.rs | 2 +- rpc/src/v1/impls/signing.rs | 2 +- 8 files changed, 53 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ab71f092..1543c9e6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,7 +414,7 @@ dependencies = [ "semver 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", - "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -638,7 +638,7 @@ dependencies = [ "serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", - "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2416,7 +2416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "transient-hashmap" -version = "0.1.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2780,7 +2780,7 @@ dependencies = [ "checksum toml 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "fcd27a04ca509aff336ba5eb2abc58d456f52c4ff64d9724d88acb85ead560b6" "checksum toml 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a442dfc13508e603c3f763274361db7f79d7469a0e95c411cde53662ab30fc72" "checksum traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "07eaeb7689bb7fca7ce15628319635758eda769fed481ecfe6686ddef2600616" -"checksum transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "15f7cc7116182edca1ed08f6f8c4da92104555ca77addbabea4eaa59b20373d0" +"checksum transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "715254c8f0811be1a79ad3ea5e6fa3c8eddec2b03d7f5ba78cf093e56d79c24f" "checksum typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1410f6f91f21d1612654e7cc69193b0334f909dcf2c790c4826254fbb86f8887" "checksum unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a5906ca2b98c799f4b1ab4557b76367ebd6ae5ef14930ec841c74aed5f3764" "checksum unicode-bidi 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c1f7ceb96afdfeedee42bade65a0d585a6a0106f681b6749c8ff4daa8df30b3f" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 39bf20070..cdf0135e7 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -24,7 +24,7 @@ bit-set = "0.4" time = "0.1" rand = "0.3" byteorder = "1.0" -transient-hashmap = "0.1" +transient-hashmap = "0.4" linked-hash-map = "0.3.0" lru-cache = "0.1.0" ethabi = "1.0.0" diff --git a/ethcore/src/miner/banning_queue.rs b/ethcore/src/miner/banning_queue.rs index 66fae59aa..a77008ba6 100644 --- a/ethcore/src/miner/banning_queue.rs +++ b/ethcore/src/miner/banning_queue.rs @@ -19,7 +19,6 @@ use std::time::Duration; use std::ops::{Deref, DerefMut}; -use std::cell::Cell; use transaction::{SignedTransaction, Action}; use transient_hashmap::TransientHashMap; use miner::{TransactionQueue, TransactionQueueDetailsProvider, TransactionImportResult, TransactionOrigin}; @@ -47,15 +46,15 @@ impl Default for Threshold { pub struct BanningTransactionQueue { queue: TransactionQueue, ban_threshold: Threshold, - senders_bans: TransientHashMap>, - recipients_bans: TransientHashMap>, - codes_bans: TransientHashMap>, + senders_bans: TransientHashMap, + recipients_bans: TransientHashMap, + codes_bans: TransientHashMap, } impl BanningTransactionQueue { /// Creates new banlisting transaction queue pub fn new(queue: TransactionQueue, ban_threshold: Threshold, ban_lifetime: Duration) -> Self { - let ban_lifetime_sec = ban_lifetime.as_secs(); + let ban_lifetime_sec = ban_lifetime.as_secs() as u32; assert!(ban_lifetime_sec > 0, "Lifetime has to be specified in seconds."); BanningTransactionQueue { queue: queue, @@ -87,7 +86,7 @@ impl BanningTransactionQueue { // Check sender let sender = transaction.sender(); - let count = self.senders_bans.direct().get(&sender).map(|v| v.get()).unwrap_or(0); + let count = self.senders_bans.direct().get(&sender).cloned().unwrap_or(0); if count > threshold { debug!(target: "txqueue", "Ignoring transaction {:?} because sender is banned.", transaction.hash()); return Err(Error::Transaction(TransactionError::SenderBanned)); @@ -95,7 +94,7 @@ impl BanningTransactionQueue { // Check recipient if let Action::Call(recipient) = transaction.action { - let count = self.recipients_bans.direct().get(&recipient).map(|v| v.get()).unwrap_or(0); + let count = self.recipients_bans.direct().get(&recipient).cloned().unwrap_or(0); if count > threshold { debug!(target: "txqueue", "Ignoring transaction {:?} because recipient is banned.", transaction.hash()); return Err(Error::Transaction(TransactionError::RecipientBanned)); @@ -105,7 +104,7 @@ impl BanningTransactionQueue { // Check code if let Action::Create = transaction.action { let code_hash = transaction.data.sha3(); - let count = self.codes_bans.direct().get(&code_hash).map(|v| v.get()).unwrap_or(0); + let count = self.codes_bans.direct().get(&code_hash).cloned().unwrap_or(0); if count > threshold { debug!(target: "txqueue", "Ignoring transaction {:?} because code is banned.", transaction.hash()); return Err(Error::Transaction(TransactionError::CodeBanned)); @@ -147,9 +146,9 @@ impl BanningTransactionQueue { /// queue. fn ban_sender(&mut self, address: Address) -> bool { let count = { - let mut count = self.senders_bans.entry(address).or_insert_with(|| Cell::new(0)); - *count.get_mut() = count.get().saturating_add(1); - count.get() + let mut count = self.senders_bans.entry(address).or_insert_with(|| 0); + *count = count.saturating_add(1); + *count }; match self.ban_threshold { Threshold::BanAfter(threshold) if count > threshold => { @@ -167,9 +166,9 @@ impl BanningTransactionQueue { /// Returns true if bans threshold has been reached. fn ban_recipient(&mut self, address: Address) -> bool { let count = { - let mut count = self.recipients_bans.entry(address).or_insert_with(|| Cell::new(0)); - *count.get_mut() = count.get().saturating_add(1); - count.get() + let mut count = self.recipients_bans.entry(address).or_insert_with(|| 0); + *count = count.saturating_add(1); + *count }; match self.ban_threshold { // TODO [ToDr] Consider removing other transactions to the same recipient from the queue? @@ -183,12 +182,12 @@ impl BanningTransactionQueue { /// If bans threshold is reached all subsequent transactions to contracts with this codehash will be rejected. /// Returns true if bans threshold has been reached. fn ban_codehash(&mut self, code_hash: H256) -> bool { - let mut count = self.codes_bans.entry(code_hash).or_insert_with(|| Cell::new(0)); - *count.get_mut() = count.get().saturating_add(1); + let mut count = self.codes_bans.entry(code_hash).or_insert_with(|| 0); + *count = count.saturating_add(1); match self.ban_threshold { // TODO [ToDr] Consider removing other transactions with the same code from the queue? - Threshold::BanAfter(threshold) if count.get() > threshold => true, + Threshold::BanAfter(threshold) if *count > threshold => true, _ => false, } } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 545f7c539..c4a99df3a 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -212,7 +212,7 @@ struct SealingWork { /// Handles preparing work for "work sealing" or seals "internally" if Engine does not require work. pub struct Miner { // NOTE [ToDr] When locking always lock in this order! - transaction_queue: Arc>, + transaction_queue: Arc>, sealing_work: Mutex, next_allowed_reseal: Mutex, next_mandatory_reseal: RwLock, @@ -271,7 +271,7 @@ impl Miner { }; Miner { - transaction_queue: Arc::new(Mutex::new(txq)), + transaction_queue: Arc::new(RwLock::new(txq)), next_allowed_reseal: Mutex::new(Instant::now()), next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period), sealing_block_last_request: Mutex::new(0), @@ -328,7 +328,7 @@ impl Miner { let _timer = PerfTimer::new("prepare_block"); let chain_info = chain.chain_info(); let (transactions, mut open_block, original_work_hash) = { - let transactions = {self.transaction_queue.lock().top_transactions_at(chain_info.best_block_number, chain_info.best_block_timestamp)}; + let transactions = {self.transaction_queue.read().top_transactions_at(chain_info.best_block_number, chain_info.best_block_timestamp)}; let mut sealing_work = self.sealing_work.lock(); let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash()); let best_hash = chain_info.best_block_hash; @@ -375,7 +375,7 @@ impl Miner { // Check for heavy transactions match self.options.tx_queue_banning { Banning::Enabled { ref offend_threshold, .. } if &took > offend_threshold => { - match self.transaction_queue.lock().ban_transaction(&hash) { + match self.transaction_queue.write().ban_transaction(&hash) { true => { warn!(target: "miner", "Detected heavy transaction. Banning the sender and recipient/code."); }, @@ -428,7 +428,7 @@ impl Miner { let fetch_nonce = |a: &Address| chain.latest_nonce(a); { - let mut queue = self.transaction_queue.lock(); + let mut queue = self.transaction_queue.write(); for hash in invalid_transactions { queue.remove_invalid(&hash, &fetch_nonce); } @@ -445,13 +445,13 @@ impl Miner { let txq = self.transaction_queue.clone(); self.gas_pricer.lock().recalibrate(move |price| { debug!(target: "miner", "minimal_gas_price: Got gas price! {}", price); - txq.lock().set_minimal_gas_price(price); + txq.write().set_minimal_gas_price(price); }); } /// Check is reseal is allowed and necessary. fn requires_reseal(&self, best_block: BlockNumber) -> bool { - let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions(); + let has_local_transactions = self.transaction_queue.read().has_local_pending_transactions(); let mut sealing_work = self.sealing_work.lock(); if sealing_work.enabled { trace!(target: "miner", "requires_reseal: sealing enabled"); @@ -557,7 +557,7 @@ impl Miner { fn update_gas_limit(&self, client: &MiningBlockChainClient) { let gas_limit = client.best_block_header().gas_limit(); - let mut queue = self.transaction_queue.lock(); + let mut queue = self.transaction_queue.write(); queue.set_gas_limit(gas_limit); if let GasLimit::Auto = self.options.tx_queue_gas_limit { // Set total tx queue gas limit to be 20x the block gas limit. @@ -679,7 +679,7 @@ const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5; impl MinerService for Miner { fn clear_and_reset(&self, chain: &MiningBlockChainClient) { - self.transaction_queue.lock().clear(); + self.transaction_queue.write().clear(); // -------------------------------------------------------------------------- // | NOTE Code below requires transaction_queue and sealing_work locks. | // | Make sure to release the locks before calling that method. | @@ -688,7 +688,7 @@ impl MinerService for Miner { } fn status(&self) -> MinerStatus { - let status = self.transaction_queue.lock().status(); + let status = self.transaction_queue.read().status(); let sealing_work = self.sealing_work.lock(); MinerStatus { transactions_in_pending_queue: status.pending, @@ -818,16 +818,16 @@ impl MinerService for Miner { } fn set_minimal_gas_price(&self, min_gas_price: U256) { - self.transaction_queue.lock().set_minimal_gas_price(min_gas_price); + self.transaction_queue.write().set_minimal_gas_price(min_gas_price); } fn minimal_gas_price(&self) -> U256 { - *self.transaction_queue.lock().minimal_gas_price() + *self.transaction_queue.read().minimal_gas_price() } fn sensible_gas_price(&self) -> U256 { // 10% above our minimum. - *self.transaction_queue.lock().minimal_gas_price() * 110.into() / 100.into() + *self.transaction_queue.read().minimal_gas_price() * 110.into() / 100.into() } fn sensible_gas_limit(&self) -> U256 { @@ -835,15 +835,15 @@ impl MinerService for Miner { } fn transactions_limit(&self) -> usize { - self.transaction_queue.lock().limit() + self.transaction_queue.read().limit() } fn set_transactions_limit(&self, limit: usize) { - self.transaction_queue.lock().set_limit(limit) + self.transaction_queue.write().set_limit(limit) } fn set_tx_gas_limit(&self, limit: U256) { - self.transaction_queue.lock().set_tx_gas_limit(limit) + self.transaction_queue.write().set_tx_gas_limit(limit) } /// Get the author that we will seal blocks as. @@ -873,7 +873,7 @@ impl MinerService for Miner { ) -> Vec> { trace!(target: "external_tx", "Importing external transactions"); let results = { - let mut transaction_queue = self.transaction_queue.lock(); + let mut transaction_queue = self.transaction_queue.write(); self.add_transactions_to_queue( chain, transactions, TransactionOrigin::External, None, &mut transaction_queue ) @@ -900,7 +900,7 @@ impl MinerService for Miner { let imported = { // Be sure to release the lock before we call prepare_work_sealing - let mut transaction_queue = self.transaction_queue.lock(); + let mut transaction_queue = self.transaction_queue.write(); // We need to re-validate transactions let import = self.add_transactions_to_queue( chain, vec![pending.transaction.into()], TransactionOrigin::Local, pending.condition, &mut transaction_queue @@ -937,12 +937,12 @@ impl MinerService for Miner { } fn pending_transactions(&self) -> Vec { - let queue = self.transaction_queue.lock(); + let queue = self.transaction_queue.read(); queue.pending_transactions(BlockNumber::max_value(), u64::max_value()) } fn local_transactions(&self) -> BTreeMap { - let queue = self.transaction_queue.lock(); + let queue = self.transaction_queue.read(); queue.local_transactions() .iter() .map(|(hash, status)| (*hash, status.clone())) @@ -950,11 +950,11 @@ impl MinerService for Miner { } fn future_transactions(&self) -> Vec { - self.transaction_queue.lock().future_transactions() + self.transaction_queue.read().future_transactions() } fn ready_transactions(&self, best_block: BlockNumber, best_block_timestamp: u64) -> Vec { - let queue = self.transaction_queue.lock(); + let queue = self.transaction_queue.read(); match self.options.pending_set { PendingSet::AlwaysQueue => queue.pending_transactions(best_block, best_block_timestamp), PendingSet::SealingOrElseQueue => { @@ -975,7 +975,7 @@ impl MinerService for Miner { } fn pending_transactions_hashes(&self, best_block: BlockNumber) -> Vec { - let queue = self.transaction_queue.lock(); + let queue = self.transaction_queue.read(); match self.options.pending_set { PendingSet::AlwaysQueue => queue.pending_hashes(), PendingSet::SealingOrElseQueue => { @@ -996,7 +996,7 @@ impl MinerService for Miner { } fn transaction(&self, best_block: BlockNumber, hash: &H256) -> Option { - let queue = self.transaction_queue.lock(); + let queue = self.transaction_queue.read(); match self.options.pending_set { PendingSet::AlwaysQueue => queue.find(hash), PendingSet::SealingOrElseQueue => { @@ -1017,7 +1017,7 @@ impl MinerService for Miner { } fn remove_pending_transaction(&self, chain: &MiningBlockChainClient, hash: &H256) -> Option { - let mut queue = self.transaction_queue.lock(); + let mut queue = self.transaction_queue.write(); let tx = queue.find(hash); if tx.is_some() { let fetch_nonce = |a: &Address| chain.latest_nonce(a); @@ -1077,7 +1077,7 @@ impl MinerService for Miner { } fn last_nonce(&self, address: &Address) -> Option { - self.transaction_queue.lock().last_nonce(address) + self.transaction_queue.read().last_nonce(address) } /// Update sealing if required. @@ -1167,7 +1167,7 @@ impl MinerService for Miner { // Then import all transactions... { - let mut transaction_queue = self.transaction_queue.lock(); + let mut transaction_queue = self.transaction_queue.write(); for hash in retracted { let block = chain.block(BlockId::Hash(*hash)) .expect("Client is sending message after commit to db and inserting to chain; the block is available; qed"); @@ -1185,7 +1185,7 @@ impl MinerService for Miner { balance: chain.latest_balance(a), }; let time = chain.chain_info().best_block_number; - let mut transaction_queue = self.transaction_queue.lock(); + let mut transaction_queue = self.transaction_queue.write(); transaction_queue.remove_old(&fetch_account, time); } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 1a5347eba..0b3cc4b82 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -16,7 +16,7 @@ serde_json = "0.9" serde_derive = "0.9" rustc-serialize = "0.3" time = "0.1" -transient-hashmap = "0.1" +transient-hashmap = "0.4" order-stat = "0.1" jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch = "parity-1.6" } jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git", branch = "parity-1.6" } diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index f6ca5ac62..ab44dc35f 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -19,7 +19,7 @@ use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; /// Lifetime of poll (in seconds). -const POLL_LIFETIME: u64 = 60; +const POLL_LIFETIME: u32 = 60; pub type PollId = usize; diff --git a/rpc/src/v1/helpers/signer.rs b/rpc/src/v1/helpers/signer.rs index 56991f487..52c3e731d 100644 --- a/rpc/src/v1/helpers/signer.rs +++ b/rpc/src/v1/helpers/signer.rs @@ -23,7 +23,7 @@ use ethstore::random_string; use v1::helpers::signing_queue::{ConfirmationsQueue}; -const TOKEN_LIFETIME_SECS: u64 = 3600; +const TOKEN_LIFETIME_SECS: u32 = 3600; /// Manages communication with Signer crate pub struct SignerService { diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index bc01fbcda..dcda6fa9e 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -43,7 +43,7 @@ use v1::types::{ }; /// After 60s entries that are not queried with `check_request` will get garbage collected. -const MAX_PENDING_DURATION_SEC: u64 = 60; +const MAX_PENDING_DURATION_SEC: u32 = 60; /// Max number of total requests pending and completed, before we start garbage collecting them. const MAX_TOTAL_REQUESTS: usize = SIGNING_QUEUE_LIMIT;