New Transaction Queue implementation (#8074)

* Implementation of Verifier, Scoring and Ready.

* Queue in progress.

* TransactionPool.

* Prepare for txpool release.

* Miner refactor [WiP]

* WiP reworking miner.

* Make it compile.

* Add some docs.

* Split blockchain access to a separate file.

* Work on miner API.

* Fix ethcore tests.

* Refactor miner interface for sealing/work packages.

* Implement next nonce.

* RPC compiles.

* Implement couple of missing methdods for RPC.

* Add transaction queue listeners.

* Compiles!

* Clean-up and parallelize.

* Get rid of RefCell in header.

* Revert "Get rid of RefCell in header."

This reverts commit 0f2424c9b7319a786e1565ea2a8a6d801a21b4fb.

* Override Sync requirement.

* Fix status display.

* Unify logging.

* Extract some cheap checks.

* Measurements and optimizations.

* Fix scoring bug, heap size of bug and add cache

* Disable tx queueing and parallel verification.

* Make ethcore and ethcore-miner compile again.

* Make RPC compile again.

* Bunch of txpool tests.

* Migrate transaction queue tests.

* Nonce Cap

* Nonce cap cache and tests.

* Remove stale future transactions from the queue.

* Optimize scoring and write some tests.

* Simple penalization.

* Clean up and support for different scoring algorithms.

* Add CLI parameters for the new queue.

* Remove banning queue.

* Disable debug build.

* Change per_sender limit to be 1% instead of 5%

* Avoid cloning when propagating transactions.

* Remove old todo.

* Post-review fixes.

* Fix miner options default.

* Implement back ready transactions for light client.

* Get rid of from_pending_block

* Pass rejection reason.

* Add more details to drop.

* Rollback heap size of.

* Avoid cloning hashes when propagating and include more details on rejection.

* Fix tests.

* Introduce nonces cache.

* Remove uneccessary hashes allocation.

* Lower the mem limit.

* Re-enable parallel verification.

* Add miner log. Don't check the type if not below min_gas_price.

* Add more traces, fix disabling miner.

* Fix creating pending blocks twice on AuRa authorities.

* Fix tests.

* re-use pending blocks in AuRa

* Use reseal_min_period to prevent too frequent update_sealing.

* Fix log to contain hash not sender.

* Optimize local transactions.

* Fix aura tests.

* Update locks comments.

* Get rid of unsafe Sync impl.

* Review fixes.

* Remove excessive matches.

* Fix compilation errors.

* Use new pool in private transactions.

* Fix private-tx test.

* Fix secret store tests.

* Actually use gas_floor_target

* Fix config tests.

* Fix pool tests.

* Address grumbles.
This commit is contained in:
Tomasz Drwięga
2018-04-13 17:34:27 +02:00
committed by Marek Kotewicz
parent 03b96a7c0a
commit 1cd93e4ceb
105 changed files with 5185 additions and 5784 deletions

71
miner/src/pool/client.rs Normal file
View File

@@ -0,0 +1,71 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Pool state client.
//!
//! `Client` encapsulates all external data required for the verifaction and readiness.
//! It includes any Ethereum state parts required for checking the transaction and
//! any consensus-required structure of the transaction.
use std::fmt;
use ethereum_types::{U256, H256, H160 as Address};
use transaction;
/// Account Details
#[derive(Debug, Clone)]
pub struct AccountDetails {
/// Current account nonce
pub nonce: U256,
/// Current account balance
pub balance: U256,
/// Is this account a local account?
pub is_local: bool,
}
/// Transaction type
#[derive(Debug, PartialEq)]
pub enum TransactionType {
/// Regular transaction
Regular,
/// Service transaction (allowed by a contract to have gas_price=0)
Service,
}
/// Verification client.
pub trait Client: fmt::Debug + Sync {
/// Is transaction with given hash already in the blockchain?
fn transaction_already_included(&self, hash: &H256) -> bool;
/// Structurarily verify given transaction.
fn verify_transaction(&self, tx: transaction::UnverifiedTransaction)
-> Result<transaction::SignedTransaction, transaction::Error>;
/// Estimate minimal gas requirurement for given transaction.
fn required_gas(&self, tx: &transaction::Transaction) -> U256;
/// Fetch account details for given sender.
fn account_details(&self, address: &Address) -> AccountDetails;
/// Classify transaction (check if transaction is filtered by some contracts).
fn transaction_type(&self, tx: &transaction::SignedTransaction) -> TransactionType;
}
/// State nonce client
pub trait NonceClient: fmt::Debug + Sync {
/// Fetch only account nonce for given sender.
fn account_nonce(&self, address: &Address) -> U256;
}

161
miner/src/pool/listener.rs Normal file
View File

@@ -0,0 +1,161 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Notifier for new transaction hashes.
use std::fmt;
use std::sync::Arc;
use ethereum_types::H256;
use txpool::{self, VerifiedTransaction};
use pool::VerifiedTransaction as Transaction;
type Listener = Box<Fn(&[H256]) + Send + Sync>;
/// Manages notifications to pending transaction listeners.
#[derive(Default)]
pub struct Notifier {
listeners: Vec<Listener>,
pending: Vec<H256>,
}
impl fmt::Debug for Notifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Notifier")
.field("listeners", &self.listeners.len())
.field("pending", &self.pending)
.finish()
}
}
impl Notifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: Listener) {
self.listeners.push(f)
}
/// Notify listeners about all currently pending transactions.
pub fn notify(&mut self) {
for l in &self.listeners {
(l)(&self.pending);
}
self.pending.clear();
}
}
impl txpool::Listener<Transaction> for Notifier {
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
self.pending.push(*tx.hash());
}
}
/// Transaction pool logger.
#[derive(Default, Debug)]
pub struct Logger;
impl txpool::Listener<Transaction> for Logger {
fn added(&mut self, tx: &Arc<Transaction>, old: Option<&Arc<Transaction>>) {
debug!(target: "txqueue", "[{:?}] Added to the pool.", tx.hash());
debug!(
target: "txqueue",
"[{hash:?}] Sender: {sender}, nonce: {nonce}, gasPrice: {gas_price}, gas: {gas}, value: {value}, dataLen: {data}))",
hash = tx.hash(),
sender = tx.sender(),
nonce = tx.signed().nonce,
gas_price = tx.signed().gas_price,
gas = tx.signed().gas,
value = tx.signed().value,
data = tx.signed().data.len(),
);
if let Some(old) = old {
debug!(target: "txqueue", "[{:?}] Dropped. Replaced by [{:?}]", old.hash(), tx.hash());
}
}
fn rejected(&mut self, _tx: &Arc<Transaction>, reason: &txpool::ErrorKind) {
trace!(target: "txqueue", "Rejected {}.", reason);
}
fn dropped(&mut self, tx: &Arc<Transaction>, new: Option<&Transaction>) {
match new {
Some(new) => debug!(target: "txqueue", "[{:?}] Pushed out by [{:?}]", tx.hash(), new.hash()),
None => debug!(target: "txqueue", "[{:?}] Dropped.", tx.hash()),
}
}
fn invalid(&mut self, tx: &Arc<Transaction>) {
debug!(target: "txqueue", "[{:?}] Marked as invalid by executor.", tx.hash());
}
fn canceled(&mut self, tx: &Arc<Transaction>) {
debug!(target: "txqueue", "[{:?}] Canceled by the user.", tx.hash());
}
fn mined(&mut self, tx: &Arc<Transaction>) {
debug!(target: "txqueue", "[{:?}] Mined.", tx.hash());
}
}
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use transaction;
use txpool::Listener;
#[test]
fn should_notify_listeners() {
// given
let received = Arc::new(Mutex::new(vec![]));
let r = received.clone();
let listener = Box::new(move |hashes: &[H256]| {
*r.lock() = hashes.iter().map(|x| *x).collect();
});
let mut tx_listener = Notifier::default();
tx_listener.add(listener);
// when
let tx = new_tx();
tx_listener.added(&tx, None);
assert_eq!(*received.lock(), vec![]);
// then
tx_listener.notify();
assert_eq!(
*received.lock(),
vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()]
);
}
fn new_tx() -> Arc<Transaction> {
let signed = transaction::Transaction {
action: transaction::Action::Create,
data: vec![1, 2, 3],
nonce: 5.into(),
gas: 21_000.into(),
gas_price: 5.into(),
value: 0.into(),
}.fake_sign(5.into());
Arc::new(Transaction::from_pending_block_transaction(signed))
}
}

View File

