diff --git a/Cargo.lock b/Cargo.lock index 0da1c18a0..ab45a5569 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -554,6 +554,19 @@ dependencies = [ "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ethcore-bigint" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bigint 4.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "heapsize 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", + "plain_hasher 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ethcore-bloom-journal" version = "0.1.0" @@ -2409,6 +2422,14 @@ dependencies = [ "crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "plain_hasher" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "podio" version = "0.1.5" @@ -3276,6 +3297,16 @@ name = "traitobject" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "transaction-pool" +version = "1.9.0" +dependencies = [ + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-bigint 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "transient-hashmap" version = "0.4.0" @@ -3608,6 +3639,7 @@ dependencies = [ "checksum error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3" "checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "" "checksum ethabi 4.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c819a3adef0413a2519cbd9a19a35dd1c20c7a0110705beaba8aa4aa87eda95f" +"checksum ethcore-bigint 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bcb5af77e74a8f70e9c3337e069c37bc82178ef1b459c02091f73c4ad5281eb5" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum flate2 0.2.20 (registry+https://github.com/rust-lang/crates.io-index)" = "e6234dd4468ae5d1e2dbb06fe2b058696fdc50a339c68a393aefbf00bc81e423" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" @@ -3710,6 +3742,7 @@ dependencies = [ "checksum phf_generator 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "6b07ffcc532ccc85e3afc45865469bf5d9e4ef5bfcf9622e3cfe80c2d275ec03" "checksum phf_shared 0.7.21 (registry+https://github.com/rust-lang/crates.io-index)" = "07e24b0ca9643bdecd0632f2b3da6b1b89bbb0030e0b992afc1113b23a7bc2f2" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" +"checksum plain_hasher 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "83ae80873992f511142c07d0ec6c44de5636628fdb7e204abd655932ea79d995" "checksum podio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e5422a1ee1bc57cc47ae717b0137314258138f38fd5f3cea083f43a9725383a0" "checksum pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2412f3332a07c7a2a50168988dcc184f32180a9758ad470390e5f55e089f6b6e" "checksum primal 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0e31b86efadeaeb1235452171a66689682783149a6249ff334a2c5d8218d00a4" diff --git a/Cargo.toml b/Cargo.toml index 17250c32c..07573cdd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,4 +116,13 @@ lto = false panic = "abort" [workspace] -members = ["ethstore/cli", "ethkey/cli", "evmbin", "whisper", "chainspec", "dapps/js-glue", "ethcore/wasm/run"] +members = [ + "chainspec", + "dapps/js-glue", + "ethcore/wasm/run", + "ethkey/cli", + "ethstore/cli", + "evmbin", + "transaction-pool", + "whisper", +] diff --git a/transaction-pool/Cargo.toml b/transaction-pool/Cargo.toml new file mode 100644 index 000000000..3d53d3273 --- /dev/null +++ b/transaction-pool/Cargo.toml @@ -0,0 +1,12 @@ +[package] +description = "Generic transaction pool." +name = "transaction-pool" +version = "1.9.0" +license = "GPL-3.0" +authors = ["Parity Technologies "] + +[dependencies] +error-chain = "0.11" +log="0.3" +smallvec = "0.4" +ethcore-bigint = { features = ["heapsizeof"], version="0.2" } diff --git a/transaction-pool/src/error.rs b/transaction-pool/src/error.rs new file mode 100644 index 000000000..2adb75963 --- /dev/null +++ b/transaction-pool/src/error.rs @@ -0,0 +1,48 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use bigint::hash::H256; + +error_chain! { + errors { + AlreadyImported(hash: H256) { + description("transaction is already in the queue"), + display("[{:?}] transaction already imported", hash) + } + TooCheapToEnter(hash: H256) { + description("the pool is full and transaction is too cheap to replace any transaction"), + display("[{:?}] transaction too cheap to enter the pool", hash) + } + TooCheapToReplace(old_hash: H256, hash: H256) { + description("transaction is too cheap to replace existing transaction in the queue"), + display("[{:?}] transaction too cheap to replace: {:?}", hash, old_hash) + } + } +} + +#[cfg(test)] +impl PartialEq for ErrorKind { + fn eq(&self, other: &Self) -> bool { + use self::ErrorKind::*; + + match (self, other) { + (&AlreadyImported(ref h1), &AlreadyImported(ref h2)) => h1 == h2, + (&TooCheapToEnter(ref h1), &TooCheapToEnter(ref h2)) => h1 == h2, + (&TooCheapToReplace(ref old1, ref new1), &TooCheapToReplace(ref old2, ref new2)) => old1 == old2 && new1 == new2, + _ => false, + } + } +} diff --git a/transaction-pool/src/lib.rs b/transaction-pool/src/lib.rs new file mode 100644 index 000000000..047178dab --- /dev/null +++ b/transaction-pool/src/lib.rs @@ -0,0 +1,118 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Generic Transaction Pool +//! +//! An extensible and performant implementation of Ethereum Transaction Pool. +//! The pool stores ordered, verified transactions according to some pluggable +//! `Scoring` implementation. +//! The pool also allows you to construct a set of `pending` transactions according +//! to some notion of `Readiness` (pluggable). +//! +//! The pool is generic over transactions and should make no assumptions about them. +//! The only thing we can rely on is the `Scoring` that defines: +//! - the ordering of transactions from a single sender +//! - the priority of the transaction compared to other transactions from different senders +//! +//! NOTE: the transactions from a single sender are not ordered by priority, +//! but still when constructing pending set we always need to maintain the ordering +//! (i.e. `txs[1]` always needs to be included after `txs[0]` even if it has higher priority) +//! +//! ### Design Details +//! +//! Performance assumptions: +//! - Possibility to handle tens of thousands of transactions +//! - Fast insertions and replacements `O(per-sender + log(senders))` +//! - Reasonably fast removal of stalled transactions `O(per-sender)` +//! - Reasonably fast construction of pending set `O(txs * (log(senders) + log(per-sender))` +//! +//! The removal performance could be improved by trading some memory. Currently `SmallVec` is used +//! to store senders transactions, instead we could use `VecDeque` and efficiently `pop_front` +//! the best transactions. +//! +//! The pending set construction and insertion complexity could be reduced by introducing +//! a notion of `nonce` - an absolute, numeric ordering of transactions. +//! We don't do that because of possible implications of EIP208 where nonce might not be +//! explicitly available. +//! +//! 1. The pool groups transactions from particular sender together +//! and stores them ordered by `Scoring` within that group +//! i.e. `HashMap>`. +//! 2. Additionaly we maintain the best and the worst transaction from each sender +//! (by `Scoring` not `priority`) ordered by `priority`. +//! It means that we can easily identify the best transaction inside the entire pool +//! and the worst transaction. +//! 3. Whenever new transaction is inserted to the queue: +//! - first check all the limits (overall, memory, per-sender) +//! - retrieve all transactions from a sender +//! - binary search for position to insert the transaction +//! - decide if we are replacing existing transaction (3 outcomes: drop, replace, insert) +//! - update best and worst transaction from that sender if affected +//! 4. Pending List construction: +//! - Take the best transaction (by priority) from all senders to the List +//! - Replace the transaction with next transaction (by ordering) from that sender (if any) +//! - Repeat + +#![warn(missing_docs)] + +extern crate smallvec; +extern crate ethcore_bigint as bigint; + +#[macro_use] +extern crate error_chain; +#[macro_use] +extern crate log; + +#[cfg(test)] +mod tests; + +mod error; +mod listener; +mod options; +mod pool; +mod ready; +mod status; +mod transactions; +mod verifier; + +pub mod scoring; + +pub use self::listener::{Listener, NoopListener}; +pub use self::options::Options; +pub use self::pool::Pool; +pub use self::ready::{Ready, Readiness}; +pub use self::scoring::Scoring; +pub use self::status::{LightStatus, Status}; +pub use self::verifier::Verifier; + +use std::fmt; + +use self::bigint::prelude::{H256, H160 as Address}; + +/// Already verified transaction that can be safely queued. +pub trait VerifiedTransaction: fmt::Debug { + /// Transaction hash + fn hash(&self) -> &H256; + + /// Memory usage + fn mem_usage(&self) -> usize; + + /// Transaction sender + fn sender(&self) -> &Address; + + /// Unique index of insertion (lower = older). + fn insertion_id(&self) -> u64; +} diff --git a/transaction-pool/src/listener.rs b/transaction-pool/src/listener.rs new file mode 100644 index 000000000..36a5d9de2 --- /dev/null +++ b/transaction-pool/src/listener.rs @@ -0,0 +1,48 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::Arc; + +/// Transaction pool listener. +/// +/// Listener is being notified about status of every transaction in the pool. +pub trait Listener { + /// The transaction has been successfuly added to the pool. + /// If second argument is `Some` the transaction has took place of some other transaction + /// which was already in pool. + /// NOTE: You won't be notified about drop of `old` transaction separately. + fn added(&mut self, _tx: &Arc, _old: Option<&Arc>) {} + + /// The transaction was rejected from the pool. + /// It means that it was too cheap to replace any transaction already in the pool. + fn rejected(&mut self, _tx: T) {} + + /// The transaction was dropped from the pool because of a limit. + fn dropped(&mut self, _tx: &Arc) {} + + /// The transaction was marked as invalid by executor. + fn invalid(&mut self, _tx: &Arc) {} + + /// The transaction has been cancelled. + fn cancelled(&mut self, _tx: &Arc) {} + + /// The transaction has been mined. + fn mined(&mut self, _tx: &Arc) {} +} + +/// A no-op implementation of `Listener`. +pub struct NoopListener; +impl Listener for NoopListener {} diff --git a/transaction-pool/src/options.rs b/transaction-pool/src/options.rs new file mode 100644 index 000000000..ddec91286 --- /dev/null +++ b/transaction-pool/src/options.rs @@ -0,0 +1,36 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Transaction Pool options. +#[derive(Debug)] +pub struct Options { + /// Maximal number of transactions in the pool. + pub max_count: usize, + /// Maximal number of transactions from single sender. + pub max_per_sender: usize, + /// Maximal memory usage. + pub max_mem_usage: usize, +} + +impl Default for Options { + fn default() -> Self { + Options { + max_count: 1024, + max_per_sender: 16, + max_mem_usage: 8 * 1024 * 1024, + } + } +} diff --git a/transaction-pool/src/pool.rs b/transaction-pool/src/pool.rs new file mode 100644 index 000000000..1c58da3fe --- /dev/null +++ b/transaction-pool/src/pool.rs @@ -0,0 +1,431 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::Arc; +use std::collections::{HashMap, BTreeSet}; + +use bigint::hash::{H160, H256}; + +use error; +use listener::{Listener, NoopListener}; +use options::Options; +use ready::{Ready, Readiness}; +use scoring::{Scoring, ScoreWithRef}; +use status::{LightStatus, Status}; +use transactions::{AddResult, Transactions}; + +use {VerifiedTransaction}; + +type Sender = H160; + +/// A transaction pool. +#[derive(Debug)] +pub struct Pool, L = NoopListener> { + listener: L, + scoring: S, + options: Options, + mem_usage: usize, + + transactions: HashMap>, + by_hash: HashMap>, + + best_transactions: BTreeSet>, + worst_transactions: BTreeSet>, +} + +impl + Default> Default for Pool { + fn default() -> Self { + Self::with_scoring(S::default(), Options::default()) + } +} + +impl + Default> Pool { + /// Creates a new `Pool` with given options + /// and default `Scoring` and `Listener`. + pub fn with_options(options: Options) -> Self { + Self::with_scoring(S::default(), options) + } +} + +impl> Pool { + /// Creates a new `Pool` with given `Scoring` and options. + pub fn with_scoring(scoring: S, options: Options) -> Self { + Self::new(NoopListener, scoring, options) + } +} + + +const INITIAL_NUMBER_OF_SENDERS: usize = 16; + +impl Pool where + T: VerifiedTransaction, + S: Scoring, + L: Listener, +{ + /// Creates new `Pool` with given `Scoring`, `Listener` and options. + pub fn new(listener: L, scoring: S, options: Options) -> Self { + let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS); + let by_hash = HashMap::with_capacity(options.max_count / 16); + + Pool { + listener, + scoring, + options, + mem_usage: 0, + transactions, + by_hash, + best_transactions: Default::default(), + worst_transactions: Default::default(), + } + + } + + /// Attempts to import new transaction to the pool, returns a `Arc` or an `Error`. + /// + /// NOTE: Since `Ready`ness is separate from the pool it's possible to import stalled transactions. + /// It's the caller responsibility to make sure that's not the case. + /// + /// NOTE: The transaction may push out some other transactions from the pool + /// either because of limits (see `Options`) or because `Scoring` decides that the transaction + /// replaces an existing transaction from that sender. + /// If any limit is reached the transaction with the lowest `Score` is evicted to make room. + /// + /// The `Listener` will be informed on any drops or rejections. + pub fn import(&mut self, mut transaction: T) -> error::Result> { + let mem_usage = transaction.mem_usage(); + + ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(*transaction.hash())); + + { + let remove_worst = |s: &mut Self, transaction| { + match s.remove_worst(&transaction) { + Err(err) => { + s.listener.rejected(transaction); + Err(err) + }, + Ok(removed) => { + s.listener.dropped(&removed); + s.finalize_remove(removed.hash()); + Ok(transaction) + }, + } + }; + + while self.by_hash.len() + 1 > self.options.max_count { + transaction = remove_worst(self, transaction)?; + } + + while self.mem_usage + mem_usage > self.options.max_mem_usage { + transaction = remove_worst(self, transaction)?; + } + } + + let (result, prev_state, current_state) = { + let transactions = self.transactions.entry(*transaction.sender()).or_insert_with(Transactions::default); + // get worst and best transactions for comparison + let prev = transactions.worst_and_best(); + let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender); + let current = transactions.worst_and_best(); + (result, prev, current) + }; + + // update best and worst transactions from this sender (if required) + self.update_senders_worst_and_best(prev_state, current_state); + + match result { + AddResult::Ok(tx) => { + self.listener.added(&tx, None); + self.finalize_insert(&tx, None); + Ok(tx) + }, + AddResult::PushedOut { new, old } | + AddResult::Replaced { new, old } => { + self.listener.added(&new, Some(&old)); + self.finalize_insert(&new, Some(&old)); + Ok(new) + }, + AddResult::TooCheap { new, old } => { + let hash = *new.hash(); + self.listener.rejected(new); + bail!(error::ErrorKind::TooCheapToReplace(*old.hash(), hash)) + }, + AddResult::TooCheapToEnter(new) => { + let hash = *new.hash(); + self.listener.rejected(new); + bail!(error::ErrorKind::TooCheapToEnter(hash)) + } + } + } + + /// Updates state of the pool statistics if the transaction was added to a set. + fn finalize_insert(&mut self, new: &Arc, old: Option<&Arc>) { + self.mem_usage += new.mem_usage(); + self.by_hash.insert(*new.hash(), new.clone()); + + if let Some(old) = old { + self.finalize_remove(old.hash()); + } + } + + /// Updates the pool statistics if transaction was removed. + fn finalize_remove(&mut self, hash: &H256) -> Option> { + self.by_hash.remove(hash).map(|old| { + self.mem_usage -= old.mem_usage(); + old + }) + } + + /// Updates best and worst transactions from a sender. + fn update_senders_worst_and_best( + &mut self, + previous: Option<((S::Score, Arc), (S::Score, Arc))>, + current: Option<((S::Score, Arc), (S::Score, Arc))>, + ) { + let worst_collection = &mut self.worst_transactions; + let best_collection = &mut self.best_transactions; + + let is_same = |a: &(S::Score, Arc), b: &(S::Score, Arc)| { + a.0 == b.0 && a.1.hash() == b.1.hash() + }; + + let update = |collection: &mut BTreeSet<_>, (score, tx), remove| if remove { + collection.remove(&ScoreWithRef::new(score, tx)); + } else { + collection.insert(ScoreWithRef::new(score, tx)); + }; + + match (previous, current) { + (None, Some((worst, best))) => { + update(worst_collection, worst, false); + update(best_collection, best, false); + }, + (Some((worst, best)), None) => { + // all transactions from that sender has been removed. + // We can clear a hashmap entry. + self.transactions.remove(worst.1.sender()); + update(worst_collection, worst, true); + update(best_collection, best, true); + }, + (Some((w1, b1)), Some((w2, b2))) => { + if !is_same(&w1, &w2) { + update(worst_collection, w1, true); + update(worst_collection, w2, false); + } + if !is_same(&b1, &b2) { + update(best_collection, b1, true); + update(best_collection, b2, false); + } + }, + (None, None) => {}, + } + } + + /// Attempts to remove the worst transaction from the pool if it's worse than the given one. + fn remove_worst(&mut self, transaction: &T) -> error::Result> { + let to_remove = match self.worst_transactions.iter().next_back() { + // No elements to remove? and the pool is still full? + None => { + warn!("The pool is full but there are no transactions to remove."); + return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash()).into()); + }, + Some(old) => if self.scoring.should_replace(&old.transaction, transaction) { + // New transaction is better than the worst one so we can replace it. + old.clone() + } else { + // otherwise fail + return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash()).into()) + }, + }; + + // Remove from transaction set + self.remove_from_set(to_remove.transaction.sender(), |set, scoring| { + set.remove(&to_remove.transaction, scoring) + }); + Ok(to_remove.transaction) + } + + /// Removes transaction from sender's transaction `HashMap`. + fn remove_from_set, &S) -> R>(&mut self, sender: &Sender, f: F) -> Option { + let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) { + let prev = set.worst_and_best(); + let result = f(set, &self.scoring); + (prev, set.worst_and_best(), result) + } else { + return None; + }; + + self.update_senders_worst_and_best(prev, next); + Some(result) + } + + /// Clears pool from all transactions. + /// This causes a listener notification that all transactions were dropped. + /// NOTE: the drop-notification order will be arbitrary. + pub fn clear(&mut self) { + self.mem_usage = 0; + self.transactions.clear(); + self.best_transactions.clear(); + self.worst_transactions.clear(); + + for (_hash, tx) in self.by_hash.drain() { + self.listener.dropped(&tx) + } + } + + /// Removes single transaction from the pool. + /// Depending on the `is_invalid` flag the listener + /// will either get a `cancelled` or `invalid` notification. + pub fn remove(&mut self, hash: &H256, is_invalid: bool) -> bool { + if let Some(tx) = self.finalize_remove(hash) { + self.remove_from_set(tx.sender(), |set, scoring| { + set.remove(&tx, scoring) + }); + if is_invalid { + self.listener.invalid(&tx); + } else { + self.listener.cancelled(&tx); + } + true + } else { + false + } + } + + /// Removes all stalled transactions from given sender. + fn remove_stalled>(&mut self, sender: &Sender, ready: &mut R) -> usize { + let removed_from_set = self.remove_from_set(sender, |transactions, scoring| { + transactions.cull(ready, scoring) + }); + + match removed_from_set { + Some(removed) => { + let len = removed.len(); + for tx in removed { + self.finalize_remove(tx.hash()); + self.listener.mined(&tx); + } + len + }, + None => 0, + } + } + + /// Removes all stalled transactions from given sender list (or from all senders). + pub fn cull>(&mut self, senders: Option<&[Sender]>, mut ready: R) -> usize { + let mut removed = 0; + match senders { + Some(senders) => { + for sender in senders { + removed += self.remove_stalled(sender, &mut ready); + } + }, + None => { + let senders = self.transactions.keys().cloned().collect::>(); + for sender in senders { + removed += self.remove_stalled(&sender, &mut ready); + } + }, + } + + removed + } + + /// Returns an iterator of pending (ready) transactions. + pub fn pending>(&self, ready: R) -> PendingIterator { + PendingIterator { + ready, + best_transactions: self.best_transactions.clone(), + pool: self, + } + } + + /// Computes the full status of the pool (including readiness). + pub fn status>(&self, mut ready: R) -> Status { + let mut status = Status::default(); + + for (_sender, transactions) in &self.transactions { + let len = transactions.len(); + for (idx, tx) in transactions.iter().enumerate() { + match ready.is_ready(tx) { + Readiness::Stalled => status.stalled += 1, + Readiness::Ready => status.pending += 1, + Readiness::Future => { + status.future += len - idx; + break; + } + } + } + } + + status + } + + /// Returns light status of the pool. + pub fn light_status(&self) -> LightStatus { + LightStatus { + mem_usage: self.mem_usage, + transaction_count: self.by_hash.len(), + senders: self.transactions.len(), + } + } +} + +/// An iterator over all pending (ready) transactions. +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +pub struct PendingIterator<'a, T, R, S, L> where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, + L: 'a, +{ + ready: R, + best_transactions: BTreeSet>, + pool: &'a Pool, +} + +impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L> where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + while !self.best_transactions.is_empty() { + let best = { + let best = self.best_transactions.iter().next().expect("current_best is not empty; qed").clone(); + self.best_transactions.take(&best).expect("Just taken from iterator; qed") + }; + + match self.ready.is_ready(&best.transaction) { + Readiness::Ready => { + // retrieve next one from that sender. + let next = self.pool.transactions + .get(best.transaction.sender()) + .and_then(|s| s.find_next(&best.transaction, &self.pool.scoring)); + if let Some((score, tx)) = next { + self.best_transactions.insert(ScoreWithRef::new(score, tx)); + } + + return Some(best.transaction) + }, + state => warn!("[{:?}] Ignoring {:?} transaction.", best.transaction.hash(), state), + } + } + + None + } +} diff --git a/transaction-pool/src/ready.rs b/transaction-pool/src/ready.rs new file mode 100644 index 000000000..1d8e342db --- /dev/null +++ b/transaction-pool/src/ready.rs @@ -0,0 +1,42 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Transaction readiness. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Readiness { + /// The transaction is stalled (and should/will be removed from the pool). + Stalled, + /// The transaction is ready to be included in pending set. + Ready, + /// The transaction is not yet ready. + Future, +} + +/// A readiness indicator. +pub trait Ready { + /// Returns true if transaction is ready to be included in pending block, + /// given all previous transactions that were ready are already included. + /// + /// NOTE: readiness of transactions will be checked according to `Score` ordering, + /// the implementation should maintain a state of already checked transactions. + fn is_ready(&mut self, tx: &T) -> Readiness; +} + +impl Ready for F where F: FnMut(&T) -> Readiness { + fn is_ready(&mut self, tx: &T) -> Readiness { + (*self)(tx) + } +} diff --git a/transaction-pool/src/scoring.rs b/transaction-pool/src/scoring.rs new file mode 100644 index 000000000..e4f923c9b --- /dev/null +++ b/transaction-pool/src/scoring.rs @@ -0,0 +1,145 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! A transactions ordering abstraction. + +use std::{cmp, fmt}; +use std::sync::Arc; + +use {VerifiedTransaction}; + +/// Represents a decision what to do with +/// a new transaction that tries to enter the pool. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Choice { + /// New transaction should be rejected + /// (i.e. the old transaction that occupies the same spot + /// is better). + RejectNew, + /// The old transaction should be dropped + /// in favour of the new one. + ReplaceOld, + /// The new transaction should be inserted + /// and both (old and new) should stay in the pool. + InsertNew, +} + +/// Describes a reason why the `Score` of transactions +/// should be updated. +/// The `Scoring` implementations can use this information +/// to update the `Score` table more efficiently. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Change { + /// New transaction has been inserted at given index. + /// The Score at that index is initialized with default value + /// and needs to be filled in. + InsertedAt(usize), + /// The transaction has been removed at given index and other transactions + /// shifted to it's place. + /// The scores were removed and shifted as well. + /// For simple scoring algorithms no action is required here. + RemovedAt(usize), + /// The transaction at given index has replaced a previous transaction. + /// The score at that index needs to be update (it contains value from previous transaction). + ReplacedAt(usize), + /// Given number of stalled transactions has been culled from the beginning. + /// Usually the score will have to be re-computed from scratch. + Culled(usize), +} + +/// A transaction ordering. +/// +/// The implementation should decide on order of transactions in the pool. +/// Each transaction should also get assigned a `Score` which is used to later +/// prioritize transactions in the pending set. +/// +/// Implementation notes: +/// - Returned `Score`s should match ordering of `compare` method. +/// - `compare` will be called only within a context of transactions from the same sender. +/// - `choose` will be called only if `compare` returns `Ordering::Equal` +/// - `should_replace` is used to decide if new transaction should push out an old transaction already in the queue. +/// - `Score`s and `compare` should align with `Ready` implementation. +/// +/// Example: Natural ordering of Ethereum transactions. +/// - `compare`: compares transaction `nonce` () +/// - `choose`: compares transactions `gasPrice` (decides if old transaction should be replaced) +/// - `update_scores`: score defined as `gasPrice` if `n==0` and `max(scores[n-1], gasPrice)` if `n>0` +/// - `should_replace`: compares `gasPrice` (decides if transaction from a different sender is more valuable) +/// +pub trait Scoring { + /// A score of a transaction. + type Score: cmp::Ord + Clone + Default + fmt::Debug; + + /// Decides on ordering of `T`s from a particular sender. + fn compare(&self, old: &T, other: &T) -> cmp::Ordering; + + /// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue. + fn choose(&self, old: &T, new: &T) -> Choice; + + /// Updates the transaction scores given a list of transactions and a change to previous scoring. + /// NOTE: you can safely assume that both slices have the same length. + /// (i.e. score at index `i` represents transaction at the same index) + fn update_scores(&self, txs: &[Arc], scores: &mut [Self::Score], change: Change); + + /// Decides if `new` should push out `old` transaction from the pool. + fn should_replace(&self, old: &T, new: &T) -> bool; +} + +/// A score with a reference to the transaction. +#[derive(Debug)] +pub struct ScoreWithRef { + /// Score + pub score: S, + /// Shared transaction + pub transaction: Arc, +} + +impl Clone for ScoreWithRef { + fn clone(&self) -> Self { + ScoreWithRef { + score: self.score.clone(), + transaction: self.transaction.clone(), + } + } +} + +impl ScoreWithRef { + /// Creates a new `ScoreWithRef` + pub fn new(score: S, transaction: Arc) -> Self { + ScoreWithRef { score, transaction } + } +} + +impl Ord for ScoreWithRef { + fn cmp(&self, other: &Self) -> cmp::Ordering { + other.score.cmp(&self.score) + .then(other.transaction.insertion_id().cmp(&self.transaction.insertion_id())) + } +} + +impl PartialOrd for ScoreWithRef { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for ScoreWithRef { + fn eq(&self, other: &Self) -> bool { + self.score == other.score && self.transaction.insertion_id() == other.transaction.insertion_id() + } +} + +impl Eq for ScoreWithRef {} diff --git a/transaction-pool/src/status.rs b/transaction-pool/src/status.rs new file mode 100644 index 000000000..5862f75a1 --- /dev/null +++ b/transaction-pool/src/status.rs @@ -0,0 +1,40 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Light pool status. +/// This status is cheap to compute and can be called frequently. +#[derive(Default, Debug, PartialEq, Eq)] +pub struct LightStatus { + /// Memory usage in bytes. + pub mem_usage: usize, + /// Total number of transactions in the pool. + pub transaction_count: usize, + /// Number of unique senders in the pool. + pub senders: usize, +} + +/// A full queue status. +/// To compute this status it is required to provide `Ready`. +/// NOTE: To compute the status we need to visit each transaction in the pool. +#[derive(Default, Debug, PartialEq, Eq)] +pub struct Status { + /// Number of stalled transactions. + pub stalled: usize, + /// Number of pending (ready) transactions. + pub pending: usize, + /// Number of future (not ready) transactions. + pub future: usize, +} diff --git a/transaction-pool/src/tests/helpers.rs b/transaction-pool/src/tests/helpers.rs new file mode 100644 index 000000000..faa9aba3e --- /dev/null +++ b/transaction-pool/src/tests/helpers.rs @@ -0,0 +1,80 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::cmp; +use std::collections::HashMap; + +use {scoring, Scoring, Ready, Readiness, Address as Sender}; +use super::{Transaction, SharedTransaction, U256}; + +#[derive(Default)] +pub struct DummyScoring; + +impl Scoring for DummyScoring { + type Score = U256; + + fn compare(&self, old: &Transaction, new: &Transaction) -> cmp::Ordering { + old.nonce.cmp(&new.nonce) + } + + fn choose(&self, old: &Transaction, new: &Transaction) -> scoring::Choice { + if old.nonce == new.nonce { + if new.gas_price > old.gas_price { + scoring::Choice::ReplaceOld + } else { + scoring::Choice::RejectNew + } + } else { + scoring::Choice::InsertNew + } + } + + fn update_scores(&self, txs: &[SharedTransaction], scores: &mut [Self::Score], _change: scoring::Change) { + for i in 0..txs.len() { + scores[i] = txs[i].gas_price; + } + } + + fn should_replace(&self, old: &Transaction, new: &Transaction) -> bool { + new.gas_price > old.gas_price + } +} + +#[derive(Default)] +pub struct NonceReady(HashMap, U256); + +impl NonceReady { + pub fn new>(min: T) -> Self { + let mut n = NonceReady::default(); + n.1 = min.into(); + n + } +} + +impl Ready for NonceReady { + fn is_ready(&mut self, tx: &Transaction) -> Readiness { + let min = self.1; + let nonce = self.0.entry(tx.sender).or_insert_with(|| min); + match tx.nonce.cmp(nonce) { + cmp::Ordering::Greater => Readiness::Future, + cmp::Ordering::Equal => { + *nonce = *nonce + 1.into(); + Readiness::Ready + }, + cmp::Ordering::Less => Readiness::Stalled, + } + } +} diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs new file mode 100644 index 000000000..8dfb0e149 --- /dev/null +++ b/transaction-pool/src/tests/mod.rs @@ -0,0 +1,506 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod helpers; +mod tx_builder; + +use self::helpers::{DummyScoring, NonceReady}; +use self::tx_builder::TransactionBuilder; + +use std::sync::Arc; + +use bigint::prelude::{H256, U256, H160 as Address}; +use super::*; + +#[derive(Debug, PartialEq)] +pub struct Transaction { + pub hash: H256, + pub nonce: U256, + pub gas_price: U256, + pub gas: U256, + pub sender: Address, + pub insertion_id: u64, + pub mem_usage: usize, +} + +impl VerifiedTransaction for Transaction { + fn hash(&self) -> &H256 { &self.hash } + fn mem_usage(&self) -> usize { self.mem_usage } + fn sender(&self) -> &Address { &self.sender } + fn insertion_id(&self) -> u64 { self.insertion_id } +} + +pub type SharedTransaction = Arc; + +type TestPool = Pool; + +#[test] +fn should_clear_queue() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + assert_eq!(txq.light_status(), LightStatus { + mem_usage: 0, + transaction_count: 0, + senders: 0, + }); + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(1).mem_usage(1).new(); + + // add + txq.import(tx1).unwrap(); + txq.import(tx2).unwrap(); + assert_eq!(txq.light_status(), LightStatus { + mem_usage: 1, + transaction_count: 2, + senders: 1, + }); + + // when + txq.clear(); + + // then + assert_eq!(txq.light_status(), LightStatus { + mem_usage: 0, + transaction_count: 0, + senders: 0, + }); +} + +#[test] +fn should_not_allow_same_transaction_twice() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(0).new(); + + // when + txq.import(tx1).unwrap(); + txq.import(tx2).unwrap_err(); + + // then + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_replace_transaction() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + let tx1 = b.tx().nonce(0).gas_price(1).new(); + let tx2 = b.tx().nonce(0).gas_price(2).new(); + + // when + txq.import(tx1).unwrap(); + txq.import(tx2).unwrap(); + + // then + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_count() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_count: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(1).new(); + let hash = *tx2.hash(); + txq.import(tx1).unwrap(); + assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash)); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(0).new(); + let tx2 = b.tx().nonce(0).sender(1).gas_price(2).new(); + txq.import(tx1).unwrap(); + txq.import(tx2).unwrap(); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_mem_usage() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_mem_usage: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(1).mem_usage(1).new(); + let tx2 = b.tx().nonce(2).mem_usage(2).new(); + let hash = *tx2.hash(); + txq.import(tx1).unwrap(); + assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash)); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(1).mem_usage(1).new(); + let tx2 = b.tx().nonce(1).sender(1).gas_price(2).mem_usage(1).new(); + txq.import(tx1).unwrap(); + txq.import(tx2).unwrap(); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_reject_if_above_sender_count() { + let b = TransactionBuilder::default(); + let mut txq = TestPool::with_options(Options { + max_per_sender: 1, + ..Default::default() + }); + + // Reject second + let tx1 = b.tx().nonce(1).new(); + let tx2 = b.tx().nonce(2).new(); + let hash = *tx2.hash(); + txq.import(tx1).unwrap(); + assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash)); + assert_eq!(txq.light_status().transaction_count, 1); + + txq.clear(); + + // Replace first + let tx1 = b.tx().nonce(1).new(); + let tx2 = b.tx().nonce(2).gas_price(2).new(); + let hash = *tx2.hash(); + txq.import(tx1).unwrap(); + // This results in error because we also compare nonces + assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash)); + assert_eq!(txq.light_status().transaction_count, 1); +} + +#[test] +fn should_construct_pending() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); + // this transaction doesn't get to the block despite high gas price + // because of block gas limit and simplistic ordering algorithm. + txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + txq.import(b.tx().nonce(5).new()).unwrap(); + + let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + // gap + txq.import(b.tx().sender(1).nonce(5).new()).unwrap(); + + let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!(txq.status(NonceReady::default()), Status { + stalled: 0, + pending: 9, + future: 2, + }); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 3, + pending: 6, + future: 2, + }); + + // when + let mut current_gas = U256::zero(); + let limit = (21_000 * 8).into(); + let mut pending = txq.pending(NonceReady::default()).take_while(|tx| { + let should_take = tx.gas + current_gas <= limit; + if should_take { + current_gas = current_gas + tx.gas + } + should_take + }); + + assert_eq!(pending.next(), Some(tx0)); + assert_eq!(pending.next(), Some(tx1)); + assert_eq!(pending.next(), Some(tx9)); + assert_eq!(pending.next(), Some(tx5)); + assert_eq!(pending.next(), Some(tx6)); + assert_eq!(pending.next(), Some(tx7)); + assert_eq!(pending.next(), Some(tx8)); + assert_eq!(pending.next(), Some(tx2)); + assert_eq!(pending.next(), None); +} + +#[test] +fn should_remove_transaction() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx1 = txq.import(b.tx().nonce(0).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(1).new()).unwrap(); + txq.import(b.tx().nonce(2).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 3); + + // when + assert!(txq.remove(&tx2.hash(), false)); + + // then + assert_eq!(txq.light_status().transaction_count, 2); + let mut pending = txq.pending(NonceReady::default()); + assert_eq!(pending.next(), Some(tx1)); + assert_eq!(pending.next(), None); +} + +#[test] +fn should_cull_stalled_transactions() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + txq.import(b.tx().nonce(1).new()).unwrap(); + txq.import(b.tx().nonce(3).new()).unwrap(); + + txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(5).new()).unwrap(); + + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 2, + pending: 2, + future: 2, + }); + + // when + assert_eq!(txq.cull(None, NonceReady::new(1)), 2); + + // then + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 0, + pending: 2, + future: 2, + }); + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 4, + senders: 2, + mem_usage: 0, + }); +} + +#[test] +fn should_cull_stalled_transactions_from_a_sender() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + txq.import(b.tx().nonce(1).new()).unwrap(); + + txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(2).new()).unwrap(); + + assert_eq!(txq.status(NonceReady::new(2)), Status { + stalled: 4, + pending: 1, + future: 0, + }); + + // when + let sender = 0.into(); + assert_eq!(txq.cull(Some(&[sender]), NonceReady::new(2)), 2); + + // then + assert_eq!(txq.status(NonceReady::new(2)), Status { + stalled: 2, + pending: 1, + future: 0, + }); + assert_eq!(txq.light_status(), LightStatus { + transaction_count: 3, + senders: 1, + mem_usage: 0, + }); +} + +#[test] +fn should_re_insert_after_cull() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + txq.import(b.tx().nonce(1).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 2, + pending: 2, + future: 0, + }); + + // when + assert_eq!(txq.cull(None, NonceReady::new(1)), 2); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 0, + pending: 2, + future: 0, + }); + txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 2, + pending: 2, + future: 0, + }); +} + +mod listener { + use std::cell::RefCell; + use std::rc::Rc; + + use super::*; + + #[derive(Default)] + struct MyListener(pub Rc>>); + + impl Listener for MyListener { + fn added(&mut self, _tx: &SharedTransaction, old: Option<&SharedTransaction>) { + self.0.borrow_mut().push(if old.is_some() { "replaced" } else { "added" }); + } + + fn rejected(&mut self, _tx: Transaction) { + self.0.borrow_mut().push("rejected".into()); + } + + fn dropped(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("dropped".into()); + } + + fn invalid(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("invalid".into()); + } + + fn cancelled(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("cancelled".into()); + } + + fn mined(&mut self, _tx: &SharedTransaction) { + self.0.borrow_mut().push("mined".into()); + } + } + + #[test] + fn insert_transaction() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring, Options { + max_per_sender: 1, + max_count: 2, + ..Default::default() + }); + assert!(results.borrow().is_empty()); + + // Regular import + txq.import(b.tx().nonce(1).new()).unwrap(); + assert_eq!(*results.borrow(), &["added"]); + // Already present (no notification) + txq.import(b.tx().nonce(1).new()).unwrap_err(); + assert_eq!(*results.borrow(), &["added"]); + // Push out the first one + txq.import(b.tx().nonce(1).gas_price(1).new()).unwrap(); + assert_eq!(*results.borrow(), &["added", "replaced"]); + // Reject + txq.import(b.tx().nonce(1).new()).unwrap_err(); + assert_eq!(*results.borrow(), &["added", "replaced", "rejected"]); + results.borrow_mut().clear(); + // Different sender (accept) + txq.import(b.tx().sender(1).nonce(1).gas_price(2).new()).unwrap(); + assert_eq!(*results.borrow(), &["added"]); + // Third sender push out low gas price + txq.import(b.tx().sender(2).nonce(1).gas_price(4).new()).unwrap(); + assert_eq!(*results.borrow(), &["added", "dropped", "added"]); + // Reject (too cheap) + txq.import(b.tx().sender(2).nonce(1).gas_price(2).new()).unwrap_err(); + assert_eq!(*results.borrow(), &["added", "dropped", "added", "rejected"]); + + assert_eq!(txq.light_status().transaction_count, 2); + } + + #[test] + fn remove_transaction() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring, Options::default()); + + // insert + let tx1 = txq.import(b.tx().nonce(1).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); + + // then + txq.remove(&tx1.hash(), false); + assert_eq!(*results.borrow(), &["added", "added", "cancelled"]); + txq.remove(&tx2.hash(), true); + assert_eq!(*results.borrow(), &["added", "added", "cancelled", "invalid"]); + assert_eq!(txq.light_status().transaction_count, 0); + } + + #[test] + fn clear_queue() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring, Options::default()); + + // insert + txq.import(b.tx().nonce(1).new()).unwrap(); + txq.import(b.tx().nonce(2).new()).unwrap(); + + // when + txq.clear(); + + // then + assert_eq!(*results.borrow(), &["added", "added", "dropped", "dropped"]); + } + + #[test] + fn cull_stalled() { + let b = TransactionBuilder::default(); + let listener = MyListener::default(); + let results = listener.0.clone(); + let mut txq = Pool::new(listener, DummyScoring, Options::default()); + + // insert + txq.import(b.tx().nonce(1).new()).unwrap(); + txq.import(b.tx().nonce(2).new()).unwrap(); + + // when + txq.cull(None, NonceReady::new(3)); + + // then + assert_eq!(*results.borrow(), &["added", "added", "mined", "mined"]); + } +} + diff --git a/transaction-pool/src/tests/tx_builder.rs b/transaction-pool/src/tests/tx_builder.rs new file mode 100644 index 000000000..cd50a4fd0 --- /dev/null +++ b/transaction-pool/src/tests/tx_builder.rs @@ -0,0 +1,74 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::rc::Rc; +use std::cell::Cell; + +use super::{Transaction, U256, Address}; + +#[derive(Debug, Default, Clone)] +pub struct TransactionBuilder { + nonce: U256, + gas_price: U256, + gas: U256, + sender: Address, + mem_usage: usize, + insertion_id: Rc>, +} + +impl TransactionBuilder { + pub fn tx(&self) -> Self { + self.clone() + } + + pub fn nonce>(mut self, nonce: T) -> Self { + self.nonce = nonce.into(); + self + } + + pub fn gas_price>(mut self, gas_price: T) -> Self { + self.gas_price = gas_price.into(); + self + } + + pub fn sender>(mut self, sender: T) -> Self { + self.sender = sender.into(); + self + } + + pub fn mem_usage(mut self, mem_usage: usize) -> Self { + self.mem_usage = mem_usage; + self + } + + pub fn new(self) -> Transaction { + let insertion_id = { + let id = self.insertion_id.get() + 1; + self.insertion_id.set(id); + id + }; + let hash = self.nonce ^ (U256::from(100) * self.gas_price) ^ (U256::from(100_000) * self.sender.low_u64().into()); + Transaction { + hash: hash.into(), + nonce: self.nonce, + gas_price: self.gas_price, + gas: 21_000.into(), + sender: self.sender, + insertion_id, + mem_usage: self.mem_usage, + } + } +} diff --git a/transaction-pool/src/transactions.rs b/transaction-pool/src/transactions.rs new file mode 100644 index 000000000..39fd08e93 --- /dev/null +++ b/transaction-pool/src/transactions.rs @@ -0,0 +1,216 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::{fmt, mem}; +use std::sync::Arc; + +use smallvec::SmallVec; + +use ready::{Ready, Readiness}; +use scoring::{self, Scoring}; + +#[derive(Debug)] +pub enum AddResult { + Ok(Arc), + TooCheapToEnter(T), + TooCheap { + old: Arc, + new: T, + }, + Replaced { + old: Arc, + new: Arc, + }, + PushedOut { + old: Arc, + new: Arc, + }, +} + +/// Represents all transactions from a particular sender ordered by nonce. +const PER_SENDER: usize = 8; +#[derive(Debug)] +pub struct Transactions> { + // TODO [ToDr] Consider using something that doesn't require shifting all records. + transactions: SmallVec<[Arc; PER_SENDER]>, + scores: SmallVec<[S::Score; PER_SENDER]>, +} + +impl> Default for Transactions { + fn default() -> Self { + Transactions { + transactions: Default::default(), + scores: Default::default(), + } + } +} + +impl> Transactions { + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } + + pub fn len(&self) -> usize { + self.transactions.len() + } + + pub fn iter(&self) -> ::std::slice::Iter> { + self.transactions.iter() + } + + pub fn worst_and_best(&self) -> Option<((S::Score, Arc), (S::Score, Arc))> { + let len = self.scores.len(); + self.scores.get(0).cloned().map(|best| { + let worst = self.scores[len - 1].clone(); + let best_tx = self.transactions[0].clone(); + let worst_tx = self.transactions[len - 1].clone(); + + ((worst, worst_tx), (best, best_tx)) + }) + } + + pub fn find_next(&self, tx: &T, scoring: &S) -> Option<(S::Score, Arc)> { + self.transactions.binary_search_by(|old| scoring.compare(old, &tx)).ok().and_then(|index| { + let index = index + 1; + if index < self.scores.len() { + Some((self.scores[index].clone(), self.transactions[index].clone())) + } else { + None + } + }) + } + + fn push_cheapest_transaction(&mut self, tx: T, scoring: &S, max_count: usize) -> AddResult { + let index = self.transactions.len(); + if index == max_count { + AddResult::TooCheapToEnter(tx) + } else { + let shared = Arc::new(tx); + self.transactions.push(shared.clone()); + self.scores.push(Default::default()); + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index)); + + AddResult::Ok(shared) + } + } + + pub fn add(&mut self, tx: T, scoring: &S, max_count: usize) -> AddResult { + let index = match self.transactions.binary_search_by(|old| scoring.compare(old, &tx)) { + Ok(index) => index, + Err(index) => index, + }; + + // Insert at the end. + if index == self.transactions.len() { + return self.push_cheapest_transaction(tx, scoring, max_count) + } + + // Decide if the transaction should replace some other. + match scoring.choose(&self.transactions[index], &tx) { + // New transaction should be rejected + scoring::Choice::RejectNew => AddResult::TooCheap { + old: self.transactions[index].clone(), + new: tx, + }, + // New transaction should be kept along with old ones. + scoring::Choice::InsertNew => { + let new = Arc::new(tx); + + self.transactions.insert(index, new.clone()); + self.scores.insert(index, Default::default()); + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index)); + + if self.transactions.len() > max_count { + let old = self.transactions.pop().expect("len is non-zero"); + self.scores.pop(); + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::RemovedAt(self.transactions.len())); + + AddResult::PushedOut { + old, + new, + } + } else { + AddResult::Ok(new) + } + }, + // New transaction is replacing some other transaction already in the queue. + scoring::Choice::ReplaceOld => { + let new = Arc::new(tx); + let old = mem::replace(&mut self.transactions[index], new.clone()); + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::ReplacedAt(index)); + + AddResult::Replaced { + old, + new, + } + }, + } + } + + pub fn remove(&mut self, tx: &T, scoring: &S) -> bool { + let index = match self.transactions.binary_search_by(|old| scoring.compare(old, tx)) { + Ok(index) => index, + Err(_) => { + warn!("Attempting to remove non-existent transaction {:?}", tx); + return false; + }, + }; + + self.transactions.remove(index); + self.scores.remove(index); + // Update scoring + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::RemovedAt(index)); + return true; + } + + pub fn cull>(&mut self, ready: &mut R, scoring: &S) -> SmallVec<[Arc; PER_SENDER]> { + let mut result = SmallVec::new(); + if self.is_empty() { + return result; + } + + let mut first_non_stalled = 0; + for tx in &self.transactions { + match ready.is_ready(tx) { + Readiness::Stalled => { + first_non_stalled += 1; + }, + Readiness::Ready | Readiness::Future => break, + } + } + + // reverse the vectors to easily remove first elements. + self.transactions.reverse(); + self.scores.reverse(); + + for _ in 0..first_non_stalled { + self.scores.pop(); + result.push( + self.transactions.pop().expect("first_non_stalled is never greater than transactions.len(); qed") + ); + } + + self.transactions.reverse(); + self.scores.reverse(); + + // update scoring + scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::Culled(result.len())); + + // reverse the result to maintain correct order. + result.reverse(); + result + } +} diff --git a/transaction-pool/src/verifier.rs b/transaction-pool/src/verifier.rs new file mode 100644 index 000000000..e55a17e91 --- /dev/null +++ b/transaction-pool/src/verifier.rs @@ -0,0 +1,31 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use {VerifiedTransaction}; + +/// Transaction verification. +/// +/// Verifier is responsible to decide if the transaction should even be considered for pool inclusion. +pub trait Verifier { + /// Verification error. + type Error; + + /// Verified transaction. + type VerifiedTransaction: VerifiedTransaction; + + /// Verifies a `UnverifiedTransaction` and produces `VerifiedTransaction` instance. + fn verify_transaction(&self, tx: U) -> Result; +}