From 8542d651ae8714de4f79342ba9f999dcac8b0988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 11:45:20 +0100 Subject: [PATCH] Refactoring transactions queue to avoid cloning transactions --- sync/src/transaction_queue.rs | 388 ++++++++++++++++++++-------------- 1 file changed, 224 insertions(+), 164 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index f2e15f955..1bbfbe36d 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -18,121 +18,126 @@ //! Transaction Queue -use std::vec::Vec; use std::cmp::{Ordering}; use std::collections::{HashMap, BTreeSet}; use util::numbers::{Uint, U256}; -use util::hash::{Address}; +use util::hash::{Address, H256}; use util::table::*; use ethcore::transaction::*; #[derive(Clone, Debug)] -struct VerifiedTransaction { - tx: SignedTransaction, - nonce_height: U256 +struct TransactionOrder { + nonce_height: U256, + gas_price: U256, + hash: H256, } -impl VerifiedTransaction { - pub fn new(tx: SignedTransaction, nonce_height: U256) -> VerifiedTransaction { - VerifiedTransaction { - tx: tx, - nonce_height: nonce_height +impl TransactionOrder { + pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { + TransactionOrder { + nonce_height: tx.nonce() - base_nonce, + gas_price: tx.transaction.gas_price, + hash: tx.hash(), } } - - pub fn sender(&self) -> Address { - self.tx.sender().unwrap() - } } -impl Eq for VerifiedTransaction {} -impl PartialEq for VerifiedTransaction { - fn eq(&self, other: &VerifiedTransaction) -> bool { +impl Eq for TransactionOrder {} +impl PartialEq for TransactionOrder { + fn eq(&self, other: &TransactionOrder) -> bool { self.cmp(other) == Ordering::Equal } } -impl PartialOrd for VerifiedTransaction { - fn partial_cmp(&self, other: &VerifiedTransaction) -> Option { +impl PartialOrd for TransactionOrder { + fn partial_cmp(&self, other: &TransactionOrder) -> Option { Some(self.cmp(other)) } } -impl Ord for VerifiedTransaction { - fn cmp(&self, b: &VerifiedTransaction) -> Ordering { +impl Ord for TransactionOrder { + fn cmp(&self, b: &TransactionOrder) -> Ordering { // First check nonce_height if self.nonce_height != b.nonce_height { return self.nonce_height.cmp(&b.nonce_height); } // Then compare gas_prices - let a_gas = self.tx.gas_price; - let b_gas = b.tx.gas_price; + let a_gas = self.gas_price; + let b_gas = b.gas_price; if a_gas != b_gas { return a_gas.cmp(&b_gas); } - // Compare nonce - let a_nonce = self.tx.nonce; - let b_nonce = b.tx.nonce; - if a_nonce != b_nonce { - return a_nonce.cmp(&b_nonce); - } - - // and senders - let a_sender = self.sender(); - let b_sender = b.sender(); - a_sender.cmp(&b_sender) + // Compare hashes + self.hash.cmp(&b.hash) } } -struct TransactionsByPriorityAndAddress { - priority: BTreeSet, - address: Table, +struct VerifiedTransaction { + transaction: SignedTransaction +} +impl VerifiedTransaction { + fn new(transaction: SignedTransaction) -> Self { + VerifiedTransaction { + transaction: transaction + } + } + + fn hash(&self) -> H256 { + self.transaction.hash() + } + + fn nonce(&self) -> U256 { + self.transaction.nonce + } + + fn sender(&self) -> Address { + self.transaction.sender().unwrap() + } +} + +struct TransactionSet { + by_priority: BTreeSet, + by_address: Table, limit: usize, } -impl TransactionsByPriorityAndAddress { - fn insert(&mut self, address: Address, nonce: U256, verified_tx: VerifiedTransaction) { - self.priority.insert(verified_tx.clone()); - self.address.insert(address, nonce, verified_tx); +impl TransactionSet { + fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) { + self.by_priority.insert(order.clone()); + self.by_address.insert(sender, nonce, order); } - fn enforce_limit(&mut self) { - let len = self.priority.len(); + fn enforce_limit(&mut self, by_hash: &HashMap) { + let len = self.by_priority.len(); if len <= self.limit { return; } - let to_remove : Vec = { - self.priority + let to_drop : Vec<&VerifiedTransaction> = { + self.by_priority .iter() .skip(self.limit) - .map(|v_tx| v_tx.tx.clone()) + .map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected.")) .collect() }; - for tx in to_remove { - self.remove(&tx); + for tx in to_drop { + self.drop(&tx.sender(), &tx.nonce()); } } - fn remove_by_address(&mut self, sender: &Address, nonce: &U256) -> Option { - if let Some(verified_tx) = self.address.remove(sender, nonce) { - self.priority.remove(&verified_tx); - return Some(verified_tx); + fn drop(&mut self, sender: &Address, nonce: &U256) -> Option { + if let Some(tx_order) = self.by_address.remove(sender, nonce) { + self.by_priority.remove(&tx_order); + return Some(tx_order); } None } - fn remove(&mut self, tx: &SignedTransaction) -> Option { - // First find the transaction by address - let address = tx.sender().unwrap(); - self.remove_by_address(&address, &tx.nonce) - } - fn clear(&mut self) { - self.priority.clear(); - self.address.clear(); + self.by_priority.clear(); + self.by_address.clear(); } } @@ -148,9 +153,11 @@ pub struct TransactionQueueStatus { /// TransactionQueue implementation pub struct TransactionQueue { /// Priority queue for transactions that can go to block - current: TransactionsByPriorityAndAddress, + current: TransactionSet, /// Priority queue for transactions that has been received but are not yet valid to go to block - future: TransactionsByPriorityAndAddress, + future: TransactionSet, + /// All transactions managed by queue indexed by hash + by_hash: HashMap, /// Last nonce of transaction in current last_nonces: HashMap, /// First nonce of transaction in current (used to determine priority) @@ -165,20 +172,21 @@ impl TransactionQueue { /// Create new instance of this Queue with specified limits pub fn with_limits(current_limit: usize, future_limit: usize) -> Self { - let current = TransactionsByPriorityAndAddress { - address: Table::new(), - priority: BTreeSet::new(), + let current = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), limit: current_limit, }; - let future = TransactionsByPriorityAndAddress { - address: Table::new(), - priority: BTreeSet::new(), + let future = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), limit: future_limit, }; TransactionQueue { current: current, future: future, + by_hash: HashMap::new(), last_nonces: HashMap::new(), first_nonces: HashMap::new(), } @@ -187,8 +195,8 @@ impl TransactionQueue { /// Returns current status for this queue pub fn status(&self) -> TransactionQueueStatus { TransactionQueueStatus { - pending: self.current.priority.len(), - future: self.future.priority.len(), + pending: self.current.by_priority.len(), + future: self.future.by_priority.len(), } } @@ -203,101 +211,107 @@ impl TransactionQueue { /// Add signed transaction to queue to be verified and imported pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - self.import_tx(tx, fetch_nonce); + self.import_tx(VerifiedTransaction::new(tx), fetch_nonce); } - /// Removes all transactions in given slice + /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[SignedTransaction]) { + pub fn remove_all(&mut self, txs: &[H256]) { for tx in txs { self.remove(&tx); } } - /// Removes transaction from queue. + /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, tx: &SignedTransaction) { + pub fn remove(&mut self, hash: &H256) { + let transaction = self.by_hash.remove(hash); + if transaction.is_none() { + // We don't know this transaction + return; + } + let transaction = transaction.unwrap(); + let sender = transaction.sender(); + let nonce = transaction.nonce(); + + // Remove from future + self.future.drop(&sender, &nonce); + // Remove from current - let removed = self.current.remove(tx); - if let Some(verified_tx) = removed { - let sender = verified_tx.sender(); - - // Are there any other transactions from this sender? - if !self.current.address.has_row(&sender) { - // Clear last & first nonces - self.last_nonces.remove(&sender); - self.first_nonces.remove(&sender); - return; - } - - // Let's find those with higher nonce (TODO [todr] optimize?) - let to_move_to_future = { - let row_map = self.current.address.row(&sender).unwrap(); - let tx_nonce = verified_tx.tx.nonce; - let mut to_future = Vec::new(); - let mut highest = U256::zero(); - let mut lowest = tx_nonce.clone(); - - // Search nonces to remove and track lowest and highest - for (nonce, _) in row_map.iter() { - if nonce > &tx_nonce { - to_future.push(nonce.clone()); - } else if nonce > &highest { - highest = nonce.clone(); - } else if nonce < &lowest { - lowest = nonce.clone(); - } - } - - // Update first_nonces and last_nonces - if highest == U256::zero() { - self.last_nonces.remove(&sender); - } else { - self.last_nonces.insert(sender.clone(), highest); - } - - if lowest == tx_nonce { - self.first_nonces.remove(&sender); - } else { - self.first_nonces.insert(sender.clone(), lowest); - } - - // return to future - to_future - }; - - for k in to_move_to_future { - if let Some(v) = self.current.remove_by_address(&sender, &k) { - self.future.insert(sender.clone(), v.tx.nonce, v); - } - } - self.future.enforce_limit(); + let order = self.current.drop(&sender, &nonce); + if order.is_none() { return; } - // Remove from future - { - let sender = tx.sender().unwrap(); - if let Some(_) = self.future.remove_by_address(&sender, &tx.nonce) { - return; + // Are there any other transactions from this sender? + if !self.current.by_address.has_row(&sender) { + // Clear last & first nonces + self.last_nonces.remove(&sender); + self.first_nonces.remove(&sender); + return; + } + + // Let's find those with higher nonce (TODO [todr] optimize?) + let to_move_to_future = { + let row_map = self.current.by_address.row(&sender).unwrap(); + let mut to_future = Vec::new(); + let mut highest = U256::zero(); + let mut lowest = nonce.clone(); + + // Search nonces to remove and track lowest and highest + for (current_nonce, _) in row_map.iter() { + if current_nonce > &nonce { + to_future.push(current_nonce.clone()); + } else if current_nonce > &highest { + highest = current_nonce.clone(); + } else if current_nonce < &lowest { + lowest = current_nonce.clone(); + } + } + + // Update first_nonces and last_nonces + if highest == U256::zero() { + self.last_nonces.remove(&sender); + } else { + self.last_nonces.insert(sender.clone(), highest); + } + + if lowest == nonce { + self.first_nonces.remove(&sender); + } else { + self.first_nonces.insert(sender.clone(), lowest); + } + + // return to future + to_future + }; + + for k in to_move_to_future { + if let Some(v) = self.current.drop(&sender, &k) { + // TODO [todr] Recalculate height? + self.future.insert(sender.clone(), k, v); } } + self.future.enforce_limit(&self.by_hash); } /// Returns top transactions from the queue pub fn top_transactions(&self, size: usize) -> Vec { - self.current.priority + self.current.by_priority .iter() .take(size) - .map(|t| t.tx.clone()).collect() + .map(|t| self.by_hash.get(&t.hash).expect("Transaction Queue Inconsistency")) + .map(|t| t.transaction.clone()) + .collect() } /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); self.future.clear(); + self.by_hash.clear(); self.last_nonces.clear(); self.first_nonces.clear(); } @@ -305,31 +319,30 @@ impl TransactionQueue { fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { let mut current_nonce = current_nonce + U256::one(); { - let txs_by_nonce = self.future.address.row_mut(&address); - if let None = txs_by_nonce { + let by_nonce = self.future.by_address.row_mut(&address); + if let None = by_nonce { return None; } - let mut txs_by_nonce = txs_by_nonce.unwrap(); - - while let Some(tx) = txs_by_nonce.remove(¤t_nonce) { - // remove also from priority - self.future.priority.remove(&tx); + let mut by_nonce = by_nonce.unwrap(); + while let Some(order) = by_nonce.remove(¤t_nonce) { + // remove also from priority and hash + self.future.by_priority.remove(&order); // Put to current - let height = current_nonce - first_nonce; - let verified_tx = VerifiedTransaction::new(tx.tx, U256::from(height)); - self.current.insert(address.clone(), verified_tx.tx.nonce, verified_tx); + let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency"); + let order = TransactionOrder::for_transaction(transaction, first_nonce); + self.current.insert(address.clone(), transaction.nonce(), order); current_nonce = current_nonce + U256::one(); } } - self.future.address.clear_if_empty(&address); + self.future.by_address.clear_if_empty(&address); // Returns last inserted nonce Some(current_nonce - U256::one()) } - fn import_tx(&mut self, tx: SignedTransaction, fetch_nonce: &T) + fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - let nonce = tx.nonce; - let address = tx.sender().unwrap(); + let nonce = tx.nonce(); + let address = tx.sender(); let next_nonce = U256::one() + self.last_nonces .get(&address) @@ -338,11 +351,12 @@ impl TransactionQueue { // Check height if nonce > next_nonce { - let height = nonce - next_nonce; - let verified_tx = VerifiedTransaction::new(tx, height); + let order = TransactionOrder::for_transaction(&tx, next_nonce); + // Insert to by_hash + self.by_hash.insert(tx.hash(), tx); // We have a gap - put to future - self.future.insert(address, nonce, verified_tx); - self.future.enforce_limit(); + self.future.insert(address, nonce, order); + self.future.enforce_limit(&self.by_hash); return; } else if next_nonce > nonce { // Droping transaction @@ -354,29 +368,34 @@ impl TransactionQueue { .cloned() .unwrap_or_else(|| nonce.clone()); - let height = nonce - first_nonce; - let verified_tx = VerifiedTransaction::new(tx, height); + let order = TransactionOrder::for_transaction(&tx, first_nonce); + // Insert to by_hash + self.by_hash.insert(tx.hash(), tx); + // Insert to current - self.current.insert(address.clone(), nonce, verified_tx); + self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); self.first_nonces.insert(address.clone(), first_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit - self.current.enforce_limit(); + self.current.enforce_limit(&self.by_hash); } } + #[cfg(test)] mod test { extern crate rustc_serialize; use self::rustc_serialize::hex::FromHex; - + use std::collections::{HashMap, BTreeSet}; use util::crypto::KeyPair; use util::numbers::{U256, Uint}; use util::hash::{Address}; + use util::table::*; use ethcore::transaction::*; use super::*; + use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; fn new_unsigned_tx(nonce: U256) -> Transaction { Transaction { @@ -408,6 +427,46 @@ mod test { (tx.sign(secret), tx2.sign(secret)) } + #[test] + fn should_create_transaction_set() { + // given + let mut set = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), + limit: 1 + }; + let (tx1, tx2) = new_txs(U256::from(1)); + let tx1 = VerifiedTransaction::new(tx1); + let tx2 = VerifiedTransaction::new(tx2); + let by_hash = { + let mut x = HashMap::new(); + let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); + let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); + x.insert(tx1.hash(), tx1); + x.insert(tx2.hash(), tx2); + x + }; + // Insert both transactions + let order1 = TransactionOrder::for_transaction(&tx1, U256::zero()); + set.insert(tx1.sender(), tx1.nonce(), order1.clone()); + let order2 = TransactionOrder::for_transaction(&tx2, U256::zero()); + set.insert(tx2.sender(), tx2.nonce(), order2.clone()); + assert_eq!(set.by_priority.len(), 2); + assert_eq!(set.by_address.len(), 2); + + // when + set.enforce_limit(&by_hash); + + // then + assert_eq!(set.by_priority.len(), 1); + assert_eq!(set.by_address.len(), 1); + assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1); + set.clear(); + assert_eq!(set.by_priority.len(), 0); + assert_eq!(set.by_address.len(), 0); + } + + #[test] fn should_import_tx() { // given @@ -495,8 +554,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx); - txq2.remove(&tx2); + txq2.remove(&tx.hash()); + txq2.remove(&tx2.hash()); // then @@ -518,7 +577,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx); + txq.remove(&tx.hash()); // then let stats = txq.status(); @@ -608,14 +667,15 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1); + txq.remove(&tx1.hash()); + assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce); // then let stats = txq.status(); - assert_eq!(stats.pending, 2); assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); }