@@ -0,0 +1,273 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Local Transactions List.
use std::sync::Arc;
use ethereum_types::H256;
use linked_hash_map::LinkedHashMap;
use pool::VerifiedTransaction as Transaction;
use txpool::{self, VerifiedTransaction};
/// Status of local transaction.
/// Can indicate that the transaction is currently part of the queue (`Pending/Future`)
/// or gives a reason why the transaction was removed.
#[derive(Debug, PartialEq, Clone)]
pub enum Status {
/// The transaction is currently in the transaction queue.
Pending(Arc<Transaction>),
/// Transaction is already mined.
Mined(Arc<Transaction>),
/// Transaction is dropped because of limit
Dropped(Arc<Transaction>),
/// Replaced because of higher gas price of another transaction.
Replaced {
/// Replaced transaction
old: Arc<Transaction>,
/// Transaction that replaced this one.
new: Arc<Transaction>,
},
/// Transaction was never accepted to the queue.
/// It means that it was too cheap to replace any transaction already in the pool.
Rejected(Arc<Transaction>, String),
/// Transaction is invalid.
Invalid(Arc<Transaction>),
/// Transaction was canceled.
Canceled(Arc<Transaction>),
}
impl Status {
fn is_pending(&self) -> bool {
match *self {
Status::Pending(_) => true,
_ => false,
}
}
}
/// Keeps track of local transactions that are in the queue or were mined/dropped recently.
#[derive(Debug)]
pub struct LocalTransactionsList {
max_old: usize,
transactions: LinkedHashMap<H256, Status>,
pending: usize,
}
impl Default for LocalTransactionsList {
fn default() -> Self {
Self::new(10)
}
}
impl LocalTransactionsList {
/// Create a new list of local transactions.
pub fn new(max_old: usize) -> Self {
LocalTransactionsList {
max_old,
transactions: Default::default(),
pending: 0,
}
}
/// Returns true if the transaction is already in local transactions.
pub fn contains(&self, hash: &H256) -> bool {
self.transactions.contains_key(hash)
}
/// Return a map of all currently stored transactions.
pub fn all_transactions(&self) -> &LinkedHashMap<H256, Status> {
&self.transactions
}
/// Returns true if there are pending local transactions.
pub fn has_pending(&self) -> bool {
self.pending > 0
}
fn clear_old(&mut self) {
let number_of_old = self.transactions.len() - self.pending;
if self.max_old >= number_of_old {
return;
}
let to_remove: Vec<_> = self.transactions
.iter()
.filter(|&(_, status)| !status.is_pending())
.map(|(hash, _)| *hash)
.take(number_of_old - self.max_old)
.collect();
for hash in to_remove {
self.transactions.remove(&hash);
}
}
fn insert(&mut self, hash: H256, status: Status) {
let result = self.transactions.insert(hash, status);
if let Some(old) = result {
if old.is_pending() {
self.pending -= 1;
}
}
}
}
impl txpool::Listener<Transaction> for LocalTransactionsList {
fn added(&mut self, tx: &Arc<Transaction>, old: Option<&Arc<Transaction>>) {
if !tx.priority().is_local() {
return;
}
debug!(target: "own_tx", "Imported to the pool (hash {:?})", tx.hash());
self.clear_old();
self.insert(*tx.hash(), Status::Pending(tx.clone()));
self.pending += 1;
if let Some(old) = old {
if self.transactions.contains_key(old.hash()) {
self.insert(*old.hash(), Status::Replaced {
old: old.clone(),
new: tx.clone(),
});
}
}
}
fn rejected(&mut self, tx: &Arc<Transaction>, reason: &txpool::ErrorKind) {
if !tx.priority().is_local() {
return;
}
debug!(target: "own_tx", "Transaction rejected (hash {:?}). {}", tx.hash(), reason);
self.insert(*tx.hash(), Status::Rejected(tx.clone(), format!("{}", reason)));
self.clear_old();
}
fn dropped(&mut self, tx: &Arc<Transaction>, new: Option<&Transaction>) {
if !tx.priority().is_local() {
return;
}
match new {
Some(new) => warn!(target: "own_tx", "Transaction pushed out because of limit (hash {:?}, replacement: {:?})", tx.hash(), new.hash()),
None => warn!(target: "own_tx", "Transaction dropped because of limit (hash: {:?})", tx.hash()),
}
self.insert(*tx.hash(), Status::Dropped(tx.clone()));
self.clear_old();
}
fn invalid(&mut self, tx: &Arc<Transaction>) {
if !tx.priority().is_local() {
return;
}
warn!(target: "own_tx", "Transaction marked invalid (hash {:?})", tx.hash());
self.insert(*tx.hash(), Status::Invalid(tx.clone()));
self.clear_old();
}
fn canceled(&mut self, tx: &Arc<Transaction>) {
if !tx.priority().is_local() {
return;
}
warn!(target: "own_tx", "Transaction canceled (hash {:?})", tx.hash());
self.insert(*tx.hash(), Status::Canceled(tx.clone()));
self.clear_old();
}
/// The transaction has been mined.
fn mined(&mut self, tx: &Arc<Transaction>) {
if !tx.priority().is_local() {
return;
}
info!(target: "own_tx", "Transaction mined (hash {:?})", tx.hash());
self.insert(*tx.hash(), Status::Mined(tx.clone()));
}
}
#[cfg(test)]
mod tests {
use super::*;
use ethereum_types::U256;
use ethkey::{Random, Generator};
use transaction;
use txpool::Listener;
use pool;
#[test]
fn should_add_transaction_as_pending() {
// given
let mut list = LocalTransactionsList::default();
let tx1 = new_tx(10);
let tx2 = new_tx(20);
// when
list.added(&tx1, None);
list.added(&tx2, None);
// then
assert!(list.contains(tx1.hash()));
assert!(list.contains(tx2.hash()));
let statuses = list.all_transactions().values().cloned().collect::<Vec<Status>>();
assert_eq!(statuses, vec![Status::Pending(tx1), Status::Pending(tx2)]);
}
#[test]
fn should_clear_old_transactions() {
// given
let mut list = LocalTransactionsList::new(1);
let tx1 = new_tx(10);
let tx2 = new_tx(50);
let tx3 = new_tx(51);
list.added(&tx1, None);
list.invalid(&tx1);
list.dropped(&tx2, None);
assert!(!list.contains(tx1.hash()));
assert!(list.contains(tx2.hash()));
assert!(!list.contains(tx3.hash()));
// when
list.added(&tx3, Some(&tx1));
// then
assert!(!list.contains(tx1.hash()));
assert!(list.contains(tx2.hash()));
assert!(list.contains(tx3.hash()));
}
fn new_tx<T: Into<U256>>(nonce: T) -> Arc<Transaction> {
let keypair = Random.generate().unwrap();
let signed = transaction::Transaction {
action: transaction::Action::Create,
value: U256::from(100),
data: Default::default(),
gas: U256::from(10),
gas_price: U256::from(1245),
nonce: nonce.into(),
}.sign(keypair.secret(), None);
let mut tx = Transaction::from_pending_block_transaction(signed);
tx.priority = pool::Priority::Local;
Arc::new(tx)
}
}

135
miner/src/pool/mod.rs Normal file
View File

@@ -0,0 +1,135 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Pool
use ethereum_types::{H256, Address};
use heapsize::HeapSizeOf;
use transaction;
use txpool;
mod listener;
mod queue;
mod ready;
mod scoring;
pub mod client;
pub mod local_transactions;
pub mod verifier;
#[cfg(test)]
mod tests;
pub use self::queue::{TransactionQueue, Status as QueueStatus};
pub use self::txpool::{VerifiedTransaction as PoolVerifiedTransaction, Options};
/// How to prioritize transactions in the pool
///
/// TODO [ToDr] Implement more strategies.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PrioritizationStrategy {
/// Simple gas-price based prioritization.
GasPriceOnly,
}
/// Transaction priority.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum Priority {
/// Local transactions (high priority)
///
/// Transactions either from a local account or
/// submitted over local RPC connection via `eth_sendRawTransaction`
Local,
/// Transactions from retracted blocks (medium priority)
///
/// When block becomes non-canonical we re-import the transactions it contains
/// to the queue and boost their priority.
Retracted,
/// Regular transactions received over the network. (no priority boost)
Regular,
}
impl Priority {
fn is_local(&self) -> bool {
match *self {
Priority::Local => true,
_ => false,
}
}
}
/// Verified transaction stored in the pool.
#[derive(Debug, PartialEq, Eq)]
pub struct VerifiedTransaction {
transaction: transaction::PendingTransaction,
// TODO [ToDr] hash and sender should go directly from the transaction
hash: H256,
sender: Address,
priority: Priority,
insertion_id: usize,
}
impl VerifiedTransaction {
/// Create `VerifiedTransaction` directly from `SignedTransaction`.
///
/// This method should be used only:
/// 1. for tests
/// 2. In case we are converting pending block transactions that are already in the queue to match the function signature.
pub fn from_pending_block_transaction(tx: transaction::SignedTransaction) -> Self {
let hash = tx.hash();
let sender = tx.sender();
VerifiedTransaction {
transaction: tx.into(),
hash,
sender,
priority: Priority::Retracted,
insertion_id: 0,
}
}
/// Gets transaction priority.
pub(crate) fn priority(&self) -> Priority {
self.priority
}
/// Gets wrapped `SignedTransaction`
pub fn signed(&self) -> &transaction::SignedTransaction {
&self.transaction
}
/// Gets wrapped `PendingTransaction`
pub fn pending(&self) -> &transaction::PendingTransaction {
&self.transaction
}
}
impl txpool::VerifiedTransaction for VerifiedTransaction {
fn hash(&self) -> &H256 {
&self.hash
}
fn mem_usage(&self) -> usize {
self.transaction.heap_size_of_children()
}
fn sender(&self) -> &Address {
&self.sender
}
fn insertion_id(&self) -> u64 {
self.insertion_id as u64
}
}

