// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::slice;
use std::collections::{hash_map, HashMap, BTreeSet};
use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};
use {VerifiedTransaction};
/// Internal representation of a transaction.
///
/// Pairs the shared transaction with a unique insertion id. The id can be used
/// explicitly for scoring, but internally it is used to resolve conflicts in
/// case of equal scoring (newer transactions are preferred).
#[derive(Debug)]
pub struct Transaction<T> {
    /// Sequential id of the transaction
    pub insertion_id: u64,
    /// Shared transaction
    pub transaction: Arc<T>,
}

// `Clone` is written by hand instead of derived so that it does not require
// `T: Clone`: only the `Arc` handle is duplicated, the payload stays shared.
impl<T> Clone for Transaction<T> {
    /// Returns a copy carrying the same insertion id and pointing at the same
    /// shared transaction.
    fn clone(&self) -> Self {
        Transaction {
            insertion_id: self.insertion_id,
            transaction: Arc::clone(&self.transaction),
        }
    }
}

impl<T> ::std::ops::Deref for Transaction<T> {
    type Target = Arc<T>;

    /// Gives access to the wrapped `Arc`, so methods of the underlying
    /// transaction can be called directly on a `Transaction`.
    fn deref(&self) -> &Self::Target {
        &self.transaction
    }
}
/// A transaction pool.
///
/// Keeps transactions grouped by sender and indexed by hash, and tracks the
/// worst and best scored transaction of every sender in two ordered sets.
///
/// NOTE(review): the generic argument lists in this declaration look truncated
/// (e.g. `HashMap>`), presumably lost during extraction — confirm against the
/// upstream file before relying on the exact types.
#[derive(Debug)]
pub struct Pool, L = NoopListener> {
/// Receives `added`, `dropped` and `rejected` notifications.
listener: L,
/// Scoring strategy handed to the per-sender collection when a transaction is added.
scoring: S,
/// Pool limits (`max_count`, `max_mem_usage`, `max_per_sender`).
options: Options,
/// Sum of `mem_usage()` over the transactions currently indexed in `by_hash`.
mem_usage: usize,
/// Per-sender transaction collections, keyed by sender.
transactions: HashMap>,
/// Every pooled transaction, keyed by its hash.
by_hash: HashMap>,
/// Best transaction (with its score) of each sender.
best_transactions: BTreeSet>,
/// Worst transaction (with its score) of each sender.
worst_transactions: BTreeSet>,
/// Id assigned to the most recent import; incremented for every import that
/// passes the duplicate check.
insertion_id: u64,
}
// NOTE(review): the generic parameters of this `impl` header look truncated — confirm against upstream.
impl + Default> Default for Pool {
/// Creates a pool with the default `Scoring` and default `Options`.
///
/// Delegates to `with_scoring`.
fn default() -> Self {
Self::with_scoring(S::default(), Options::default())
}
}
// NOTE(review): the generic parameters of this `impl` header look truncated — confirm against upstream.
impl + Default> Pool {
/// Creates a new `Pool` with given options
/// and default `Scoring` and `Listener`.
///
/// The scoring is obtained from `S::default()`; construction is delegated to `with_scoring`.
pub fn with_options(options: Options) -> Self {
Self::with_scoring(S::default(), options)
}
}
// NOTE(review): the generic parameters of this `impl` header look truncated — confirm against upstream.
impl> Pool {
/// Creates a new `Pool` with given `Scoring` and options.
///
/// The pool is built with a `NoopListener` as its listener.
pub fn with_scoring(scoring: S, options: Options) -> Self {
Self::new(NoopListener, scoring, options)
}
}
/// Initial capacity reserved for the per-sender map when a pool is created.
const INITIAL_NUMBER_OF_SENDERS: usize = 16;
impl Pool where
T: VerifiedTransaction,
S: Scoring,
L: Listener,
{
/// Creates new `Pool` with given `Scoring`, `Listener` and options.
pub fn new(listener: L, scoring: S, options: Options) -> Self {
let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS);
let by_hash = HashMap::with_capacity(options.max_count / 16);
Pool {
listener,
scoring,
options,
mem_usage: 0,
transactions,
by_hash,
best_transactions: Default::default(),
worst_transactions: Default::default(),
insertion_id: 0,
}
}
/// Attempts to import a new transaction to the pool, returns the shared (`Arc`) transaction or an `Error`.
///
/// NOTE: Since `Ready`ness is separate from the pool it's possible to import stalled transactions.
/// It's the caller's responsibility to make sure that's not the case.
///
/// NOTE: The transaction may push out some other transactions from the pool
/// either because of limits (see `Options`) or because `Scoring` decides that the transaction
/// replaces an existing transaction from that sender.
/// If any limit is reached the transaction with the lowest `Score` is evicted to make room.
///
/// The `Listener` will be informed on any drops or rejections.
///
/// # Errors
///
/// - `AlreadyImported` if a transaction with the same hash is already in the pool.
/// - `TooCheapToReplace` or `TooCheapToEnter` if the per-sender collection rejects the transaction.
/// - Any error returned by `remove_worst` while making room.
///
/// NOTE(review): the return type below looks truncated (`error::Result>`) — confirm against upstream.
pub fn import(&mut self, transaction: T) -> error::Result> {
// Size of the incoming transaction, used for the memory-limit check below.
let mem_usage = transaction.mem_usage();
// Reject duplicates before any pool state is touched.
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));
// Ids start at 1 and are consumed even if the import is rejected further down.
self.insertion_id += 1;
let transaction = Transaction {
insertion_id: self.insertion_id,
transaction: Arc::new(transaction),
};
// TODO [ToDr] Most likely move this after the transaction is inserted.
// Avoid using should_replace, but rather use scoring for that.
{
// Tries to evict the current worst transaction in favour of `transaction`.
// Yields `Ok(true)` if something was evicted, `Ok(false)` if `remove_worst`
// returned `None`, and forwards the error after notifying the listener of
// the rejection.
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(transaction) {
Err(err) => {
s.listener.rejected(transaction, err.kind());
Err(err)
},
Ok(None) => Ok(false),
Ok(Some(removed)) => {
s.listener.dropped(&removed, Some(transaction));
s.finalize_remove(removed.hash());
Ok(true)
},
}
};
// Make room while one more transaction would exceed the count limit;
// stop as soon as nothing can be evicted.
while self.by_hash.len() + 1 > self.options.max_count {
trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
if !remove_worst(self, &transaction)? {
break;
}
}
// Same procedure for the memory limit.
while self.mem_usage + mem_usage > self.options.max_mem_usage {
trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
if !remove_worst(self, &transaction)? {
break;
}
}
}
// The block limits the mutable borrow of the sender's entry to the insertion itself.
let (result, prev_state, current_state) = {
let transactions = self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
// get worst and best transactions for comparison
let prev = transactions.worst_and_best();
let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
let current = transactions.worst_and_best();
(result, prev, current)
};
// update best and worst transactions from this sender (if required)
self.update_senders_worst_and_best(prev_state, current_state);
match result {
// Plain insertion: nothing left the pool.
AddResult::Ok(tx) => {
self.listener.added(&tx, None);
self.finalize_insert(&tx, None);
Ok(tx.transaction)
},
// Both variants carry an `old` transaction, which is dropped from the
// hash index and the memory accounting while `new` is added.
AddResult::PushedOut { new, old } |
AddResult::Replaced { new, old } => {
self.listener.added(&new, Some(&old));
self.finalize_insert(&new, Some(&old));
Ok(new.transaction)
},
// Rejections: notify the listener and return the error; the hash index
// and memory accounting are left untouched.
AddResult::TooCheap { new, old } => {
let error = error::ErrorKind::TooCheapToReplace(format!("{:x}", old.hash()), format!("{:x}", new.hash()));
self.listener.rejected(&new, &error);
bail!(error)
},
AddResult::TooCheapToEnter(new, score) => {
let error = error::ErrorKind::TooCheapToEnter(format!("{:x}", new.hash()), format!("{:?}", score));
self.listener.rejected(&new, &error);
bail!(error)
}
}
}
/// Updates state of the pool statistics if the transaction was added to a set.
///
/// Adds `new` to the memory accounting and to the hash index. If `old` is
/// given, it is removed from both via `finalize_remove`.
fn finalize_insert(&mut self, new: &Transaction, old: Option<&Transaction>) {
self.mem_usage += new.mem_usage();
self.by_hash.insert(new.hash().clone(), new.clone());
if let Some(old) = old {
self.finalize_remove(old.hash());
}
}
/// Updates the pool statistics if transaction was removed.
///
/// Removes `hash` from the hash index and subtracts the removed transaction's
/// memory usage from the running total. Returns the removed shared
/// transaction, or `None` if the hash was not indexed.
fn finalize_remove(&mut self, hash: &T::Hash) -> Option> {
self.by_hash.remove(hash).map(|old| {
self.mem_usage -= old.transaction.mem_usage();
old.transaction
})
}
/// Updates best and worst transactions from a sender.
///
/// `previous` and `current` are the sender's `(worst, best)` pairs, each a
/// `(score, transaction)` tuple, taken before and after a change to that
/// sender's transactions. The pool-wide `worst_transactions` and
/// `best_transactions` sets are adjusted to reflect the difference.
fn update_senders_worst_and_best(
&mut self,
previous: Option<((S::Score, Transaction), (S::Score, Transaction))>,
current: Option<((S::Score, Transaction), (S::Score, Transaction))>,
) {
let worst_collection = &mut self.worst_transactions;
let best_collection = &mut self.best_transactions;
// Two entries are the same if both the score and the transaction hash match.
let is_same = |a: &(S::Score, Transaction), b: &(S::Score, Transaction)| {
a.0 == b.0 && a.1.hash() == b.1.hash()
};
// Removes the entry from `collection` when `remove` is true, inserts it otherwise.
let update = |collection: &mut BTreeSet<_>, (score, tx), remove| if remove {
collection.remove(&ScoreWithRef::new(score, tx));
} else {
collection.insert(ScoreWithRef::new(score, tx));
};
match (previous, current) {
// The sender had no entries before: register its worst and best.
(None, Some((worst, best))) => {
update(worst_collection, worst, false);
update(best_collection, best, false);
},
(Some((worst, best)), None) => {
// All transactions from that sender have been removed.
// We can clear a hashmap entry.
self.transactions.remove(worst.1.sender());
update(worst_collection, worst, true);
update(best_collection, best, true);
},
// The sender still has transactions: swap only the entries that changed.
(Some((w1, b1)), Some((w2, b2))) => {
if !is_same(&w1, &w2) {
update(worst_collection, w1, true);
update(worst_collection, w2, false);
}
if !is_same(&b1, &b2) {
update(best_collection, b1, true);
update(best_collection, b2, false);
}
},
// Nothing before and nothing after: nothing to do.
(None, None) => {},
}
}
/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction) -> error::Result