diff --git a/Cargo.lock b/Cargo.lock
index a93d2d551..94e754997 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -297,6 +297,7 @@ dependencies = [
"heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml
index 667b40ace..48fce064a 100644
--- a/ethcore/Cargo.toml
+++ b/ethcore/Cargo.toml
@@ -27,6 +27,7 @@ time = "0.1"
rand = "0.3"
byteorder = "0.5"
transient-hashmap = "0.1"
+linked-hash-map = "0.3.0"
evmjit = { path = "../evmjit", optional = true }
clippy = { version = "0.0.96", optional = true}
ethash = { path = "../ethash" }
diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs
index bf3e59171..ca343b1a7 100644
--- a/ethcore/src/lib.rs
+++ b/ethcore/src/lib.rs
@@ -102,6 +102,7 @@ extern crate rlp;
extern crate ethcore_bloom_journal as bloom_journal;
extern crate byteorder;
extern crate transient_hashmap;
+extern crate linked_hash_map;
#[macro_use]
extern crate log;
diff --git a/ethcore/src/miner/local_transactions.rs b/ethcore/src/miner/local_transactions.rs
new file mode 100644
index 000000000..c9c869c33
--- /dev/null
+++ b/ethcore/src/miner/local_transactions.rs
@@ -0,0 +1,193 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+//! Local Transactions List.
+
+use linked_hash_map::LinkedHashMap;
+use transaction::SignedTransaction;
+use error::TransactionError;
+use util::{U256, H256};
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum Status {
+ /// The transaction is currently in the transaction queue.
+ Pending,
+ /// The transaction is in future part of the queue.
+ Future,
+ /// Transaction is already mined.
+ Mined(SignedTransaction),
+ /// Transaction is dropped because of limit
+ Dropped(SignedTransaction),
+ /// Replaced because of higher gas price of another transaction.
+ Replaced(SignedTransaction, U256, H256),
+ /// Transaction was never accepted to the queue.
+ Rejected(SignedTransaction, TransactionError),
+ /// Transaction is invalid.
+ Invalid(SignedTransaction),
+}
+
+impl Status {
+ fn is_current(&self) -> bool {
+ *self == Status::Pending || *self == Status::Future
+ }
+}
+
+/// Keeps track of local transactions that are in the queue or were mined/dropped recently.
+#[derive(Debug)]
+pub struct LocalTransactionsList {
+ max_old: usize,
+ transactions: LinkedHashMap,
+}
+
+impl Default for LocalTransactionsList {
+ fn default() -> Self {
+ Self::new(10)
+ }
+}
+
+impl LocalTransactionsList {
+ pub fn new(max_old: usize) -> Self {
+ LocalTransactionsList {
+ max_old: max_old,
+ transactions: Default::default(),
+ }
+ }
+
+ pub fn mark_pending(&mut self, hash: H256) {
+ self.clear_old();
+ self.transactions.insert(hash, Status::Pending);
+ }
+
+ pub fn mark_future(&mut self, hash: H256) {
+ self.transactions.insert(hash, Status::Future);
+ self.clear_old();
+ }
+
+ pub fn mark_rejected(&mut self, tx: SignedTransaction, err: TransactionError) {
+ self.transactions.insert(tx.hash(), Status::Rejected(tx, err));
+ self.clear_old();
+ }
+
+ pub fn mark_replaced(&mut self, tx: SignedTransaction, gas_price: U256, hash: H256) {
+ self.transactions.insert(tx.hash(), Status::Replaced(tx, gas_price, hash));
+ self.clear_old();
+ }
+
+ pub fn mark_invalid(&mut self, tx: SignedTransaction) {
+ self.transactions.insert(tx.hash(), Status::Invalid(tx));
+ self.clear_old();
+ }
+
+ pub fn mark_dropped(&mut self, tx: SignedTransaction) {
+ self.transactions.insert(tx.hash(), Status::Dropped(tx));
+ self.clear_old();
+ }
+
+ pub fn mark_mined(&mut self, tx: SignedTransaction) {
+ self.transactions.insert(tx.hash(), Status::Mined(tx));
+ self.clear_old();
+ }
+
+ pub fn contains(&self, hash: &H256) -> bool {
+ self.transactions.contains_key(hash)
+ }
+
+ pub fn all_transactions(&self) -> &LinkedHashMap {
+ &self.transactions
+ }
+
+ fn clear_old(&mut self) {
+ let number_of_old = self.transactions
+ .values()
+ .filter(|status| !status.is_current())
+ .count();
+
+ if self.max_old >= number_of_old {
+ return;
+ }
+
+ let to_remove = self.transactions
+ .iter()
+ .filter(|&(_, status)| !status.is_current())
+ .map(|(hash, _)| *hash)
+ .take(number_of_old - self.max_old)
+ .collect::>();
+
+ for hash in to_remove {
+ self.transactions.remove(&hash);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use util::U256;
+ use ethkey::{Random, Generator};
+ use transaction::{Action, Transaction, SignedTransaction};
+ use super::{LocalTransactionsList, Status};
+
+ #[test]
+ fn should_add_transaction_as_pending() {
+ // given
+ let mut list = LocalTransactionsList::default();
+
+ // when
+ list.mark_pending(10.into());
+ list.mark_future(20.into());
+
+ // then
+ assert!(list.contains(&10.into()), "Should contain the transaction.");
+ assert!(list.contains(&20.into()), "Should contain the transaction.");
+ let statuses = list.all_transactions().values().cloned().collect::>();
+ assert_eq!(statuses, vec![Status::Pending, Status::Future]);
+ }
+
+ #[test]
+ fn should_clear_old_transactions() {
+ // given
+ let mut list = LocalTransactionsList::new(1);
+ let tx1 = new_tx(10.into());
+ let tx1_hash = tx1.hash();
+ let tx2 = new_tx(50.into());
+ let tx2_hash = tx2.hash();
+
+ list.mark_pending(10.into());
+ list.mark_invalid(tx1);
+ list.mark_dropped(tx2);
+ assert!(list.contains(&tx2_hash));
+ assert!(!list.contains(&tx1_hash));
+ assert!(list.contains(&10.into()));
+
+ // when
+ list.mark_future(15.into());
+
+ // then
+ assert!(list.contains(&10.into()));
+ assert!(list.contains(&15.into()));
+ }
+
+ fn new_tx(nonce: U256) -> SignedTransaction {
+ let keypair = Random.generate().unwrap();
+ Transaction {
+ action: Action::Create,
+ value: U256::from(100),
+ data: Default::default(),
+ gas: U256::from(10),
+ gas_price: U256::from(1245),
+ nonce: nonce
+ }.sign(keypair.secret(), None)
+ }
+}
diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs
index 2eb09cdf1..29d731e9e 100644
--- a/ethcore/src/miner/mod.rs
+++ b/ethcore/src/miner/mod.rs
@@ -43,6 +43,7 @@
mod banning_queue;
mod external;
+mod local_transactions;
mod miner;
mod price_info;
mod transaction_queue;
diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs
index cc10bbe98..9f688adf5 100644
--- a/ethcore/src/miner/transaction_queue.rs
+++ b/ethcore/src/miner/transaction_queue.rs
@@ -91,6 +91,7 @@ use util::table::Table;
use transaction::*;
use error::{Error, TransactionError};
use client::TransactionImportResult;
+use miner::local_transactions::LocalTransactionsList;
/// Transaction origin
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -125,6 +126,12 @@ impl Ord for TransactionOrigin {
}
}
+impl TransactionOrigin {
+ fn is_local(&self) -> bool {
+ *self == TransactionOrigin::Local
+ }
+}
+
#[derive(Clone, Debug)]
/// Light structure used to identify transaction and its order
struct TransactionOrder {
@@ -352,7 +359,7 @@ impl TransactionSet {
///
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
/// Returns addresses and lowest nonces of transactions removed because of limit.
- fn enforce_limit(&mut self, by_hash: &mut HashMap) -> Option> {
+ fn enforce_limit(&mut self, by_hash: &mut HashMap, local: &mut LocalTransactionsList) -> Option> {
let mut count = 0;
let mut gas: U256 = 0.into();
let to_drop : Vec<(Address, U256)> = {
@@ -379,9 +386,13 @@ impl TransactionSet {
.expect("Transaction has just been found in `by_priority`; so it is in `by_address` also.");
trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash);
- by_hash.remove(&order.hash)
+ let order = by_hash.remove(&order.hash)
.expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed");
+ if order.origin.is_local() {
+ local.mark_dropped(order.transaction);
+ }
+
let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce));
removed.insert(sender, min);
removed
@@ -488,6 +499,8 @@ pub struct TransactionQueue {
by_hash: HashMap,
/// Last nonce of transaction in current (to quickly check next expected transaction)
last_nonces: HashMap,
+ /// List of local transactions and their statuses.
+ local_transactions: LocalTransactionsList,
}
impl Default for TransactionQueue {
@@ -529,6 +542,7 @@ impl TransactionQueue {
future: future,
by_hash: HashMap::new(),
last_nonces: HashMap::new(),
+ local_transactions: LocalTransactionsList::default(),
}
}
@@ -537,8 +551,8 @@ impl TransactionQueue {
self.current.set_limit(limit);
self.future.set_limit(limit);
// And ensure the limits
- self.current.enforce_limit(&mut self.by_hash);
- self.future.enforce_limit(&mut self.by_hash);
+ self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
+ self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Returns current limit of transactions in the queue.
@@ -578,7 +592,7 @@ impl TransactionQueue {
pub fn set_total_gas_limit(&mut self, gas_limit: U256) {
self.future.gas_limit = gas_limit;
self.current.gas_limit = gas_limit;
- self.future.enforce_limit(&mut self.by_hash);
+ self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Set the new limit for the amount of gas any individual transaction may have.
@@ -609,6 +623,42 @@ impl TransactionQueue {
F: Fn(&Address) -> AccountDetails,
G: Fn(&SignedTransaction) -> U256,
{
+ if origin == TransactionOrigin::Local {
+ let hash = tx.hash();
+ let cloned_tx = tx.clone();
+
+ let result = self.add_internal(tx, origin, fetch_account, gas_estimator);
+ match result {
+ Ok(TransactionImportResult::Current) => {
+ self.local_transactions.mark_pending(hash);
+ },
+ Ok(TransactionImportResult::Future) => {
+ self.local_transactions.mark_future(hash);
+ },
+ Err(Error::Transaction(ref err)) => {
+ self.local_transactions.mark_rejected(cloned_tx, err.clone());
+ },
+ Err(_) => {
+ self.local_transactions.mark_invalid(cloned_tx);
+ },
+ }
+ result
+ } else {
+ self.add_internal(tx, origin, fetch_account, gas_estimator)
+ }
+ }
+
+ /// Adds signed transaction to the queue.
+ fn add_internal(
+ &mut self,
+ tx: SignedTransaction,
+ origin: TransactionOrigin,
+ fetch_account: &F,
+ gas_estimator: &G,
+ ) -> Result where
+ F: Fn(&Address) -> AccountDetails,
+ G: Fn(&SignedTransaction) -> U256,
+ {
if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local {
trace!(target: "txqueue",
@@ -647,7 +697,6 @@ impl TransactionQueue {
self.gas_limit,
self.tx_gas_limit
);
-
return Err(Error::Transaction(TransactionError::GasLimitExceeded {
limit: self.gas_limit,
got: tx.gas,
@@ -766,6 +815,11 @@ impl TransactionQueue {
trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash());
+ // Mark in locals
+ if self.local_transactions.contains(transaction_hash) {
+ self.local_transactions.mark_invalid(transaction.transaction.clone());
+ }
+
// Remove from future
let order = self.future.drop(&sender, &nonce);
if order.is_some() {
@@ -821,15 +875,21 @@ impl TransactionQueue {
qed");
if k >= current_nonce {
let order = order.update_height(k, current_nonce);
+ if order.origin.is_local() {
+ self.local_transactions.mark_future(order.hash);
+ }
if let Some(old) = self.future.insert(*sender, k, order.clone()) {
- Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash);
+ Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash, &mut self.local_transactions);
}
} else {
trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce);
- self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`");
+ let tx = self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`");
+ if tx.origin.is_local() {
+ self.local_transactions.mark_mined(tx.transaction);
+ }
}
}
- self.future.enforce_limit(&mut self.by_hash);
+ self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Returns top transactions from the queue ordered by priority.
@@ -897,8 +957,11 @@ impl TransactionQueue {
self.future.by_gas_price.remove(&order.gas_price, &order.hash);
// Put to current
let order = order.update_height(current_nonce, first_nonce);
+ if order.origin.is_local() {
+ self.local_transactions.mark_pending(order.hash);
+ }
if let Some(old) = self.current.insert(address, current_nonce, order.clone()) {
- Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash);
+ Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash, &mut self.local_transactions);
}
update_last_nonce_to = Some(current_nonce);
current_nonce = current_nonce + U256::one();
@@ -957,9 +1020,11 @@ impl TransactionQueue {
if nonce > next_nonce {
// We have a gap - put to future.
// Insert transaction (or replace old one with lower gas price)
- try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash)));
+ try!(check_too_cheap(
+ Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash, &mut self.local_transactions)
+ ));
// Enforce limit in Future
- let removed = self.future.enforce_limit(&mut self.by_hash);
+ let removed = self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// Return an error if this transaction was not imported because of limit.
try!(check_if_removed(&address, &nonce, removed));
@@ -973,13 +1038,15 @@ impl TransactionQueue {
self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce);
// Replace transaction if any
- try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash)));
+ try!(check_too_cheap(
+ Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash, &mut self.local_transactions)
+ ));
// Keep track of highest nonce stored in current
let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n));
self.last_nonces.insert(address, new_max);
// Also enforce the limit
- let removed = self.current.enforce_limit(&mut self.by_hash);
+ let removed = self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// If some transaction were removed because of limit we need to update last_nonces also.
self.update_last_nonces(&removed);
// Trigger error if the transaction we are importing was removed.
@@ -1010,7 +1077,14 @@ impl TransactionQueue {
///
/// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher
/// gas_price)
- fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, min_gas_price: (U256, PrioritizationStrategy), set: &mut TransactionSet, by_hash: &mut HashMap) -> bool {
+ fn replace_transaction(
+ tx: VerifiedTransaction,
+ base_nonce: U256,
+ min_gas_price: (U256, PrioritizationStrategy),
+ set: &mut TransactionSet,
+ by_hash: &mut HashMap,
+ local: &mut LocalTransactionsList,
+ ) -> bool {
let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1);
let hash = tx.hash();
let address = tx.sender();
@@ -1021,14 +1095,24 @@ impl TransactionQueue {
if let Some(old) = set.insert(address, nonce, order.clone()) {
- Self::replace_orders(address, nonce, old, order, set, by_hash)
+ Self::replace_orders(address, nonce, old, order, set, by_hash, local)
} else {
true
}
}
- fn replace_orders(address: Address, nonce: U256, old: TransactionOrder, order: TransactionOrder, set: &mut TransactionSet, by_hash: &mut HashMap) -> bool {
+ fn replace_orders(
+ address: Address,
+ nonce: U256,
+ old: TransactionOrder,
+ order: TransactionOrder,
+ set: &mut TransactionSet,
+ by_hash: &mut HashMap,
+ local: &mut LocalTransactionsList,
+ ) -> bool {
// There was already transaction in queue. Let's check which one should stay
+ let old_hash = old.hash;
+ let new_hash = order.hash;
let old_fee = old.gas_price;
let new_fee = order.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater {
@@ -1036,12 +1120,18 @@ impl TransactionQueue {
// Put back old transaction since it has greater priority (higher gas_price)
set.insert(address, nonce, old);
// and remove new one
- by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
+ let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
+ if order.origin.is_local() {
+ local.mark_replaced(order.transaction, old_fee, old_hash);
+ }
false
} else {
trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash);
// Make sure we remove old transaction entirely
- by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
+ let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
+ if old.origin.is_local() {
+ local.mark_replaced(old.transaction, new_fee, new_hash);
+ }
true
}
}
@@ -1078,6 +1168,7 @@ mod test {
use error::{Error, TransactionError};
use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction};
+ use miner::local_transactions::LocalTransactionsList;
use client::TransactionImportResult;
fn unwrap_tx_err(err: Result) -> TransactionError {
@@ -1208,6 +1299,7 @@ mod test {
#[test]
fn should_create_transaction_set() {
// given
+ let mut local = LocalTransactionsList::default();
let mut set = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
@@ -1235,7 +1327,7 @@ mod test {
assert_eq!(set.by_address.len(), 2);
// when
- set.enforce_limit(&mut by_hash);
+ set.enforce_limit(&mut by_hash, &mut local);
// then
assert_eq!(by_hash.len(), 1);