445
miner/src/pool/queue.rs Normal file
View File

@@ -0,0 +1,445 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Ethereum Transaction Queue
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap;
use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
use rayon::prelude::*;
use transaction;
use txpool::{self, Verifier};
use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy};
use pool::local_transactions::LocalTransactionsList;
type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger));
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;
/// Max cache time in milliseconds for pending transactions.
///
/// Pending transactions are cached and will only be computed again
/// if last cache has been created earler than `TIMESTAMP_CACHE` ms ago.
/// This timeout applies only if there are local pending transactions
/// since it only affects transaction Condition.
const TIMESTAMP_CACHE: u64 = 1000;
/// Transaction queue status.
#[derive(Debug, Clone, PartialEq)]
pub struct Status {
/// Verifier options.
pub options: verifier::Options,
/// Current status of the transaction pool.
pub status: txpool::LightStatus,
/// Current limits of the transaction pool.
pub limits: txpool::Options,
}
impl fmt::Display for Status {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
writeln!(
fmt,
"Pool: {current}/{max} ({senders} senders; {mem}/{mem_max} kB) [minGasPrice: {gp} Mwei, maxGas: {max_gas}]",
current = self.status.transaction_count,
max = self.limits.max_count,
senders = self.status.senders,
mem = self.status.mem_usage / 1024,
mem_max = self.limits.max_mem_usage / 1024,
gp = self.options.minimal_gas_price / 1_000_000.into(),
max_gas = cmp::min(self.options.block_gas_limit, self.options.tx_gas_limit),
)
}
}
#[derive(Debug)]
struct CachedPending {
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
has_local_pending: bool,
pending: Option<Vec<Arc<pool::VerifiedTransaction>>>,
}
impl CachedPending {
/// Creates new `CachedPending` without cached set.
pub fn none() -> Self {
CachedPending {
block_number: 0,
current_timestamp: 0,
has_local_pending: false,
pending: None,
nonce_cap: None,
}
}
/// Remove cached pending set.
pub fn clear(&mut self) {
self.pending = None;
}
/// Returns cached pending set (if any) if it's valid.
pub fn pending(
&self,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<&U256>,
) -> Option<Vec<Arc<pool::VerifiedTransaction>>> {
// First check if we have anything in cache.
let pending = self.pending.as_ref()?;
if block_number != self.block_number {
return None;
}
// In case we don't have any local pending transactions
// there is no need to invalidate the cache because of timestamp.
// Timestamp only affects local `PendingTransactions` with `Condition::Timestamp`.
if self.has_local_pending && current_timestamp > self.current_timestamp + TIMESTAMP_CACHE {
return None;
}
// It's fine to return limited set even if `nonce_cap` is `None`.
// The worst thing that may happen is that some transactions won't get propagated in current round,
// but they are not really valid in current block anyway. We will propagate them in the next round.
// Also there is no way to have both `Some` with different numbers since it depends on the block number
// and a constant parameter in schedule (`nonce_cap_increment`)
if self.nonce_cap.is_none() && nonce_cap.is_some() {
return None;
}
Some(pending.clone())
}
}
/// Ethereum Transaction Queue
///
/// Responsible for:
/// - verifying incoming transactions
/// - maintaining a pool of verified transactions.
/// - returning an iterator for transactions that are ready to be included in block (pending)
#[derive(Debug)]
pub struct TransactionQueue {
insertion_id: Arc<AtomicUsize>,
pool: RwLock<Pool>,
options: RwLock<verifier::Options>,
cached_pending: RwLock<CachedPending>,
}
impl TransactionQueue {
/// Create new queue with given pool limits and initial verification options.
pub fn new(
limits: txpool::Options,
verification_options: verifier::Options,
strategy: PrioritizationStrategy,
) -> Self {
TransactionQueue {
insertion_id: Default::default(),
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
options: RwLock::new(verification_options),
cached_pending: RwLock::new(CachedPending::none()),
}
}
/// Update verification options
///
/// Some parameters of verification may vary in time (like block gas limit or minimal gas price).
pub fn set_verifier_options(&self, options: verifier::Options) {
*self.options.write() = options;
}
/// Import a set of transactions to the pool.
///
/// Given blockchain and state access (Client)
/// verifies and imports transactions to the pool.
pub fn import<C: client::Client>(
&self,
client: C,
transactions: Vec<verifier::Transaction>,
) -> Vec<Result<(), transaction::Error>> {
// Run verification
let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport");
let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());
let results = transactions
.into_par_iter()
.map(|transaction| verifier.verify_transaction(transaction))
.map(|result| result.and_then(|verified| {
self.pool.write().import(verified)
.map(|_imported| ())
.map_err(convert_error)
}))
.collect::<Vec<_>>();
// Notify about imported transactions.
(self.pool.write().listener_mut().1).0.notify();
if results.iter().any(|r| r.is_ok()) {
self.cached_pending.write().clear();
}
results
}
/// Returns all transactions in the queue ordered by priority.
pub fn all_transactions(&self) -> Vec<Arc<pool::VerifiedTransaction>> {
let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready;
self.pool.read().pending(ready).collect()
}
/// Returns current pneding transactions.
///
/// NOTE: This may return a cached version of pending transaction set.
/// Re-computing the pending set is possible with `#collect_pending` method,
/// but be aware that it's a pretty expensive operation.
pub fn pending<C>(
&self,
client: C,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
) -> Vec<Arc<pool::VerifiedTransaction>> where
C: client::NonceClient,
{
if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) {
return pending;
}
// Double check after acquiring write lock
let mut cached_pending = self.cached_pending.write();
if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) {
return pending;
}
let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect());
*cached_pending = CachedPending {
block_number,
current_timestamp,
nonce_cap,
has_local_pending: self.has_local_pending_transactions(),
pending: Some(pending.clone()),
};
pending
}
/// Collect pending transactions.
///
/// NOTE This is re-computing the pending set and it might be expensive to do so.
/// Prefer using cached pending set using `#pending` method.
pub fn collect_pending<C, F, T>(
&self,
client: C,
block_number: u64,
current_timestamp: u64,
nonce_cap: Option<U256>,
collect: F,
) -> T where
C: client::NonceClient,
F: FnOnce(txpool::PendingIterator<
pool::VerifiedTransaction,
(ready::Condition, ready::State<C>),
scoring::NonceAndGasPrice,
Listener,
>) -> T,
{
let pending_readiness = ready::Condition::new(block_number, current_timestamp);
// don't mark any transactions as stale at this point.
let stale_id = None;
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
let ready = (pending_readiness, state_readiness);
collect(self.pool.read().pending(ready))
}
/// Culls all stalled transactions from the pool.
pub fn cull<C: client::NonceClient>(
&self,
client: C,
) {
// We don't care about future transactions, so nonce_cap is not important.
let nonce_cap = None;
// We want to clear stale transactions from the queue as well.
// (Transactions that are occuping the queue for a long time without being included)
let stale_id = {
let current_id = self.insertion_id.load(atomic::Ordering::Relaxed) as u64;
// wait at least for half of the queue to be replaced
let gap = self.pool.read().options().max_count / 2;
// but never less than 100 transactions
let gap = cmp::max(100, gap) as u64;
current_id.checked_sub(gap)
};
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
let removed = self.pool.write().cull(None, state_readiness);
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
}
/// Returns next valid nonce for given sender
/// or `None` if there are no pending transactions from that sender.
pub fn next_nonce<C: client::NonceClient>(
&self,
client: C,
address: &Address,
) -> Option<U256> {
// Do not take nonce_cap into account when determining next nonce.
let nonce_cap = None;
// Also we ignore stale transactions in the queue.
let stale_id = None;
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
self.pool.read().pending_from_sender(state_readiness, address)
.last()
.map(|tx| tx.signed().nonce + 1.into())
}
/// Retrieve a transaction from the pool.
///
/// Given transaction hash looks up that transaction in the pool
/// and returns a shared pointer to it or `None` if it's not present.
pub fn find(
&self,
hash: &H256,
) -> Option<Arc<pool::VerifiedTransaction>> {
self.pool.read().find(hash)
}
/// Remove a set of transactions from the pool.
///
/// Given an iterator of transaction hashes
/// removes them from the pool.
/// That method should be used if invalid transactions are detected
/// or you want to cancel a transaction.
pub fn remove<'a, T: IntoIterator<Item = &'a H256>>(
&self,
hashes: T,
is_invalid: bool,
) -> Vec<Option<Arc<pool::VerifiedTransaction>>> {
let results = {
let mut pool = self.pool.write();
hashes
.into_iter()
.map(|hash| pool.remove(hash, is_invalid))
.collect::<Vec<_>>()
};
if results.iter().any(Option::is_some) {
self.cached_pending.write().clear();
}
results
}
/// Clear the entire pool.
pub fn clear(&self) {
self.pool.write().clear();
}
/// Penalize given senders.
pub fn penalize<'a, T: IntoIterator<Item = &'a Address>>(&self, senders: T) {
let mut pool = self.pool.write();
for sender in senders {
pool.update_scores(sender, ());
}
}
/// Returns gas price of currently the worst transaction in the pool.
pub fn current_worst_gas_price(&self) -> U256 {
match self.pool.read().worst_transaction() {
Some(tx) => tx.signed().gas_price,
None => self.options.read().minimal_gas_price,
}
}
/// Returns a status of the queue.
pub fn status(&self) -> Status {
let pool = self.pool.read();
let status = pool.light_status();
let limits = pool.options();
let options = self.options.read().clone();
Status {
options,
status,
limits,
}
}
/// Check if there are any local transactions in the pool.
///
/// Returns `true` if there are any transactions in the pool
/// that has been marked as local.
///
/// Local transactions are the ones from accounts managed by this node
/// and transactions submitted via local RPC (`eth_sendRawTransaction`)
pub fn has_local_pending_transactions(&self) -> bool {
self.pool.read().listener().0.has_pending()
}
/// Returns status of recently seen local transactions.
pub fn local_transactions(&self) -> BTreeMap<H256, pool::local_transactions::Status> {
self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect()
}
/// Add a callback to be notified about all transactions entering the pool.
pub fn add_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
let mut pool = self.pool.write();
(pool.listener_mut().1).0.add(f);
}
}
fn convert_error(err: txpool::Error) -> transaction::Error {
use self::txpool::ErrorKind;
match *err.kind() {
ErrorKind::AlreadyImported(..) => transaction::Error::AlreadyImported,
ErrorKind::TooCheapToEnter(..) => transaction::Error::LimitReached,
ErrorKind::TooCheapToReplace(..) => transaction::Error::TooCheapToReplace,
ref e => {
warn!(target: "txqueue", "Unknown import error: {:?}", e);
transaction::Error::NotAllowed
},
}
}
#[cfg(test)]
mod tests {
use super::*;
use pool::tests::client::TestClient;
#[test]
fn should_get_pending_transactions() {
let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly);
let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None);
for tx in pending {
assert!(tx.signed().nonce > 0.into());
}
}
}

