Merge pull request #651 from ethcore/tx_queue_integration

Tx_queue_docs -> To master
This commit is contained in:
Gav Wood 2016-03-10 13:19:40 +01:00
commit f708d36fad
3 changed files with 104 additions and 3 deletions

View File

@ -396,7 +396,8 @@ impl<V> Client<V> where V: Verifier {
.commit(header.number(), &header.hash(), ancient) .commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed."); .expect("State DB commit failed.");
// And update the chain // And update the chain after commit to prevent race conditions
// (when something is in chain but you are not able to fetch details)
self.chain.write().unwrap() self.chain.write().unwrap()
.insert_block(&block.bytes, receipts); .insert_block(&block.bytes, receipts);

View File

@ -72,6 +72,7 @@ mod chain;
mod io; mod io;
mod range_collection; mod range_collection;
mod transaction_queue; mod transaction_queue;
pub use transaction_queue::TransactionQueue;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;

View File

@ -17,6 +17,67 @@
// TODO [todr] - own transactions should have higher priority // TODO [todr] - own transactions should have higher priority
//! Transaction Queue //! Transaction Queue
//!
//! TransactionQueue keeps track of all transactions seen by the node (received from other peers) and own transactions
//! and orders them by priority. Top priority transactions are those with low nonce height (difference between
//! transaction's nonce and next nonce expected from this sender). If nonces are equal transaction's gas price is used
//! for comparison (higher gas price = higher priority).
//!
//! # Usage Example
//!
//! ```rust
//! extern crate ethcore_util as util;
//! extern crate ethcore;
//! extern crate ethsync;
//! extern crate rustc_serialize;
//!
//! use util::crypto::KeyPair;
//! use util::hash::Address;
//! use util::numbers::{Uint, U256};
//! use ethsync::TransactionQueue;
//! use ethcore::transaction::*;
//! use rustc_serialize::hex::FromHex;
//!
//! fn main() {
//! let key = KeyPair::create().unwrap();
//! let t1 = Transaction { action: Action::Create, value: U256::from(100), data: "3331600055".from_hex().unwrap(),
//! gas: U256::from(100_000), gas_price: U256::one(), nonce: U256::from(10) };
//! let t2 = Transaction { action: Action::Create, value: U256::from(100), data: "3331600055".from_hex().unwrap(),
//! gas: U256::from(100_000), gas_price: U256::one(), nonce: U256::from(11) };
//!
//! let st1 = t1.sign(&key.secret());
//! let st2 = t2.sign(&key.secret());
//! let default_nonce = |_a: &Address| U256::from(10);
//!
//! let mut txq = TransactionQueue::new();
//! txq.add(st2.clone(), &default_nonce);
//! txq.add(st1.clone(), &default_nonce);
//!
//! // Check status
//! assert_eq!(txq.status().pending, 2);
//! // Check top transactions
//! let top = txq.top_transactions(3);
//! assert_eq!(top.len(), 2);
//! assert_eq!(top[0], st1);
//! assert_eq!(top[1], st2);
//!
//! // And when transaction is removed (but nonce haven't changed)
//! // it will move invalid transactions to future
//! txq.remove(&st1.hash(), &default_nonce);
//! assert_eq!(txq.status().pending, 0);
//! assert_eq!(txq.status().future, 1);
//! assert_eq!(txq.top_transactions(3).len(), 0);
//! }
//! ```
//!
//! # Maintaing valid state
//!
//! 1. Whenever transaction is imported to queue (to queue) all other transactions from this sender are revalidated in current. It means that they are moved to future and back again (height recalculation & gap filling).
//! 2. Whenever transaction is removed:
//! - When it's removed from `future` - all `future` transactions heights are recalculated and then
//! we check if the transactions should go to `current` (comparing state nonce)
//! - When it's removed from `current` - all transactions from this sender (`current` & `future`) are recalculated.
//!
use std::cmp::{Ordering}; use std::cmp::{Ordering};
use std::collections::{HashMap, BTreeSet}; use std::collections::{HashMap, BTreeSet};
@ -28,9 +89,16 @@ use ethcore::error::Error;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Light structure used to identify transaction and it's order
struct TransactionOrder { struct TransactionOrder {
/// Primary ordering factory. Difference between transaction nonce and expected nonce in state
/// (e.g. Tx(nonce:5), State(nonce:0) -> height: 5)
/// High nonce_height = Low priority (processed later)
nonce_height: U256, nonce_height: U256,
/// Gas Price of the transaction.
/// Low gas price = Low priority (processed later)
gas_price: U256, gas_price: U256,
/// Hash to identify associated transaction
hash: H256, hash: H256,
} }
@ -71,7 +139,7 @@ impl Ord for TransactionOrder {
let a_gas = self.gas_price; let a_gas = self.gas_price;
let b_gas = b.gas_price; let b_gas = b.gas_price;
if a_gas != b_gas { if a_gas != b_gas {
return a_gas.cmp(&b_gas); return b_gas.cmp(&a_gas);
} }
// Compare hashes // Compare hashes
@ -79,6 +147,7 @@ impl Ord for TransactionOrder {
} }
} }
/// Verified transaction (with sender)
struct VerifiedTransaction { struct VerifiedTransaction {
transaction: SignedTransaction transaction: SignedTransaction
} }
@ -103,6 +172,11 @@ impl VerifiedTransaction {
} }
} }
/// Holds transactions accessible by (address, nonce) and by priority
///
/// TransactionSet keeps number of entries below limit, but it doesn't
/// automatically happen during `insert/remove` operations.
/// You have to call `enforce_limit` to remove lowest priority transactions from set.
struct TransactionSet { struct TransactionSet {
by_priority: BTreeSet<TransactionOrder>, by_priority: BTreeSet<TransactionOrder>,
by_address: Table<Address, U256, TransactionOrder>, by_address: Table<Address, U256, TransactionOrder>,
@ -110,11 +184,15 @@ struct TransactionSet {
} }
impl TransactionSet { impl TransactionSet {
/// Inserts `TransactionOrder` to this set
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> { fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> {
self.by_priority.insert(order.clone()); self.by_priority.insert(order.clone());
self.by_address.insert(sender, nonce, order) self.by_address.insert(sender, nonce, order)
} }
/// Remove low priority transactions if there is more then specified by given `limit`.
///
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) { fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
let len = self.by_priority.len(); let len = self.by_priority.len();
if len <= self.limit { if len <= self.limit {
@ -136,6 +214,7 @@ impl TransactionSet {
} }
} }
/// Drop transaction from this set (remove from `by_priority` and `by_address`)
fn drop(&mut self, sender: &Address, nonce: &U256) -> Option<TransactionOrder> { fn drop(&mut self, sender: &Address, nonce: &U256) -> Option<TransactionOrder> {
if let Some(tx_order) = self.by_address.remove(sender, nonce) { if let Some(tx_order) = self.by_address.remove(sender, nonce) {
self.by_priority.remove(&tx_order); self.by_priority.remove(&tx_order);
@ -144,6 +223,7 @@ impl TransactionSet {
None None
} }
/// Drop all transactions.
fn clear(&mut self) { fn clear(&mut self) {
self.by_priority.clear(); self.by_priority.clear();
self.by_address.clear(); self.by_address.clear();
@ -268,6 +348,8 @@ impl TransactionQueue {
// We will either move transaction to future or remove it completely // We will either move transaction to future or remove it completely
// so there will be no transactions from this sender in current // so there will be no transactions from this sender in current
self.last_nonces.remove(&sender); self.last_nonces.remove(&sender);
// First update height of transactions in future to avoid collisions
self.update_future(&sender, current_nonce);
// This should move all current transactions to future and remove old transactions // This should move all current transactions to future and remove old transactions
self.move_all_to_future(&sender, current_nonce); self.move_all_to_future(&sender, current_nonce);
// And now lets check if there is some chain of transactions in future // And now lets check if there is some chain of transactions in future
@ -277,6 +359,7 @@ impl TransactionQueue {
} }
} }
/// Update height of all transactions in future transactions set.
fn update_future(&mut self, sender: &Address, current_nonce: U256) { fn update_future(&mut self, sender: &Address, current_nonce: U256) {
// We need to drain all transactions for current sender from future and reinsert them with updated height // We need to drain all transactions for current sender from future and reinsert them with updated height
let all_nonces_from_sender = match self.future.by_address.row(&sender) { let all_nonces_from_sender = match self.future.by_address.row(&sender) {
@ -289,6 +372,8 @@ impl TransactionQueue {
} }
} }
/// Drop all transactions from given sender from `current`.
/// Either moves them to `future` or removes them from queue completely.
fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) { fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) {
let all_nonces_from_sender = match self.current.by_address.row(&sender) { let all_nonces_from_sender = match self.current.by_address.row(&sender) {
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(), Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
@ -309,7 +394,7 @@ impl TransactionQueue {
// Will be used when mining merged // Will be used when mining merged
#[allow(dead_code)] #[allow(dead_code)]
/// Returns top transactions from the queue /// Returns top transactions from the queue ordered by priority.
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> { pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
self.current.by_priority self.current.by_priority
.iter() .iter()
@ -327,6 +412,8 @@ impl TransactionQueue {
self.last_nonces.clear(); self.last_nonces.clear();
} }
/// Checks if there are any transactions in `future` that should actually be promoted to `current`
/// (because nonce matches).
fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) {
{ {
let by_nonce = self.future.by_address.row_mut(&address); let by_nonce = self.future.by_address.row_mut(&address);
@ -348,6 +435,14 @@ impl TransactionQueue {
self.last_nonces.insert(address, current_nonce - U256::one()); self.last_nonces.insert(address, current_nonce - U256::one());
} }
/// Adds VerifiedTransaction to this queue.
///
/// Determines if it should be placed in current or future. When transaction is
/// imported to `current` also checks if there are any `future` transactions that should be promoted because of
/// this.
///
/// It ignores transactions that has already been imported (same `hash`) and replaces the transaction
/// iff `(address, nonce)` is the same but `gas_price` is higher.
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
@ -386,6 +481,10 @@ impl TransactionQueue {
self.current.enforce_limit(&mut self.by_hash); self.current.enforce_limit(&mut self.by_hash);
} }
/// Replaces transaction in given set (could be `future` or `current`).
///
/// If there is already transaction with same `(sender, nonce)` it will be replaced iff `gas_price` is higher.
/// One of the transactions is dropped from set and also removed from queue entirely (from `by_hash`).
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) { fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
let order = TransactionOrder::for_transaction(&tx, base_nonce); let order = TransactionOrder::for_transaction(&tx, base_nonce);
let hash = tx.hash(); let hash = tx.hash();