Keep track of local transactions

This commit is contained in:
Tomasz Drwięga 2016-11-16 15:58:14 +01:00
parent 78b5c743f6
commit 66e327dfcb
6 changed files with 309 additions and 20 deletions

1
Cargo.lock generated
View File

@ -297,6 +297,7 @@ dependencies = [
"heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -27,6 +27,7 @@ time = "0.1"
rand = "0.3"
byteorder = "0.5"
transient-hashmap = "0.1"
linked-hash-map = "0.3.0"
evmjit = { path = "../evmjit", optional = true }
clippy = { version = "0.0.96", optional = true}
ethash = { path = "../ethash" }

View File

@ -102,6 +102,7 @@ extern crate rlp;
extern crate ethcore_bloom_journal as bloom_journal;
extern crate byteorder;
extern crate transient_hashmap;
extern crate linked_hash_map;
#[macro_use]
extern crate log;

View File

@ -0,0 +1,193 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Local Transactions List.
use linked_hash_map::LinkedHashMap;
use transaction::SignedTransaction;
use error::TransactionError;
use util::{U256, H256};
#[derive(Debug, PartialEq, Clone)]
pub enum Status {
/// The transaction is currently in the transaction queue.
Pending,
/// The transaction is in future part of the queue.
Future,
/// Transaction is already mined.
Mined(SignedTransaction),
/// Transaction is dropped because of limit
Dropped(SignedTransaction),
/// Replaced because of higher gas price of another transaction.
Replaced(SignedTransaction, U256, H256),
/// Transaction was never accepted to the queue.
Rejected(SignedTransaction, TransactionError),
/// Transaction is invalid.
Invalid(SignedTransaction),
}
impl Status {
fn is_current(&self) -> bool {
*self == Status::Pending || *self == Status::Future
}
}
/// Keeps track of local transactions that are in the queue or were mined/dropped recently.
#[derive(Debug)]
pub struct LocalTransactionsList {
max_old: usize,
transactions: LinkedHashMap<H256, Status>,
}
impl Default for LocalTransactionsList {
fn default() -> Self {
Self::new(10)
}
}
impl LocalTransactionsList {
pub fn new(max_old: usize) -> Self {
LocalTransactionsList {
max_old: max_old,
transactions: Default::default(),
}
}
pub fn mark_pending(&mut self, hash: H256) {
self.clear_old();
self.transactions.insert(hash, Status::Pending);
}
pub fn mark_future(&mut self, hash: H256) {
self.transactions.insert(hash, Status::Future);
self.clear_old();
}
pub fn mark_rejected(&mut self, tx: SignedTransaction, err: TransactionError) {
self.transactions.insert(tx.hash(), Status::Rejected(tx, err));
self.clear_old();
}
pub fn mark_replaced(&mut self, tx: SignedTransaction, gas_price: U256, hash: H256) {
self.transactions.insert(tx.hash(), Status::Replaced(tx, gas_price, hash));
self.clear_old();
}
pub fn mark_invalid(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Invalid(tx));
self.clear_old();
}
pub fn mark_dropped(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Dropped(tx));
self.clear_old();
}
pub fn mark_mined(&mut self, tx: SignedTransaction) {
self.transactions.insert(tx.hash(), Status::Mined(tx));
self.clear_old();
}
pub fn contains(&self, hash: &H256) -> bool {
self.transactions.contains_key(hash)
}
pub fn all_transactions(&self) -> &LinkedHashMap<H256, Status> {
&self.transactions
}
fn clear_old(&mut self) {
let number_of_old = self.transactions
.values()
.filter(|status| !status.is_current())
.count();
if self.max_old >= number_of_old {
return;
}
let to_remove = self.transactions
.iter()
.filter(|&(_, status)| !status.is_current())
.map(|(hash, _)| *hash)
.take(number_of_old - self.max_old)
.collect::<Vec<_>>();
for hash in to_remove {
self.transactions.remove(&hash);
}
}
}
#[cfg(test)]
mod tests {
use util::U256;
use ethkey::{Random, Generator};
use transaction::{Action, Transaction, SignedTransaction};
use super::{LocalTransactionsList, Status};
#[test]
fn should_add_transaction_as_pending() {
// given
let mut list = LocalTransactionsList::default();
// when
list.mark_pending(10.into());
list.mark_future(20.into());
// then
assert!(list.contains(&10.into()), "Should contain the transaction.");
assert!(list.contains(&20.into()), "Should contain the transaction.");
let statuses = list.all_transactions().values().cloned().collect::<Vec<Status>>();
assert_eq!(statuses, vec![Status::Pending, Status::Future]);
}
#[test]
fn should_clear_old_transactions() {
// given
let mut list = LocalTransactionsList::new(1);
let tx1 = new_tx(10.into());
let tx1_hash = tx1.hash();
let tx2 = new_tx(50.into());
let tx2_hash = tx2.hash();
list.mark_pending(10.into());
list.mark_invalid(tx1);
list.mark_dropped(tx2);
assert!(list.contains(&tx2_hash));
assert!(!list.contains(&tx1_hash));
assert!(list.contains(&10.into()));
// when
list.mark_future(15.into());
// then
assert!(list.contains(&10.into()));
assert!(list.contains(&15.into()));
}
fn new_tx(nonce: U256) -> SignedTransaction {
let keypair = Random.generate().unwrap();
Transaction {
action: Action::Create,
value: U256::from(100),
data: Default::default(),
gas: U256::from(10),
gas_price: U256::from(1245),
nonce: nonce
}.sign(keypair.secret(), None)
}
}