212
miner/src/pool/ready.rs Normal file
View File

@@ -0,0 +1,212 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Readiness indicator
//!
//! Transaction readiness is responsible for indicating if
//! particular transaction can be included in the block.
//!
//! Regular transactions are ready iff the current state nonce
//! (obtained from `NonceClient`) equals to the transaction nonce.
//!
//! Let's define `S = state nonce`. Transactions are processed
//! in order, so we first include transaction with nonce `S`,
//! but then we are able to include the one with `S + 1` nonce.
//! So bear in mind that transactions can be included in chains
//! and their readiness is dependent on previous transactions from
//! the same sender.
//!
//! There are three possible outcomes:
//! - The transaction is old (stalled; state nonce > transaction nonce)
//! - The transaction is ready (current; state nonce == transaction nonce)
//! - The transaction is not ready yet (future; state nonce < transaction nonce)
//!
//! NOTE The transactions are always checked for readines in order they are stored within the queue.
//! First `Readiness::Future` response also causes all subsequent transactions from the same sender
//! to be marked as `Future`.
use std::cmp;
use std::collections::HashMap;
use ethereum_types::{U256, H160 as Address};
use transaction;
use txpool::{self, VerifiedTransaction as PoolVerifiedTransaction};
use super::client::NonceClient;
use super::VerifiedTransaction;
/// Checks readiness of transactions by comparing the nonce to state nonce.
#[derive(Debug)]
pub struct State<C> {
nonces: HashMap<Address, U256>,
state: C,
max_nonce: Option<U256>,
stale_id: Option<u64>,
}
impl<C> State<C> {
/// Create new State checker, given client interface.
pub fn new(
state: C,
stale_id: Option<u64>,
max_nonce: Option<U256>,
) -> Self {
State {
nonces: Default::default(),
state,
max_nonce,
stale_id,
}
}
}
impl<C: NonceClient> txpool::Ready<VerifiedTransaction> for State<C> {
fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness {
// Check max nonce
match self.max_nonce {
Some(nonce) if tx.transaction.nonce > nonce => {
return txpool::Readiness::Future;
},
_ => {},
}
let sender = tx.sender();
let state = &self.state;
let state_nonce = || state.account_nonce(sender);
let nonce = self.nonces.entry(*sender).or_insert_with(state_nonce);
match tx.transaction.nonce.cmp(nonce) {
// Before marking as future check for stale ids
cmp::Ordering::Greater => match self.stale_id {
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stalled,
_ => txpool::Readiness::Future,
},
cmp::Ordering::Less => txpool::Readiness::Stalled,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
},
}
}
}
/// Checks readines of Pending transactions by comparing it with current time and block number.
#[derive(Debug)]
pub struct Condition {
block_number: u64,
now: u64,
}
impl Condition {
/// Create a new condition checker given current block number and UTC timestamp.
pub fn new(block_number: u64, now: u64) -> Self {
Condition {
block_number,
now,
}
}
}
impl txpool::Ready<VerifiedTransaction> for Condition {
fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness {
match tx.transaction.condition {
Some(transaction::Condition::Number(block)) if block > self.block_number => txpool::Readiness::Future,
Some(transaction::Condition::Timestamp(time)) if time > self.now => txpool::Readiness::Future,
_ => txpool::Readiness::Ready,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use txpool::Ready;
use pool::tests::client::TestClient;
use pool::tests::tx::{Tx, TxExt};
#[test]
fn should_return_correct_state_readiness() {
// given
let (tx1, tx2, tx3) = Tx::default().signed_triple();
let (tx1, tx2, tx3) = (tx1.verified(), tx2.verified(), tx3.verified());
// when
assert_eq!(State::new(TestClient::new(), None, None).is_ready(&tx3), txpool::Readiness::Future);
assert_eq!(State::new(TestClient::new(), None, None).is_ready(&tx2), txpool::Readiness::Future);
let mut ready = State::new(TestClient::new(), None, None);
// then
assert_eq!(ready.is_ready(&tx1), txpool::Readiness::Ready);
assert_eq!(ready.is_ready(&tx2), txpool::Readiness::Ready);
assert_eq!(ready.is_ready(&tx3), txpool::Readiness::Ready);
}
#[test]
fn should_return_future_if_nonce_cap_reached() {
// given
let tx = Tx::default().signed().verified();
// when
let res1 = State::new(TestClient::new(), None, Some(10.into())).is_ready(&tx);
let res2 = State::new(TestClient::new(), None, Some(124.into())).is_ready(&tx);
// then
assert_eq!(res1, txpool::Readiness::Future);
assert_eq!(res2, txpool::Readiness::Ready);
}
#[test]
fn should_return_stale_if_nonce_does_not_match() {
// given
let tx = Tx::default().signed().verified();
// when
let res = State::new(TestClient::new().with_nonce(125), None, None).is_ready(&tx);
// then
assert_eq!(res, txpool::Readiness::Stalled);
}
#[test]
fn should_return_stale_for_old_transactions() {
// given
let (_, tx) = Tx::default().signed_pair().verified();
// when
let res = State::new(TestClient::new(), Some(1), None).is_ready(&tx);
// then
assert_eq!(res, txpool::Readiness::Stalled);
}
#[test]
fn should_check_readiness_of_condition() {
// given
let tx = Tx::default().signed();
let v = |tx: transaction::PendingTransaction| TestClient::new().verify(tx);
let tx1 = v(transaction::PendingTransaction::new(tx.clone(), transaction::Condition::Number(5).into()));
let tx2 = v(transaction::PendingTransaction::new(tx.clone(), transaction::Condition::Timestamp(3).into()));
let tx3 = v(transaction::PendingTransaction::new(tx.clone(), None));
// when/then
assert_eq!(Condition::new(0, 0).is_ready(&tx1), txpool::Readiness::Future);
assert_eq!(Condition::new(0, 0).is_ready(&tx2), txpool::Readiness::Future);
assert_eq!(Condition::new(0, 0).is_ready(&tx3), txpool::Readiness::Ready);
assert_eq!(Condition::new(5, 0).is_ready(&tx1), txpool::Readiness::Ready);
assert_eq!(Condition::new(0, 3).is_ready(&tx2), txpool::Readiness::Ready);
}
}

171
miner/src/pool/scoring.rs Normal file
View File

@@ -0,0 +1,171 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Scoring and Ordering
//!
//! Ethereum transactions from the same sender are ordered by `nonce`.
//! Low nonces need to be included first. If there are two transactions from the same sender
//! and with the same `nonce` only one of them can be included.
//! We choose the one with higher gas price, but also require that gas price increment
//! is high enough to prevent attacking miners by requiring them to reshuffle/reexecute
//! the queue too often.
//!
//! Transactions between senders are prioritized using `gas price`. Higher `gas price`
//! yields more profits for miners. Additionally we prioritize transactions that originate
//! from our local node (own transactions).
use std::cmp;
use std::sync::Arc;
use ethereum_types::U256;
use txpool;
use super::{PrioritizationStrategy, VerifiedTransaction};
/// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25%
/// Simple, gas-price based scoring for transactions.
///
/// NOTE: Currently penalization does not apply to new transactions that enter the pool.
/// We might want to store penalization status in some persistent state.
#[derive(Debug)]
pub struct NonceAndGasPrice(pub PrioritizationStrategy);
impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
type Score = U256;
type Event = ();
fn compare(&self, old: &VerifiedTransaction, other: &VerifiedTransaction) -> cmp::Ordering {
old.transaction.nonce.cmp(&other.transaction.nonce)
}
fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> txpool::scoring::Choice {
if old.transaction.nonce != new.transaction.nonce {
return txpool::scoring::Choice::InsertNew
}
let old_gp = old.transaction.gas_price;
let new_gp = new.transaction.gas_price;
let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT);
match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew,
_ => txpool::scoring::Choice::ReplaceOld,
}
}
fn update_scores(&self, txs: &[Arc<VerifiedTransaction>], scores: &mut [U256], change: txpool::scoring::Change) {
use self::txpool::scoring::Change;
match change {
Change::Culled(_) => {},
Change::RemovedAt(_) => {}
Change::InsertedAt(i) | Change::ReplacedAt(i) => {
assert!(i < txs.len());
assert!(i < scores.len());
scores[i] = txs[i].transaction.gas_price;
let boost = match txs[i].priority() {
super::Priority::Local => 15,
super::Priority::Retracted => 10,
super::Priority::Regular => 0,
};
scores[i] = scores[i] << boost;
},
// We are only sending an event in case of penalization.
// So just lower the priority of all non-local transactions.
Change::Event(_) => {
for (score, tx) in scores.iter_mut().zip(txs) {
// Never penalize local transactions.
if !tx.priority().is_local() {
*score = *score >> 3;
}
}
},
}
}
fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> bool {
if old.sender == new.sender {
// prefer earliest transaction
if new.transaction.nonce < old.transaction.nonce {
return true
}
}
self.choose(old, new) == txpool::scoring::Choice::ReplaceOld
}
}
#[cfg(test)]
mod tests {
use super::*;
use pool::tests::tx::{Tx, TxExt};
use txpool::Scoring;
#[test]
fn should_calculate_score_correctly() {
// given
let scoring = NonceAndGasPrice(PrioritizationStrategy::GasPriceOnly);
let (tx1, tx2, tx3) = Tx::default().signed_triple();
let transactions = vec![tx1, tx2, tx3].into_iter().enumerate().map(|(i, tx)| {
let mut verified = tx.verified();
verified.priority = match i {
0 => ::pool::Priority::Local,
1 => ::pool::Priority::Retracted,
_ => ::pool::Priority::Regular,
};
Arc::new(verified)
}).collect::<Vec<_>>();
let initial_scores = vec![U256::from(0), 0.into(), 0.into()];
// No update required
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(2));
assert_eq!(scores, initial_scores);
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(2));
assert_eq!(scores, initial_scores);
// Compute score at given index
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);
// Check penalization
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Event(()));
assert_eq!(scores, vec![32768.into(), 128.into(), 0.into()]);
}
}

