From 66e327dfcbad838a725b819a09c32d25c041567d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 16 Nov 2016 15:58:14 +0100 Subject: [PATCH] Keep track of local transactions --- Cargo.lock | 1 + ethcore/Cargo.toml | 1 + ethcore/src/lib.rs | 1 + ethcore/src/miner/local_transactions.rs | 193 ++++++++++++++++++++++++ ethcore/src/miner/mod.rs | 1 + ethcore/src/miner/transaction_queue.rs | 132 +++++++++++++--- 6 files changed, 309 insertions(+), 20 deletions(-) create mode 100644 ethcore/src/miner/local_transactions.rs diff --git a/Cargo.lock b/Cargo.lock index a93d2d551..94e754997 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 667b40ace..48fce064a 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -27,6 +27,7 @@ time = "0.1" rand = "0.3" byteorder = "0.5" transient-hashmap = "0.1" +linked-hash-map = "0.3.0" evmjit = { path = "../evmjit", optional = true } clippy = { version = "0.0.96", optional = true} ethash = { path = "../ethash" } diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index bf3e59171..ca343b1a7 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -102,6 +102,7 @@ extern crate rlp; extern crate ethcore_bloom_journal as bloom_journal; extern crate byteorder; extern crate transient_hashmap; +extern crate linked_hash_map; #[macro_use] extern crate log; diff --git a/ethcore/src/miner/local_transactions.rs b/ethcore/src/miner/local_transactions.rs new file mode 100644 index 000000000..c9c869c33 --- /dev/null +++ b/ethcore/src/miner/local_transactions.rs @@ -0,0 +1,193 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Local Transactions List. + +use linked_hash_map::LinkedHashMap; +use transaction::SignedTransaction; +use error::TransactionError; +use util::{U256, H256}; + +#[derive(Debug, PartialEq, Clone)] +pub enum Status { + /// The transaction is currently in the transaction queue. + Pending, + /// The transaction is in future part of the queue. + Future, + /// Transaction is already mined. + Mined(SignedTransaction), + /// Transaction is dropped because of limit + Dropped(SignedTransaction), + /// Replaced because of higher gas price of another transaction. + Replaced(SignedTransaction, U256, H256), + /// Transaction was never accepted to the queue. + Rejected(SignedTransaction, TransactionError), + /// Transaction is invalid. + Invalid(SignedTransaction), +} + +impl Status { + fn is_current(&self) -> bool { + *self == Status::Pending || *self == Status::Future + } +} + +/// Keeps track of local transactions that are in the queue or were mined/dropped recently. +#[derive(Debug)] +pub struct LocalTransactionsList { + max_old: usize, + transactions: LinkedHashMap, +} + +impl Default for LocalTransactionsList { + fn default() -> Self { + Self::new(10) + } +} + +impl LocalTransactionsList { + pub fn new(max_old: usize) -> Self { + LocalTransactionsList { + max_old: max_old, + transactions: Default::default(), + } + } + + pub fn mark_pending(&mut self, hash: H256) { + self.clear_old(); + self.transactions.insert(hash, Status::Pending); + } + + pub fn mark_future(&mut self, hash: H256) { + self.transactions.insert(hash, Status::Future); + self.clear_old(); + } + + pub fn mark_rejected(&mut self, tx: SignedTransaction, err: TransactionError) { + self.transactions.insert(tx.hash(), Status::Rejected(tx, err)); + self.clear_old(); + } + + pub fn mark_replaced(&mut self, tx: SignedTransaction, gas_price: U256, hash: H256) { + self.transactions.insert(tx.hash(), Status::Replaced(tx, gas_price, hash)); + self.clear_old(); + } + + pub fn mark_invalid(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Invalid(tx)); + self.clear_old(); + } + + pub fn mark_dropped(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Dropped(tx)); + self.clear_old(); + } + + pub fn mark_mined(&mut self, tx: SignedTransaction) { + self.transactions.insert(tx.hash(), Status::Mined(tx)); + self.clear_old(); + } + + pub fn contains(&self, hash: &H256) -> bool { + self.transactions.contains_key(hash) + } + + pub fn all_transactions(&self) -> &LinkedHashMap { + &self.transactions + } + + fn clear_old(&mut self) { + let number_of_old = self.transactions + .values() + .filter(|status| !status.is_current()) + .count(); + + if self.max_old >= number_of_old { + return; + } + + let to_remove = self.transactions + .iter() + .filter(|&(_, status)| !status.is_current()) + .map(|(hash, _)| *hash) + .take(number_of_old - self.max_old) + .collect::>(); + + for hash in to_remove { + self.transactions.remove(&hash); + } + } +} + +#[cfg(test)] +mod tests { + use util::U256; + use ethkey::{Random, Generator}; + use transaction::{Action, Transaction, SignedTransaction}; + use super::{LocalTransactionsList, Status}; + + #[test] + fn should_add_transaction_as_pending() { + // given + let mut list = LocalTransactionsList::default(); + + // when + list.mark_pending(10.into()); + list.mark_future(20.into()); + + // then + assert!(list.contains(&10.into()), "Should contain the transaction."); + assert!(list.contains(&20.into()), "Should contain the transaction."); + let statuses = list.all_transactions().values().cloned().collect::>(); + assert_eq!(statuses, vec![Status::Pending, Status::Future]); + } + + #[test] + fn should_clear_old_transactions() { + // given + let mut list = LocalTransactionsList::new(1); + let tx1 = new_tx(10.into()); + let tx1_hash = tx1.hash(); + let tx2 = new_tx(50.into()); + let tx2_hash = tx2.hash(); + + list.mark_pending(10.into()); + list.mark_invalid(tx1); + list.mark_dropped(tx2); + assert!(list.contains(&tx2_hash)); + assert!(!list.contains(&tx1_hash)); + assert!(list.contains(&10.into())); + + // when + list.mark_future(15.into()); + + // then + assert!(list.contains(&10.into())); + assert!(list.contains(&15.into())); + } + + fn new_tx(nonce: U256) -> SignedTransaction { + let keypair = Random.generate().unwrap(); + Transaction { + action: Action::Create, + value: U256::from(100), + data: Default::default(), + gas: U256::from(10), + gas_price: U256::from(1245), + nonce: nonce + }.sign(keypair.secret(), None) + } +} diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 2eb09cdf1..29d731e9e 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -43,6 +43,7 @@ mod banning_queue; mod external; +mod local_transactions; mod miner; mod price_info; mod transaction_queue; diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index cc10bbe98..9f688adf5 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -91,6 +91,7 @@ use util::table::Table; use transaction::*; use error::{Error, TransactionError}; use client::TransactionImportResult; +use miner::local_transactions::LocalTransactionsList; /// Transaction origin #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -125,6 +126,12 @@ impl Ord for TransactionOrigin { } } +impl TransactionOrigin { + fn is_local(&self) -> bool { + *self == TransactionOrigin::Local + } +} + #[derive(Clone, Debug)] /// Light structure used to identify transaction and its order struct TransactionOrder { @@ -352,7 +359,7 @@ impl TransactionSet { /// /// It drops transactions from this set but also removes associated `VerifiedTransaction`. /// Returns addresses and lowest nonces of transactions removed because of limit. - fn enforce_limit(&mut self, by_hash: &mut HashMap) -> Option> { + fn enforce_limit(&mut self, by_hash: &mut HashMap, local: &mut LocalTransactionsList) -> Option> { let mut count = 0; let mut gas: U256 = 0.into(); let to_drop : Vec<(Address, U256)> = { @@ -379,9 +386,13 @@ impl TransactionSet { .expect("Transaction has just been found in `by_priority`; so it is in `by_address` also."); trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash); - by_hash.remove(&order.hash) + let order = by_hash.remove(&order.hash) .expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed"); + if order.origin.is_local() { + local.mark_dropped(order.transaction); + } + let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce)); removed.insert(sender, min); removed @@ -488,6 +499,8 @@ pub struct TransactionQueue { by_hash: HashMap, /// Last nonce of transaction in current (to quickly check next expected transaction) last_nonces: HashMap, + /// List of local transactions and their statuses. + local_transactions: LocalTransactionsList, } impl Default for TransactionQueue { @@ -529,6 +542,7 @@ impl TransactionQueue { future: future, by_hash: HashMap::new(), last_nonces: HashMap::new(), + local_transactions: LocalTransactionsList::default(), } } @@ -537,8 +551,8 @@ impl TransactionQueue { self.current.set_limit(limit); self.future.set_limit(limit); // And ensure the limits - self.current.enforce_limit(&mut self.by_hash); - self.future.enforce_limit(&mut self.by_hash); + self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Returns current limit of transactions in the queue. @@ -578,7 +592,7 @@ impl TransactionQueue { pub fn set_total_gas_limit(&mut self, gas_limit: U256) { self.future.gas_limit = gas_limit; self.current.gas_limit = gas_limit; - self.future.enforce_limit(&mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Set the new limit for the amount of gas any individual transaction may have. @@ -609,6 +623,42 @@ impl TransactionQueue { F: Fn(&Address) -> AccountDetails, G: Fn(&SignedTransaction) -> U256, { + if origin == TransactionOrigin::Local { + let hash = tx.hash(); + let cloned_tx = tx.clone(); + + let result = self.add_internal(tx, origin, fetch_account, gas_estimator); + match result { + Ok(TransactionImportResult::Current) => { + self.local_transactions.mark_pending(hash); + }, + Ok(TransactionImportResult::Future) => { + self.local_transactions.mark_future(hash); + }, + Err(Error::Transaction(ref err)) => { + self.local_transactions.mark_rejected(cloned_tx, err.clone()); + }, + Err(_) => { + self.local_transactions.mark_invalid(cloned_tx); + }, + } + result + } else { + self.add_internal(tx, origin, fetch_account, gas_estimator) + } + } + + /// Adds signed transaction to the queue. + fn add_internal( + &mut self, + tx: SignedTransaction, + origin: TransactionOrigin, + fetch_account: &F, + gas_estimator: &G, + ) -> Result where + F: Fn(&Address) -> AccountDetails, + G: Fn(&SignedTransaction) -> U256, + { if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local { trace!(target: "txqueue", @@ -647,7 +697,6 @@ impl TransactionQueue { self.gas_limit, self.tx_gas_limit ); - return Err(Error::Transaction(TransactionError::GasLimitExceeded { limit: self.gas_limit, got: tx.gas, @@ -766,6 +815,11 @@ impl TransactionQueue { trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash()); + // Mark in locals + if self.local_transactions.contains(transaction_hash) { + self.local_transactions.mark_invalid(transaction.transaction.clone()); + } + // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { @@ -821,15 +875,21 @@ impl TransactionQueue { qed"); if k >= current_nonce { let order = order.update_height(k, current_nonce); + if order.origin.is_local() { + self.local_transactions.mark_future(order.hash); + } if let Some(old) = self.future.insert(*sender, k, order.clone()) { - Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash); + Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash, &mut self.local_transactions); } } else { trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); - self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`"); + let tx = self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`"); + if tx.origin.is_local() { + self.local_transactions.mark_mined(tx.transaction); + } } } - self.future.enforce_limit(&mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); } /// Returns top transactions from the queue ordered by priority. @@ -897,8 +957,11 @@ impl TransactionQueue { self.future.by_gas_price.remove(&order.gas_price, &order.hash); // Put to current let order = order.update_height(current_nonce, first_nonce); + if order.origin.is_local() { + self.local_transactions.mark_pending(order.hash); + } if let Some(old) = self.current.insert(address, current_nonce, order.clone()) { - Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash); + Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash, &mut self.local_transactions); } update_last_nonce_to = Some(current_nonce); current_nonce = current_nonce + U256::one(); @@ -957,9 +1020,11 @@ impl TransactionQueue { if nonce > next_nonce { // We have a gap - put to future. // Insert transaction (or replace old one with lower gas price) - try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash))); + try!(check_too_cheap( + Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash, &mut self.local_transactions) + )); // Enforce limit in Future - let removed = self.future.enforce_limit(&mut self.by_hash); + let removed = self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions); // Return an error if this transaction was not imported because of limit. try!(check_if_removed(&address, &nonce, removed)); @@ -973,13 +1038,15 @@ impl TransactionQueue { self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce); // Replace transaction if any - try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash))); + try!(check_too_cheap( + Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash, &mut self.local_transactions) + )); // Keep track of highest nonce stored in current let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n)); self.last_nonces.insert(address, new_max); // Also enforce the limit - let removed = self.current.enforce_limit(&mut self.by_hash); + let removed = self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions); // If some transaction were removed because of limit we need to update last_nonces also. self.update_last_nonces(&removed); // Trigger error if the transaction we are importing was removed. @@ -1010,7 +1077,14 @@ impl TransactionQueue { /// /// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher /// gas_price) - fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, min_gas_price: (U256, PrioritizationStrategy), set: &mut TransactionSet, by_hash: &mut HashMap) -> bool { + fn replace_transaction( + tx: VerifiedTransaction, + base_nonce: U256, + min_gas_price: (U256, PrioritizationStrategy), + set: &mut TransactionSet, + by_hash: &mut HashMap, + local: &mut LocalTransactionsList, + ) -> bool { let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1); let hash = tx.hash(); let address = tx.sender(); @@ -1021,14 +1095,24 @@ impl TransactionQueue { if let Some(old) = set.insert(address, nonce, order.clone()) { - Self::replace_orders(address, nonce, old, order, set, by_hash) + Self::replace_orders(address, nonce, old, order, set, by_hash, local) } else { true } } - fn replace_orders(address: Address, nonce: U256, old: TransactionOrder, order: TransactionOrder, set: &mut TransactionSet, by_hash: &mut HashMap) -> bool { + fn replace_orders( + address: Address, + nonce: U256, + old: TransactionOrder, + order: TransactionOrder, + set: &mut TransactionSet, + by_hash: &mut HashMap, + local: &mut LocalTransactionsList, + ) -> bool { // There was already transaction in queue. Let's check which one should stay + let old_hash = old.hash; + let new_hash = order.hash; let old_fee = old.gas_price; let new_fee = order.gas_price; if old_fee.cmp(&new_fee) == Ordering::Greater { @@ -1036,12 +1120,18 @@ impl TransactionQueue { // Put back old transaction since it has greater priority (higher gas_price) set.insert(address, nonce, old); // and remove new one - by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); + let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`."); + if order.origin.is_local() { + local.mark_replaced(order.transaction, old_fee, old_hash); + } false } else { trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash); // Make sure we remove old transaction entirely - by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); + let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`."); + if old.origin.is_local() { + local.mark_replaced(old.transaction, new_fee, new_hash); + } true } } @@ -1078,6 +1168,7 @@ mod test { use error::{Error, TransactionError}; use super::*; use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; + use miner::local_transactions::LocalTransactionsList; use client::TransactionImportResult; fn unwrap_tx_err(err: Result) -> TransactionError { @@ -1208,6 +1299,7 @@ mod test { #[test] fn should_create_transaction_set() { // given + let mut local = LocalTransactionsList::default(); let mut set = TransactionSet { by_priority: BTreeSet::new(), by_address: Table::new(), @@ -1235,7 +1327,7 @@ mod test { assert_eq!(set.by_address.len(), 2); // when - set.enforce_limit(&mut by_hash); + set.enforce_limit(&mut by_hash, &mut local); // then assert_eq!(by_hash.len(), 1);