View File

@ -43,6 +43,7 @@
mod banning_queue;
mod external;
mod local_transactions;
mod miner;
mod price_info;
mod transaction_queue;

View File

@ -91,6 +91,7 @@ use util::table::Table;
use transaction::*;
use error::{Error, TransactionError};
use client::TransactionImportResult;
use miner::local_transactions::LocalTransactionsList;
/// Transaction origin
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -125,6 +126,12 @@ impl Ord for TransactionOrigin {
}
}
impl TransactionOrigin {
fn is_local(&self) -> bool {
*self == TransactionOrigin::Local
}
}
#[derive(Clone, Debug)]
/// Light structure used to identify transaction and its order
struct TransactionOrder {
@ -352,7 +359,7 @@ impl TransactionSet {
///
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
/// Returns addresses and lowest nonces of transactions removed because of limit.
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> Option<HashMap<Address, U256>> {
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>, local: &mut LocalTransactionsList) -> Option<HashMap<Address, U256>> {
let mut count = 0;
let mut gas: U256 = 0.into();
let to_drop : Vec<(Address, U256)> = {
@ -379,9 +386,13 @@ impl TransactionSet {
.expect("Transaction has just been found in `by_priority`; so it is in `by_address` also.");
trace!(target: "txqueue", "Dropped out of limit transaction: {:?}", order.hash);
by_hash.remove(&order.hash)
let order = by_hash.remove(&order.hash)
.expect("hash is in `by_priorty`; all hashes in `by_priority` must be in `by_hash`; qed");
if order.origin.is_local() {
local.mark_dropped(order.transaction);
}
let min = removed.get(&sender).map_or(nonce, |val| cmp::min(*val, nonce));
removed.insert(sender, min);
removed
@ -488,6 +499,8 @@ pub struct TransactionQueue {
by_hash: HashMap<H256, VerifiedTransaction>,
/// Last nonce of transaction in current (to quickly check next expected transaction)
last_nonces: HashMap<Address, U256>,
/// List of local transactions and their statuses.
local_transactions: LocalTransactionsList,
}
impl Default for TransactionQueue {
@ -529,6 +542,7 @@ impl TransactionQueue {
future: future,
by_hash: HashMap::new(),
last_nonces: HashMap::new(),
local_transactions: LocalTransactionsList::default(),
}
}
@ -537,8 +551,8 @@ impl TransactionQueue {
self.current.set_limit(limit);
self.future.set_limit(limit);
// And ensure the limits
self.current.enforce_limit(&mut self.by_hash);
self.future.enforce_limit(&mut self.by_hash);
self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Returns current limit of transactions in the queue.
@ -578,7 +592,7 @@ impl TransactionQueue {
pub fn set_total_gas_limit(&mut self, gas_limit: U256) {
self.future.gas_limit = gas_limit;
self.current.gas_limit = gas_limit;
self.future.enforce_limit(&mut self.by_hash);
self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Set the new limit for the amount of gas any individual transaction may have.
@ -609,6 +623,42 @@ impl TransactionQueue {
F: Fn(&Address) -> AccountDetails,
G: Fn(&SignedTransaction) -> U256,
{
if origin == TransactionOrigin::Local {
let hash = tx.hash();
let cloned_tx = tx.clone();
let result = self.add_internal(tx, origin, fetch_account, gas_estimator);
match result {
Ok(TransactionImportResult::Current) => {
self.local_transactions.mark_pending(hash);
},
Ok(TransactionImportResult::Future) => {
self.local_transactions.mark_future(hash);
},
Err(Error::Transaction(ref err)) => {
self.local_transactions.mark_rejected(cloned_tx, err.clone());
},
Err(_) => {
self.local_transactions.mark_invalid(cloned_tx);
},
}
result
} else {
self.add_internal(tx, origin, fetch_account, gas_estimator)
}
}
/// Adds signed transaction to the queue.
fn add_internal<F, G>(
&mut self,
tx: SignedTransaction,
origin: TransactionOrigin,
fetch_account: &F,
gas_estimator: &G,
) -> Result<TransactionImportResult, Error> where
F: Fn(&Address) -> AccountDetails,
G: Fn(&SignedTransaction) -> U256,
{
if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local {
trace!(target: "txqueue",
@ -647,7 +697,6 @@ impl TransactionQueue {
self.gas_limit,
self.tx_gas_limit
);
return Err(Error::Transaction(TransactionError::GasLimitExceeded {
limit: self.gas_limit,
got: tx.gas,
@ -766,6 +815,11 @@ impl TransactionQueue {
trace!(target: "txqueue", "Removing invalid transaction: {:?}", transaction.hash());
// Mark in locals
if self.local_transactions.contains(transaction_hash) {
self.local_transactions.mark_invalid(transaction.transaction.clone());
}
// Remove from future
let order = self.future.drop(&sender, &nonce);
if order.is_some() {
@ -821,15 +875,21 @@ impl TransactionQueue {
qed");
if k >= current_nonce {
let order = order.update_height(k, current_nonce);
if order.origin.is_local() {
self.local_transactions.mark_future(order.hash);
}
if let Some(old) = self.future.insert(*sender, k, order.clone()) {
Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash);
Self::replace_orders(*sender, k, old, order, &mut self.future, &mut self.by_hash, &mut self.local_transactions);
}
} else {
trace!(target: "txqueue", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce);
self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`");
let tx = self.by_hash.remove(&order.hash).expect("All transactions in `future` are also in `by_hash`");
if tx.origin.is_local() {
self.local_transactions.mark_mined(tx.transaction);
}
}
self.future.enforce_limit(&mut self.by_hash);
}
self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
}
/// Returns top transactions from the queue ordered by priority.
@ -897,8 +957,11 @@ impl TransactionQueue {
self.future.by_gas_price.remove(&order.gas_price, &order.hash);
// Put to current
let order = order.update_height(current_nonce, first_nonce);
if order.origin.is_local() {
self.local_transactions.mark_pending(order.hash);
}
if let Some(old) = self.current.insert(address, current_nonce, order.clone()) {
Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash);
Self::replace_orders(address, current_nonce, old, order, &mut self.current, &mut self.by_hash, &mut self.local_transactions);
}
update_last_nonce_to = Some(current_nonce);
current_nonce = current_nonce + U256::one();
@ -957,9 +1020,11 @@ impl TransactionQueue {
if nonce > next_nonce {
// We have a gap - put to future.
// Insert transaction (or replace old one with lower gas price)
try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash)));
try!(check_too_cheap(
Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.future, &mut self.by_hash, &mut self.local_transactions)
));
// Enforce limit in Future
let removed = self.future.enforce_limit(&mut self.by_hash);
let removed = self.future.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// Return an error if this transaction was not imported because of limit.
try!(check_if_removed(&address, &nonce, removed));
@ -973,13 +1038,15 @@ impl TransactionQueue {
self.move_matching_future_to_current(address, nonce + U256::one(), state_nonce);
// Replace transaction if any
try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash)));
try!(check_too_cheap(
Self::replace_transaction(tx, state_nonce, min_gas_price, &mut self.current, &mut self.by_hash, &mut self.local_transactions)
));
// Keep track of highest nonce stored in current
let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n));
self.last_nonces.insert(address, new_max);
// Also enforce the limit
let removed = self.current.enforce_limit(&mut self.by_hash);
let removed = self.current.enforce_limit(&mut self.by_hash, &mut self.local_transactions);
// If some transaction were removed because of limit we need to update last_nonces also.
self.update_last_nonces(&removed);
// Trigger error if the transaction we are importing was removed.
@ -1010,7 +1077,14 @@ impl TransactionQueue {
///
/// Returns `true` if transaction actually got to the queue (`false` if there was already a transaction with higher
/// gas_price)
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, min_gas_price: (U256, PrioritizationStrategy), set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> bool {
fn replace_transaction(
tx: VerifiedTransaction,
base_nonce: U256,
min_gas_price: (U256, PrioritizationStrategy),
set: &mut TransactionSet,
by_hash: &mut HashMap<H256, VerifiedTransaction>,
local: &mut LocalTransactionsList,
) -> bool {
let order = TransactionOrder::for_transaction(&tx, base_nonce, min_gas_price.0, min_gas_price.1);
let hash = tx.hash();
let address = tx.sender();
@ -1021,14 +1095,24 @@ impl TransactionQueue {
if let Some(old) = set.insert(address, nonce, order.clone()) {
Self::replace_orders(address, nonce, old, order, set, by_hash)
Self::replace_orders(address, nonce, old, order, set, by_hash, local)
} else {
true
}
}
fn replace_orders(address: Address, nonce: U256, old: TransactionOrder, order: TransactionOrder, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> bool {
fn replace_orders(
address: Address,
nonce: U256,
old: TransactionOrder,
order: TransactionOrder,
set: &mut TransactionSet,
by_hash: &mut HashMap<H256, VerifiedTransaction>,
local: &mut LocalTransactionsList,
) -> bool {
// There was already transaction in queue. Let's check which one should stay
let old_hash = old.hash;
let new_hash = order.hash;
let old_fee = old.gas_price;
let new_fee = order.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater {
@ -1036,12 +1120,18 @@ impl TransactionQueue {
// Put back old transaction since it has greater priority (higher gas_price)
set.insert(address, nonce, old);
// and remove new one
by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
let order = by_hash.remove(&order.hash).expect("The hash has been just inserted and no other line is altering `by_hash`.");
if order.origin.is_local() {
local.mark_replaced(order.transaction, old_fee, old_hash);
}
false
} else {
trace!(target: "txqueue", "Replaced transaction: {:?} with transaction with higher gas price: {:?}", old.hash, order.hash);
// Make sure we remove old transaction entirely
by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
let old = by_hash.remove(&old.hash).expect("The hash is coming from `future` so it has to be in `by_hash`.");
if old.origin.is_local() {
local.mark_replaced(old.transaction, new_fee, new_hash);
}
true
}
}
@ -1078,6 +1168,7 @@ mod test {
use error::{Error, TransactionError};
use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction};
use miner::local_transactions::LocalTransactionsList;
use client::TransactionImportResult;
fn unwrap_tx_err(err: Result<TransactionImportResult, Error>) -> TransactionError {
@ -1208,6 +1299,7 @@ mod test {
#[test]
fn should_create_transaction_set() {
// given
let mut local = LocalTransactionsList::default();
let mut set = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
@ -1235,7 +1327,7 @@ mod test {
assert_eq!(set.by_address.len(), 2);
// when
set.enforce_limit(&mut by_hash);
set.enforce_limit(&mut by_hash, &mut local);
// then
assert_eq!(by_hash.len(), 1);