View File

@@ -0,0 +1,125 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::{U256, H256, Address};
use transaction::{self, Transaction, SignedTransaction, UnverifiedTransaction};
use pool;
use pool::client::AccountDetails;
#[derive(Debug, Clone)]
pub struct TestClient {
account_details: AccountDetails,
gas_required: U256,
is_service_transaction: bool,
local_address: Address,
}
impl Default for TestClient {
fn default() -> Self {
TestClient {
account_details: AccountDetails {
nonce: 123.into(),
balance: 63_100.into(),
is_local: false,
},
gas_required: 21_000.into(),
is_service_transaction: false,
local_address: Default::default(),
}
}
}
impl TestClient {
pub fn new() -> Self {
TestClient::default()
}
pub fn with_balance<T: Into<U256>>(mut self, balance: T) -> Self {
self.account_details.balance = balance.into();
self
}
pub fn with_nonce<T: Into<U256>>(mut self, nonce: T) -> Self {
self.account_details.nonce = nonce.into();
self
}
pub fn with_gas_required<T: Into<U256>>(mut self, gas_required: T) -> Self {
self.gas_required = gas_required.into();
self
}
pub fn with_local(mut self, address: &Address) -> Self {
self.local_address = *address;
self
}
pub fn with_service_transaction(mut self) -> Self {
self.is_service_transaction = true;
self
}
pub fn verify<T: Into<transaction::PendingTransaction>>(&self, tx: T) -> pool::VerifiedTransaction {
let tx = tx.into();
pool::VerifiedTransaction {
hash: tx.hash(),
sender: tx.sender(),
priority: pool::Priority::Regular,
transaction: tx,
insertion_id: 1,
}
}
}
impl pool::client::Client for TestClient {
fn transaction_already_included(&self, _hash: &H256) -> bool {
false
}
fn verify_transaction(&self, tx: UnverifiedTransaction)
-> Result<SignedTransaction, transaction::Error>
{
Ok(SignedTransaction::new(tx)?)
}
fn account_details(&self, address: &Address) -> AccountDetails {
let mut details = self.account_details.clone();
if address == &self.local_address {
details.is_local = true;
}
details
}
fn required_gas(&self, _tx: &Transaction) -> U256 {
self.gas_required
}
fn transaction_type(&self, _tx: &SignedTransaction) -> pool::client::TransactionType {
if self.is_service_transaction {
pool::client::TransactionType::Service
} else {
pool::client::TransactionType::Regular
}
}
}
impl pool::client::NonceClient for TestClient {
fn account_nonce(&self, _address: &Address) -> U256 {
self.account_details.nonce
}
}

757
miner/src/pool/tests/mod.rs Normal file
View File

@@ -0,0 +1,757 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::U256;
use transaction::{self, PendingTransaction};
use txpool;
use pool::{verifier, TransactionQueue, PrioritizationStrategy};
pub mod tx;
pub mod client;
use self::tx::{Tx, TxExt, PairExt};
use self::client::TestClient;
fn new_queue() -> TransactionQueue {
TransactionQueue::new(
txpool::Options {
max_count: 3,
max_per_sender: 3,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
)
}
#[test]
fn should_return_correct_nonces_when_dropped_because_of_limit() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 3,
max_per_sender: 1,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let (tx1, tx2) = Tx::gas_price(2).signed_pair();
let sender = tx1.sender();
let nonce = tx1.nonce;
// when
let result = txq.import(TestClient::new(), vec![tx1, tx2].local());
assert_eq!(result, vec![Ok(()), Err(transaction::Error::LimitReached)]);
assert_eq!(txq.status().status.transaction_count, 1);
// then
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));
// when
let tx1 = Tx::gas_price(2).signed();
let tx2 = Tx::gas_price(2).signed();
let tx3 = Tx::gas_price(1).signed();
let tx4 = Tx::gas_price(3).signed();
let res = txq.import(TestClient::new(), vec![tx1, tx2].local());
let res2 = txq.import(TestClient::new(), vec![tx3, tx4].local());
// then
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(res2, vec![Err(transaction::Error::LimitReached), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 3);
// First inserted transacton got dropped because of limit
assert_eq!(txq.next_nonce(TestClient::new(), &sender), None);
}
#[test]
fn should_handle_same_transaction_imported_twice_with_different_state_nonces() {
// given
let txq = new_queue();
let (tx, tx2) = Tx::default().signed_replacement();
let hash = tx2.hash();
let client = TestClient::new().with_nonce(122);
// First insert one transaction to future
let res = txq.import(client.clone(), vec![tx].local());
assert_eq!(res, vec![Ok(())]);
// next_nonce === None -> transaction is in future
assert_eq!(txq.next_nonce(client.clone(), &tx2.sender()), None);
// now import second transaction to current
let res = txq.import(TestClient::new(), vec![tx2.local()]);
// and then there should be only one transaction in current (the one with higher gas_price)
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top[0].hash, hash);
}
#[test]
fn should_move_all_transactions_from_future() {
// given
let txq = new_queue();
let txs = Tx::default().signed_pair();
let (hash, hash2) = txs.hash();
let (tx, tx2) = txs;
let client = TestClient::new().with_nonce(122);
// First insert one transaction to future
let res = txq.import(client.clone(), vec![tx.local()]);
assert_eq!(res, vec![Ok(())]);
// next_nonce === None -> transaction is in future
assert_eq!(txq.next_nonce(client.clone(), &tx2.sender()), None);
// now import second transaction to current
let res = txq.import(client.clone(), vec![tx2.local()]);
// then
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
}
#[test]
fn should_drop_transactions_from_senders_without_balance() {
// given
let txq = new_queue();
let tx = Tx::default().signed();
let client = TestClient::new().with_balance(1);
// when
let res = txq.import(client, vec![tx.local()]);
// then
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: U256::from(1),
cost: U256::from(21_100),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_not_import_transaction_below_min_gas_price_threshold_if_external() {
// given
let txq = new_queue();
let tx = Tx::default();
txq.set_verifier_options(verifier::Options {
minimal_gas_price: 3.into(),
..Default::default()
});
// when
let res = txq.import(TestClient::new(), vec![tx.signed().unverified()]);
// then
assert_eq!(res, vec![Err(transaction::Error::InsufficientGasPrice {
minimal: U256::from(3),
got: U256::from(1),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_import_transaction_below_min_gas_price_threshold_if_local() {
// given
let txq = new_queue();
let tx = Tx::default();
txq.set_verifier_options(verifier::Options {
minimal_gas_price: 3.into(),
..Default::default()
});
// when
let res = txq.import(TestClient::new(), vec![tx.signed().local()]);
// then
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
}
#[test]
fn should_import_txs_from_same_sender() {
// given
let txq = new_queue();
let txs = Tx::default().signed_pair();
let (hash, hash2) = txs.hash();
// when
txq.import(TestClient::new(), txs.local().into_vec());
// then
let top = txq.pending(TestClient::new(), 0 ,0, None);
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
}
#[test]
fn should_prioritize_local_transactions_within_same_nonce_height() {
// given
let txq = new_queue();
let tx = Tx::default().signed();
// the second one has same nonce but higher `gas_price`
let tx2 = Tx::gas_price(2).signed();
let (hash, hash2) = (tx.hash(), tx2.hash());
let client = TestClient::new().with_local(&tx.sender());
// when
// first insert the one with higher gas price
let res = txq.import(client.clone(), vec![tx.local(), tx2.unverified()]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(client, 0, 0, None);
assert_eq!(top[0].hash, hash); // local should be first
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
}
#[test]
fn should_prioritize_reimported_transactions_within_same_nonce_height() {
// given
let txq = new_queue();
let tx = Tx::default().signed();
// the second one has same nonce but higher `gas_price`
let tx2 = Tx::gas_price(2).signed();
let (hash, hash2) = (tx.hash(), tx2.hash());
// when
// first insert local one with higher gas price
// then the one with lower gas price, but from retracted block
let res = txq.import(TestClient::new(), vec![tx2.unverified(), tx.retracted()]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top[0].hash, hash); // retracted should be first
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
}
#[test]
fn should_not_prioritize_local_transactions_with_different_nonce_height() {
// given
let txq = new_queue();
let txs = Tx::default().signed_pair();
let (hash, hash2) = txs.hash();
let (tx, tx2) = txs;
// when
let res = txq.import(TestClient::new(), vec![tx.unverified(), tx2.local()]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top[0].hash, hash);
assert_eq!(top[1].hash, hash2);
assert_eq!(top.len(), 2);
}
#[test]
fn should_put_transaction_to_futures_if_gap_detected() {
// given
let txq = new_queue();
let (tx, _, tx2) = Tx::default().signed_triple();
let hash = tx.hash();
// when
let res = txq.import(TestClient::new(), vec![tx, tx2].local());
// then
assert_eq!(res, vec![Ok(()), Ok(())]);
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash);
}
#[test]
fn should_handle_min_block() {
// given
let txq = new_queue();
let (tx, tx2) = Tx::default().signed_pair();
// when
let res = txq.import(TestClient::new(), vec![
verifier::Transaction::Local(PendingTransaction::new(tx, transaction::Condition::Number(1).into())),
tx2.local()
]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top.len(), 0);
let top = txq.pending(TestClient::new(), 1, 0, None);
assert_eq!(top.len(), 2);
}
#[test]
fn should_correctly_update_futures_when_removing() {
// given
let txq = new_queue();
let txs= Tx::default().signed_pair();
let res = txq.import(TestClient::new().with_nonce(121), txs.local().into_vec());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
// when
txq.cull(TestClient::new().with_nonce(125));
// should remove both transactions since they are stalled
// then
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_move_transactions_if_gap_filled() {
// given
let txq = new_queue();
let (tx, tx1, tx2) = Tx::default().signed_triple();
let res = txq.import(TestClient::new(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
// when
let res = txq.import(TestClient::new(), vec![tx1.local()]);
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
}
#[test]
fn should_remove_transaction() {
// given
let txq = new_queue();
let (tx, _, tx2) = Tx::default().signed_triple();
let res = txq.import(TestClient::default(), vec![tx, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
// when
txq.cull(TestClient::new().with_nonce(124));
assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1);
txq.cull(TestClient::new().with_nonce(126));
// then
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_move_transactions_to_future_if_gap_introduced() {
// given
let txq = new_queue();
let (tx, tx2) = Tx::default().signed_pair();
let hash = tx.hash();
let tx3 = Tx::default().signed();
let res = txq.import(TestClient::new(), vec![tx3, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
let res = txq.import(TestClient::new(), vec![tx].local());
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 3);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3);
// when
txq.remove(vec![&hash], true);
// then
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
}
#[test]
fn should_clear_queue() {
// given
let txq = new_queue();
let txs = Tx::default().signed_pair();
// add
txq.import(TestClient::new(), txs.local().into_vec());
assert_eq!(txq.status().status.transaction_count, 2);
// when
txq.clear();
// then
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_prefer_current_transactions_when_hitting_the_limit() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let (tx, tx2) = Tx::default().signed_pair();
let hash = tx.hash();
let sender = tx.sender();
let res = txq.import(TestClient::new(), vec![tx2.unverified()]);
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
// when
let res = txq.import(TestClient::new(), vec![tx.unverified()]);
// then
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
let top = txq.pending(TestClient::new(), 0, 0, None);
assert_eq!(top.len(), 1);
assert_eq!(top[0].hash, hash);
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into()));
}
#[test]
fn should_drop_transactions_with_old_nonces() {
let txq = new_queue();
let tx = Tx::default().signed();
// when
let res = txq.import(TestClient::new().with_nonce(125), vec![tx.unverified()]);
// then
assert_eq!(res, vec![Err(transaction::Error::Old)]);
assert_eq!(txq.status().status.transaction_count, 0);
}
#[test]
fn should_not_insert_same_transaction_twice() {
// given
let txq = new_queue();
let (_tx1, tx2) = Tx::default().signed_pair();
let res = txq.import(TestClient::new(), vec![tx2.clone().local()]);
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
// when
let res = txq.import(TestClient::new(), vec![tx2.local()]);
// then
assert_eq!(res, vec![Err(transaction::Error::AlreadyImported)]);
assert_eq!(txq.status().status.transaction_count, 1);
}
#[test]
fn should_accept_same_transaction_twice_if_removed() {
// given
let txq = new_queue();
let txs = Tx::default().signed_pair();
let (tx1, _) = txs.clone();
let (hash, _) = txs.hash();
let res = txq.import(TestClient::new(), txs.local().into_vec());
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
// when
txq.remove(vec![&hash], true);
assert_eq!(txq.status().status.transaction_count, 1);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0);
let res = txq.import(TestClient::new(), vec![tx1].local());
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2);
}
#[test]
fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() {
// given
let txq = new_queue();
let (tx, tx2) = Tx::gas_price(20).signed_replacement();
let (tx3, tx4) = Tx::gas_price(1).signed_replacement();
let client = TestClient::new().with_balance(1_000_000);
// when
let res = txq.import(client.clone(), vec![tx, tx3].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
let res = txq.import(client.clone(), vec![tx2, tx4].local());
// then
assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 2);
assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20));
assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2));
}
#[test]
fn should_return_none_when_transaction_from_given_address_does_not_exist() {
// given
let txq = new_queue();
// then
assert_eq!(txq.next_nonce(TestClient::new(), &Default::default()), None);
}
#[test]
fn should_return_correct_nonce_when_transactions_from_given_address_exist() {
// given
let txq = new_queue();
let tx = Tx::default().signed();
let from = tx.sender();
let nonce = tx.nonce;
// when
txq.import(TestClient::new(), vec![tx.local()]);
// then
assert_eq!(txq.next_nonce(TestClient::new(), &from), Some(nonce + 1.into()));
}
#[test]
fn should_return_valid_last_nonce_after_cull() {
// given
let txq = new_queue();
let (tx1, _, tx2) = Tx::default().signed_triple();
let sender = tx1.sender();
// when
// Second should go to future
let res = txq.import(TestClient::new(), vec![tx1, tx2].local());
assert_eq!(res, vec![Ok(()), Ok(())]);
// Now block is imported
let client = TestClient::new().with_nonce(124);
txq.cull(client.clone());
// tx2 should be not be promoted to current
assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0);
// then
assert_eq!(txq.next_nonce(client.clone(), &sender), None);
assert_eq!(txq.next_nonce(client.with_nonce(125), &sender), Some(126.into()));
}
#[test]
fn should_return_true_if_there_is_local_transaction_pending() {
// given
let txq = new_queue();
let (tx1, tx2) = Tx::default().signed_pair();
assert_eq!(txq.has_local_pending_transactions(), false);
let client = TestClient::new().with_local(&tx1.sender());
// when
let res = txq.import(client.clone(), vec![tx1.unverified(), tx2.local()]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// then
assert_eq!(txq.has_local_pending_transactions(), true);
}
#[test]
fn should_reject_transactions_below_base_gas() {
// given
let txq = new_queue();
let tx = Tx::default().signed();
// when
let res = txq.import(TestClient::new().with_gas_required(100_001), vec![tx].local());
// then
assert_eq!(res, vec![Err(transaction::Error::InsufficientGas {
minimal: 100_001.into(),
got: 21_000.into(),
})]);
}
#[test]
fn should_remove_out_of_date_transactions_occupying_queue() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 105,
max_per_sender: 3,
max_mem_usage: 5_000_000,
},
verifier::Options {
minimal_gas_price: 10.into(),
..Default::default()
},
PrioritizationStrategy::GasPriceOnly,
);
// that transaction will be occupying the queue
let (_, tx) = Tx::default().signed_pair();
let res = txq.import(TestClient::new(), vec![tx.local()]);
assert_eq!(res, vec![Ok(())]);
// This should not clear the transaction (yet)
txq.cull(TestClient::new());
assert_eq!(txq.status().status.transaction_count, 1);
// Now insert at least 100 transactions to have the other one marked as future.
for _ in 0..34 {
let (tx1, tx2, tx3) = Tx::default().signed_triple();
txq.import(TestClient::new(), vec![tx1, tx2, tx3].local());
}
assert_eq!(txq.status().status.transaction_count, 103);
// when
txq.cull(TestClient::new());
// then
assert_eq!(txq.status().status.transaction_count, 102);
}
#[test]
fn should_accept_local_transactions_below_min_gas_price() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 3,
max_per_sender: 3,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 10.into(),
..Default::default()
},
PrioritizationStrategy::GasPriceOnly,
);
let tx = Tx::gas_price(1).signed();
// when
let res = txq.import(TestClient::new(), vec![tx.local()]);
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
}
#[test]
fn should_accept_local_service_transaction() {
// given
let txq = new_queue();
let tx = Tx::gas_price(0).signed();
// when
let res = txq.import(
TestClient::new()
.with_local(&tx.sender()),
vec![tx.local()]
);
assert_eq!(res, vec![Ok(())]);
// then
assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1);
}
#[test]
fn should_not_accept_external_service_transaction_if_sender_not_certified() {
// given
let txq = new_queue();
let tx1 = Tx::gas_price(0).signed().unverified();
let tx2 = Tx::gas_price(0).signed().retracted();
let tx3 = Tx::gas_price(0).signed().unverified();
// when
let res = txq.import(TestClient::new(), vec![tx1, tx2]);
assert_eq!(res, vec![
Err(transaction::Error::InsufficientGasPrice {
minimal: 1.into(),
got: 0.into(),
}),
Err(transaction::Error::InsufficientGasPrice {
minimal: 1.into(),
got: 0.into(),
}),
]);
// then
let res = txq.import(TestClient::new().with_service_transaction(), vec![tx3]);
assert_eq!(res, vec![Ok(())]);
}
#[test]
fn should_not_return_transactions_over_nonce_cap() {
// given
let txq = new_queue();
let (tx1, tx2, tx3) = Tx::default().signed_triple();
let res = txq.import(
TestClient::new(),
vec![tx1, tx2, tx3].local()
);
assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]);
// when
let all = txq.pending(TestClient::new(), 0, 0, None);
// This should invalidate the cache!
let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into()));
// then
assert_eq!(all.len(), 3);
assert_eq!(limited.len(), 1);
}
#[test]
fn should_clear_cache_after_timeout_for_local() {
// given
let txq = new_queue();
let (tx, tx2) = Tx::default().signed_pair();
let res = txq.import(TestClient::new(), vec![
verifier::Transaction::Local(PendingTransaction::new(tx, transaction::Condition::Timestamp(1000).into())),
tx2.local()
]);
assert_eq!(res, vec![Ok(()), Ok(())]);
// This should populate cache and set timestamp to 1
// when
assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0);
assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0);
// This should invalidate the cache and trigger transaction ready.
// then
assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2);
}

185
miner/src/pool/tests/tx.rs Normal file
View File

@@ -0,0 +1,185 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::{U256, H256};
use ethkey::{Random, Generator};
use rustc_hex::FromHex;
use transaction::{self, Transaction, SignedTransaction, UnverifiedTransaction};
use pool::{verifier, VerifiedTransaction};
#[derive(Clone)]
pub struct Tx {
nonce: u64,
gas: u64,
gas_price: u64,
}
impl Default for Tx {
fn default() -> Self {
Tx {
nonce: 123,
gas: 21_000,
gas_price: 1,
}
}
}
impl Tx {
pub fn gas_price(gas_price: u64) -> Self {
Tx {
gas_price,
..Default::default()
}
}
pub fn signed(self) -> SignedTransaction {
let keypair = Random.generate().unwrap();
self.unsigned().sign(keypair.secret(), None)
}
pub fn signed_pair(self) -> (SignedTransaction, SignedTransaction) {
let (tx1, tx2, _) = self.signed_triple();
(tx1, tx2)
}
pub fn signed_triple(mut self) -> (SignedTransaction, SignedTransaction, SignedTransaction) {
let keypair = Random.generate().unwrap();
let tx1 = self.clone().unsigned().sign(keypair.secret(), None);
self.nonce += 1;
let tx2 = self.clone().unsigned().sign(keypair.secret(), None);
self.nonce += 1;
let tx3 = self.unsigned().sign(keypair.secret(), None);
(tx1, tx2, tx3)
}
pub fn signed_replacement(mut self) -> (SignedTransaction, SignedTransaction) {
let keypair = Random.generate().unwrap();
let tx1 = self.clone().unsigned().sign(keypair.secret(), None);
self.gas_price += 1;
let tx2 = self.unsigned().sign(keypair.secret(), None);
(tx1, tx2)
}
pub fn unsigned(self) -> Transaction {
Transaction {
action: transaction::Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: self.gas.into(),
gas_price: self.gas_price.into(),
nonce: self.nonce.into()
}
}
}
pub trait TxExt: Sized {
type Out;
type Verified;
type Hash;
fn hash(&self) -> Self::Hash;
fn local(self) -> Self::Out;
fn retracted(self) -> Self::Out;
fn unverified(self) -> Self::Out;
fn verified(self) -> Self::Verified;
}
impl<A, B, O, V, H> TxExt for (A, B) where
A: TxExt<Out=O, Verified=V, Hash=H>,
B: TxExt<Out=O, Verified=V, Hash=H>,
{
type Out = (O, O);
type Verified = (V, V);
type Hash = (H, H);
fn hash(&self) -> Self::Hash { (self.0.hash(), self.1.hash()) }
fn local(self) -> Self::Out { (self.0.local(), self.1.local()) }
fn retracted(self) -> Self::Out { (self.0.retracted(), self.1.retracted()) }
fn unverified(self) -> Self::Out { (self.0.unverified(), self.1.unverified()) }
fn verified(self) -> Self::Verified { (self.0.verified(), self.1.verified()) }
}
impl TxExt for SignedTransaction {
type Out = verifier::Transaction;
type Verified = VerifiedTransaction;
type Hash = H256;
fn hash(&self) -> Self::Hash {
UnverifiedTransaction::hash(self)
}
fn local(self) -> Self::Out {
verifier::Transaction::Local(self.into())
}
fn retracted(self) -> Self::Out {
verifier::Transaction::Retracted(self.into())
}
fn unverified(self) -> Self::Out {
verifier::Transaction::Unverified(self.into())
}
fn verified(self) -> Self::Verified {
VerifiedTransaction::from_pending_block_transaction(self)
}
}
impl TxExt for Vec<SignedTransaction> {
type Out = Vec<verifier::Transaction>;
type Verified = Vec<VerifiedTransaction>;
type Hash = Vec<H256>;
fn hash(&self) -> Self::Hash {
self.iter().map(|tx| tx.hash()).collect()
}
fn local(self) -> Self::Out {
self.into_iter().map(Into::into).map(verifier::Transaction::Local).collect()
}
fn retracted(self) -> Self::Out {
self.into_iter().map(Into::into).map(verifier::Transaction::Retracted).collect()
}
fn unverified(self) -> Self::Out {
self.into_iter().map(Into::into).map(verifier::Transaction::Unverified).collect()
}
fn verified(self) -> Self::Verified {
self.into_iter().map(VerifiedTransaction::from_pending_block_transaction).collect()
}
}
pub trait PairExt {
type Type;
fn into_vec(self) -> Vec<Self::Type>;
}
impl<A> PairExt for (A, A) {
type Type = A;
fn into_vec(self) -> Vec<A> {
vec![self.0, self.1]
}
}

288
miner/src/pool/verifier.rs Normal file
View File

@@ -0,0 +1,288 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Verifier
//!
//! Responsible for verifying a transaction before importing to the pool.
//! Should make sure that the transaction is structuraly valid.
//!
//! May have some overlap with `Readiness` since we don't want to keep around
//! stalled transactions.
use std::cmp;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use ethereum_types::{U256, H256};
use transaction;
use txpool;
use super::client::{Client, TransactionType};
use super::VerifiedTransaction;
/// Verification options.
#[derive(Debug, Clone, PartialEq)]
pub struct Options {
/// Minimal allowed gas price.
pub minimal_gas_price: U256,
/// Current block gas limit.
pub block_gas_limit: U256,
/// Maximal gas limit for a single transaction.
pub tx_gas_limit: U256,
}
#[cfg(test)]
impl Default for Options {
fn default() -> Self {
Options {
minimal_gas_price: 0.into(),
block_gas_limit: U256::max_value(),
tx_gas_limit: U256::max_value(),
}
}
}
/// Transaction to verify.
pub enum Transaction {
/// Fresh, never verified transaction.
///
/// We need to do full verification of such transactions
Unverified(transaction::UnverifiedTransaction),
/// Transaction from retracted block.
///
/// We could skip some parts of verification of such transactions
Retracted(transaction::UnverifiedTransaction),
/// Locally signed or retracted transaction.
///
/// We can skip consistency verifications and just verify readiness.
Local(transaction::PendingTransaction),
}
impl Transaction {
fn hash(&self) -> H256 {
match *self {
Transaction::Unverified(ref tx) => tx.hash(),
Transaction::Retracted(ref tx) => tx.hash(),
Transaction::Local(ref tx) => tx.hash(),
}
}
fn gas(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas,
Transaction::Retracted(ref tx) => &tx.gas,
Transaction::Local(ref tx) => &tx.gas,
}
}
fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn transaction(&self) -> &transaction::Transaction {
match *self {
Transaction::Unverified(ref tx) => &*tx,
Transaction::Retracted(ref tx) => &*tx,
Transaction::Local(ref tx) => &*tx,
}
}
fn is_local(&self) -> bool {
match *self {
Transaction::Local(..) => true,
_ => false,
}
}
fn is_retracted(&self) -> bool {
match *self {
Transaction::Retracted(..) => true,
_ => false,
}
}
}
/// Transaction verifier.
///
/// Verification can be run in parallel for all incoming transactions.
#[derive(Debug)]
pub struct Verifier<C> {
client: C,
options: Options,
id: Arc<AtomicUsize>,
}
impl<C> Verifier<C> {
/// Creates new transaction verfier with specified options.
pub fn new(client: C, options: Options, id: Arc<AtomicUsize>) -> Self {
Verifier {
client,
options,
id,
}
}
}
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
type Error = transaction::Error;
type VerifiedTransaction = VerifiedTransaction;
fn verify_transaction(&self, tx: Transaction) -> Result<Self::VerifiedTransaction, Self::Error> {
// The checks here should be ordered by cost/complexity.
// Cheap checks should be done as early as possible to discard unneeded transactions early.
let hash = tx.hash();
if self.client.transaction_already_included(&hash) {
trace!(target: "txqueue", "[{:?}] Rejected tx already in the blockchain", hash);
bail!(transaction::Error::AlreadyImported)
}
let gas_limit = cmp::min(self.options.tx_gas_limit, self.options.block_gas_limit);
if tx.gas() > &gas_limit {
debug!(
target: "txqueue",
"[{:?}] Dropping transaction above gas limit: {} > min({}, {})",
hash,
tx.gas(),
self.options.block_gas_limit,
self.options.tx_gas_limit,
);
bail!(transaction::Error::GasLimitExceeded {
limit: gas_limit,
got: *tx.gas(),
});
}
let minimal_gas = self.client.required_gas(tx.transaction());
if tx.gas() < &minimal_gas {
trace!(target: "txqueue",
"[{:?}] Dropping transaction with insufficient gas: {} < {}",
hash,
tx.gas(),
minimal_gas,
);
bail!(transaction::Error::InsufficientGas {
minimal: minimal_gas,
got: *tx.gas(),
})
}
let is_own = tx.is_local();
// Quick exit for non-service transactions
if tx.gas_price() < &self.options.minimal_gas_price
&& !tx.gas_price().is_zero()
&& !is_own
{
trace!(
target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
hash,
tx.gas_price(),
self.options.minimal_gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: self.options.minimal_gas_price,
got: *tx.gas_price(),
});
}
// Some more heavy checks below.
// Actually recover sender and verify that transaction
let is_retracted = tx.is_retracted();
let transaction = match tx {
Transaction::Retracted(tx) | Transaction::Unverified(tx) => match self.client.verify_transaction(tx) {
Ok(signed) => signed.into(),
Err(err) => {
debug!(target: "txqueue", "[{:?}] Rejected tx {:?}", hash, err);
bail!(err)
},
},
Transaction::Local(tx) => tx,
};
let sender = transaction.sender();
let account_details = self.client.account_details(&sender);
if transaction.gas_price < self.options.minimal_gas_price {
let transaction_type = self.client.transaction_type(&transaction);
if let TransactionType::Service = transaction_type {
debug!(target: "txqueue", "Service tx {:?} below minimal gas price accepted", hash);
} else if is_own || account_details.is_local {
info!(target: "own_tx", "Local tx {:?} below minimal gas price accepted", hash);
} else {
trace!(
target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
hash,
transaction.gas_price,
self.options.minimal_gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: self.options.minimal_gas_price,
got: transaction.gas_price,
});
}
}
let cost = transaction.value + transaction.gas_price * transaction.gas;
if account_details.balance < cost {
debug!(
target: "txqueue",
"[{:?}] Rejected tx with not enough balance: {} < {}",
hash,
account_details.balance,
cost,
);
bail!(transaction::Error::InsufficientBalance {
cost: cost,
balance: account_details.balance,
});
}
if transaction.nonce < account_details.nonce {
debug!(
target: "txqueue",
"[{:?}] Rejected tx with old nonce ({} < {})",
hash,
transaction.nonce,
account_details.nonce,
);
bail!(transaction::Error::Old);
}
let priority = match (is_own || account_details.is_local, is_retracted) {
(true, _) => super::Priority::Local,
(false, false) => super::Priority::Regular,
(false, true) => super::Priority::Retracted,
};
Ok(VerifiedTransaction {
transaction,
priority,
hash,
sender,
insertion_id: self.id.fetch_add(1, atomic::Ordering::AcqRel),
})
}
}