From 725e894f9bd26397364943df9d259211bc532844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 1 Mar 2016 21:48:58 +0100 Subject: [PATCH 1/5] TransactionsQueue implementation --- Cargo.lock | 1 + sync/Cargo.toml | 1 + sync/src/lib.rs | 2 + sync/src/transaction_queue.rs | 622 ++++++++++++++++++++++++++++++++++ 4 files changed, 626 insertions(+) create mode 100644 sync/src/transaction_queue.rs diff --git a/Cargo.lock b/Cargo.lock index bca236813..e558606eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -259,6 +259,7 @@ dependencies = [ "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 26a7d463c..2ce65ca77 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -16,6 +16,7 @@ env_logger = "0.3" time = "0.1.34" rand = "0.3.13" heapsize = "0.3" +rustc-serialize = "0.3" [features] default = [] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 6f28fc320..74541660d 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -70,6 +70,8 @@ use io::NetSyncIo; mod chain; mod io; mod range_collection; +// TODO [todr] Made public to suppress dead code warnings +pub mod transaction_queue; #[cfg(test)] mod tests; diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs new file mode 100644 index 000000000..341607afe --- /dev/null +++ b/sync/src/transaction_queue.rs @@ -0,0 +1,622 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +// TODO [todr] - own transactions should have higher priority + +//! Transaction Queue + +use std::vec::Vec; +use std::cmp::{Ordering}; +use std::collections::{HashMap, BTreeSet}; +use util::uint::{Uint, U256}; +use util::hash::{Address}; +use util::table::*; +use ethcore::transaction::*; + + +#[derive(Clone, Debug)] +struct VerifiedTransaction { + tx: SignedTransaction, + nonce_height: U256 +} + +impl VerifiedTransaction { + pub fn new(tx: SignedTransaction, nonce_height: U256) -> VerifiedTransaction { + VerifiedTransaction { + tx: tx, + nonce_height: nonce_height + } + } + + pub fn sender(&self) -> Address { + self.tx.sender().unwrap() + } +} + +impl Eq for VerifiedTransaction {} +impl PartialEq for VerifiedTransaction { + fn eq(&self, other: &VerifiedTransaction) -> bool { + self.cmp(other) == Ordering::Equal + } +} +impl PartialOrd for VerifiedTransaction { + fn partial_cmp(&self, other: &VerifiedTransaction) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for VerifiedTransaction { + fn cmp(&self, b: &VerifiedTransaction) -> Ordering { + // First check nonce_height + if self.nonce_height != b.nonce_height { + return self.nonce_height.cmp(&b.nonce_height); + } + + // Then compare gas_prices + let a_gas = self.tx.gas_price; + let b_gas = b.tx.gas_price; + if a_gas != b_gas { + return a_gas.cmp(&b_gas); + } + + // Compare nonce + let a_nonce = self.tx.nonce; + let b_nonce = b.tx.nonce; + if a_nonce != b_nonce { + return a_nonce.cmp(&b_nonce); + } + + // and senders + let a_sender = self.sender(); + let b_sender = b.sender(); + a_sender.cmp(&b_sender) + } +} + +struct TransactionsByPriorityAndAddress { + priority: BTreeSet, + address: Table, + limit: usize, +} + +impl TransactionsByPriorityAndAddress { + fn insert(&mut self, address: Address, nonce: U256, verified_tx: VerifiedTransaction) { + self.priority.insert(verified_tx.clone()); + self.address.insert(address, nonce, verified_tx); + } + + fn enforce_limit(&mut self) { + let len = self.priority.len(); + if len <= self.limit { + return; + } + + let to_remove : Vec = { + self.priority + .iter() + .skip(self.limit) + .map(|v_tx| v_tx.tx.clone()) + .collect() + }; + + for tx in to_remove { + self.remove(&tx); + } + } + + fn remove_by_address(&mut self, sender: &Address, nonce: &U256) -> Option { + if let Some(verified_tx) = self.address.remove(sender, nonce) { + self.priority.remove(&verified_tx); + return Some(verified_tx); + } + None + } + + fn remove(&mut self, tx: &SignedTransaction) -> Option { + // First find the transaction by address + let address = tx.sender().unwrap(); + self.remove_by_address(&address, &tx.nonce) + } + + fn clear(&mut self) { + self.priority.clear(); + self.address.clear(); + } +} + +#[derive(Debug)] +/// Current status of the queue +pub struct TransactionQueueStatus { + /// Number of pending transactions (ready to go to block) + pub pending: usize, + /// Number of future transactions (waiting for transactions with lower nonces first) + pub future: usize, +} + +/// TransactionQueue implementation +pub struct TransactionQueue { + /// Priority queue for transactions that can go to block + current: TransactionsByPriorityAndAddress, + /// Priority queue for transactions that has been received but are not yet valid to go to block + future: TransactionsByPriorityAndAddress, + /// Last nonce of transaction in current + last_nonces: HashMap, + /// First nonce of transaction in current (used to determine priority) + first_nonces: HashMap, +} + +impl TransactionQueue { + /// Creates new instance of this Queue + pub fn new() -> Self { + Self::with_limits(1024, 1024) + } + + /// Create new instance of this Queue with specified limits + pub fn with_limits(current_limit: usize, future_limit: usize) -> Self { + let current = TransactionsByPriorityAndAddress { + address: Table::new(), + priority: BTreeSet::new(), + limit: current_limit, + }; + let future = TransactionsByPriorityAndAddress { + address: Table::new(), + priority: BTreeSet::new(), + limit: future_limit, + }; + + TransactionQueue { + current: current, + future: future, + last_nonces: HashMap::new(), + first_nonces: HashMap::new(), + } + } + + /// Returns current status for this queue + pub fn status(&self) -> TransactionQueueStatus { + TransactionQueueStatus { + pending: self.current.priority.len(), + future: self.future.priority.len(), + } + } + + /// Adds all signed transactions to queue to be verified and imported + pub fn add_all(&mut self, txs: Vec, fetch_nonce: T) + where T: Fn(&Address) -> U256 { + for tx in txs.into_iter() { + self.add(tx, &fetch_nonce); + } + } + + /// Add signed transaction to queue to be verified and imported + pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) + where T: Fn(&Address) -> U256 { + self.import_tx(tx, fetch_nonce); + } + + /// Removes all transactions in given slice + /// + /// If gap is introduced marks subsequent transactions as future + pub fn remove_all(&mut self, txs: &[SignedTransaction]) { + for tx in txs { + self.remove(&tx); + } + } + + /// Removes transaction from queue. + /// + /// If gap is introduced marks subsequent transactions as future + pub fn remove(&mut self, tx: &SignedTransaction) { + // Remove from current + let removed = self.current.remove(tx); + if let Some(verified_tx) = removed { + let sender = verified_tx.sender(); + + // Are there any other transactions from this sender? + if !self.current.address.has_row(&sender) { + // Clear last & first nonces + self.last_nonces.remove(&sender); + self.first_nonces.remove(&sender); + return; + } + + // Let's find those with higher nonce (TODO [todr] optimize?) + let to_move_to_future = { + let row_map = self.current.address.row(&sender).unwrap(); + let tx_nonce = verified_tx.tx.nonce; + let mut to_future = Vec::new(); + let mut highest = U256::zero(); + let mut lowest = tx_nonce.clone(); + + // Search nonces to remove and track lowest and highest + for (nonce, _) in row_map.iter() { + if nonce > &tx_nonce { + to_future.push(nonce.clone()); + } else if nonce > &highest { + highest = nonce.clone(); + } else if nonce < &lowest { + lowest = nonce.clone(); + } + } + + // Update first_nonces and last_nonces + if highest == U256::zero() { + self.last_nonces.remove(&sender); + } else { + self.last_nonces.insert(sender.clone(), highest); + } + + if lowest == tx_nonce { + self.first_nonces.remove(&sender); + } else { + self.first_nonces.insert(sender.clone(), lowest); + } + + // return to future + to_future + }; + + for k in to_move_to_future { + if let Some(v) = self.current.remove_by_address(&sender, &k) { + self.future.insert(sender.clone(), v.tx.nonce, v); + } + } + self.future.enforce_limit(); + return; + } + + // Remove from future + { + let sender = tx.sender().unwrap(); + if let Some(_) = self.future.remove_by_address(&sender, &tx.nonce) { + return; + } + } + } + + /// Returns top transactions from the queue + pub fn top_transactions(&self, size: usize) -> Vec { + self.current.priority + .iter() + .take(size) + .map(|t| t.tx.clone()).collect() + } + + /// Removes all elements (in any state) from the queue + pub fn clear(&mut self) { + self.current.clear(); + self.future.clear(); + self.last_nonces.clear(); + self.first_nonces.clear(); + } + + fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { + let mut current_nonce = current_nonce + U256::one(); + { + let txs_by_nonce = self.future.address.row_mut(&address); + if let None = txs_by_nonce { + return None; + } + let mut txs_by_nonce = txs_by_nonce.unwrap(); + + while let Some(tx) = txs_by_nonce.remove(¤t_nonce) { + // remove also from priority + self.future.priority.remove(&tx); + // Put to current + let height = current_nonce - first_nonce; + let verified_tx = VerifiedTransaction::new(tx.tx, U256::from(height)); + self.current.insert(address.clone(), verified_tx.tx.nonce, verified_tx); + current_nonce = current_nonce + U256::one(); + } + } + self.future.address.clear_if_empty(&address); + // Returns last inserted nonce + Some(current_nonce - U256::one()) + } + + fn import_tx(&mut self, tx: SignedTransaction, fetch_nonce: &T) + where T: Fn(&Address) -> U256 { + let nonce = tx.nonce; + let address = tx.sender().unwrap(); + + let next_nonce = U256::one() + self.last_nonces + .get(&address) + .cloned() + .unwrap_or_else(|| fetch_nonce(&address)); + + // Check height + if nonce > next_nonce { + let height = nonce - next_nonce; + let verified_tx = VerifiedTransaction::new(tx, height); + // We have a gap - put to future + self.future.insert(address, nonce, verified_tx); + self.future.enforce_limit(); + return; + } else if next_nonce > nonce { + // Droping transaction + return; + } + + let first_nonce = self.first_nonces + .get(&address) + .cloned() + .unwrap_or_else(|| nonce.clone()); + + let height = nonce - first_nonce; + let verified_tx = VerifiedTransaction::new(tx, height); + // Insert to current + self.current.insert(address.clone(), nonce, verified_tx); + // But maybe there are some more items waiting in future? + let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); + self.first_nonces.insert(address.clone(), first_nonce); + self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); + // Enforce limit + self.current.enforce_limit(); + } +} + +#[cfg(test)] +mod test { + extern crate rustc_serialize; + use self::rustc_serialize::hex::FromHex; + + use util::crypto::KeyPair; + use util::uint::{U256, Uint}; + use util::hash::{Address}; + use ethcore::transaction::*; + use super::*; + + fn new_unsigned_tx(nonce: U256) -> Transaction { + Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::one(), + nonce: nonce + } + } + + fn new_tx() -> SignedTransaction { + let keypair = KeyPair::create().unwrap(); + new_unsigned_tx(U256::from(123)).sign(&keypair.secret()) + } + + fn default_nonce(_address: &Address) -> U256 { + U256::from(122) + } + + fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) { + let keypair = KeyPair::create().unwrap(); + let secret = &keypair.secret(); + let nonce = U256::from(123); + let tx = new_unsigned_tx(nonce); + let tx2 = new_unsigned_tx(nonce + second_nonce); + + (tx.sign(secret), tx2.sign(secret)) + } + + #[test] + fn should_import_tx() { + // given + let mut txq = TransactionQueue::new(); + let tx = new_tx(); + + // when + txq.add(tx, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 1); + } + + #[test] + fn should_import_txs_from_same_sender() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + + // when + txq.add(tx.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce); + + // then + let top = txq.top_transactions(5); + assert_eq!(top[0], tx); + assert_eq!(top[1], tx2); + assert_eq!(top.len(), 2); + } + + #[test] + fn should_put_transaction_to_futures_if_gap_detected() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(2)); + + // when + txq.add(tx.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 1); + assert_eq!(stats.future, 1); + let top = txq.top_transactions(5); + assert_eq!(top.len(), 1); + assert_eq!(top[0], tx); + } + + #[test] + fn should_move_transactions_if_gap_filled() { + // given + let mut txq = TransactionQueue::new(); + let kp = KeyPair::create().unwrap(); + let secret = kp.secret(); + let tx = new_unsigned_tx(U256::from(123)).sign(&secret); + let tx1 = new_unsigned_tx(U256::from(124)).sign(&secret); + let tx2 = new_unsigned_tx(U256::from(125)).sign(&secret); + + txq.add(tx, &default_nonce); + assert_eq!(txq.status().pending, 1); + txq.add(tx2, &default_nonce); + assert_eq!(txq.status().future, 1); + + // when + txq.add(tx1, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 3); + assert_eq!(stats.future, 0); + } + + #[test] + fn should_remove_transaction() { + // given + let mut txq2 = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::from(3)); + txq2.add(tx.clone(), &default_nonce); + txq2.add(tx2.clone(), &default_nonce); + assert_eq!(txq2.status().pending, 1); + assert_eq!(txq2.status().future, 1); + + // when + txq2.remove(&tx); + txq2.remove(&tx2); + + + // then + let stats = txq2.status(); + assert_eq!(stats.pending, 0); + assert_eq!(stats.future, 0); + } + + #[test] + fn should_move_transactions_to_future_if_gap_introduced() { + // given + let mut txq = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::from(1)); + let tx3 = new_tx(); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx3.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce); + assert_eq!(txq.status().pending, 3); + + // when + txq.remove(&tx); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 1); + assert_eq!(stats.pending, 1); + } + + #[test] + fn should_clear_queue() { + // given + let mut txq = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::one()); + + // add + txq.add(tx2.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce); + let stats = txq.status(); + assert_eq!(stats.pending, 2); + + // when + txq.clear(); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 0); + } + + #[test] + fn should_drop_old_transactions_when_hitting_the_limit() { + // given + let mut txq = TransactionQueue::with_limits(1, 1); + let (tx, tx2) = new_txs(U256::one()); + txq.add(tx.clone(), &default_nonce); + assert_eq!(txq.status().pending, 1); + + // when + txq.add(tx2.clone(), &default_nonce); + + // then + let t = txq.top_transactions(2); + assert_eq!(txq.status().pending, 1); + assert_eq!(t.len(), 1); + assert_eq!(t[0], tx); + } + + #[test] + fn should_limit_future_transactions() { + let mut txq = TransactionQueue::with_limits(10, 1); + let (tx1, tx2) = new_txs(U256::from(4)); + let (tx3, tx4) = new_txs(U256::from(4)); + txq.add(tx1.clone(), &default_nonce); + txq.add(tx3.clone(), &default_nonce); + assert_eq!(txq.status().pending, 2); + + // when + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx4.clone(), &default_nonce); + + // then + assert_eq!(txq.status().future, 1); + } + + #[test] + fn should_drop_transactions_with_old_nonces() { + let mut txq = TransactionQueue::new(); + let tx = new_tx(); + let last_nonce = tx.nonce.clone(); + let fetch_last_nonce = |_a: &Address| last_nonce; + + // when + txq.add(tx, &fetch_last_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 0); + assert_eq!(stats.future, 0); + } + + #[test] + fn should_accept_same_transaction_twice() { + // given + let mut txq = TransactionQueue::new(); + let (tx1, tx2) = new_txs(U256::from(1)); + txq.add(tx1.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().pending, 2); + + // when + txq.remove(&tx1); + assert_eq!(txq.status().future, 1); + txq.add(tx1.clone(), &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 2); + assert_eq!(stats.future, 0); + + } + +} From fc9999fb059576cb040e3d0d8e2526f693412802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 2 Mar 2016 21:26:48 +0100 Subject: [PATCH 2/5] Changing uint to numbers --- sync/src/transaction_queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 341607afe..f2e15f955 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -21,7 +21,7 @@ use std::vec::Vec; use std::cmp::{Ordering}; use std::collections::{HashMap, BTreeSet}; -use util::uint::{Uint, U256}; +use util::numbers::{Uint, U256}; use util::hash::{Address}; use util::table::*; use ethcore::transaction::*; @@ -373,7 +373,7 @@ mod test { use self::rustc_serialize::hex::FromHex; use util::crypto::KeyPair; - use util::uint::{U256, Uint}; + use util::numbers::{U256, Uint}; use util::hash::{Address}; use ethcore::transaction::*; use super::*; From 8542d651ae8714de4f79342ba9f999dcac8b0988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 11:45:20 +0100 Subject: [PATCH 3/5] Refactoring transactions queue to avoid cloning transactions --- sync/src/transaction_queue.rs | 388 ++++++++++++++++++++-------------- 1 file changed, 224 insertions(+), 164 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index f2e15f955..1bbfbe36d 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -18,121 +18,126 @@ //! Transaction Queue -use std::vec::Vec; use std::cmp::{Ordering}; use std::collections::{HashMap, BTreeSet}; use util::numbers::{Uint, U256}; -use util::hash::{Address}; +use util::hash::{Address, H256}; use util::table::*; use ethcore::transaction::*; #[derive(Clone, Debug)] -struct VerifiedTransaction { - tx: SignedTransaction, - nonce_height: U256 +struct TransactionOrder { + nonce_height: U256, + gas_price: U256, + hash: H256, } -impl VerifiedTransaction { - pub fn new(tx: SignedTransaction, nonce_height: U256) -> VerifiedTransaction { - VerifiedTransaction { - tx: tx, - nonce_height: nonce_height +impl TransactionOrder { + pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { + TransactionOrder { + nonce_height: tx.nonce() - base_nonce, + gas_price: tx.transaction.gas_price, + hash: tx.hash(), } } - - pub fn sender(&self) -> Address { - self.tx.sender().unwrap() - } } -impl Eq for VerifiedTransaction {} -impl PartialEq for VerifiedTransaction { - fn eq(&self, other: &VerifiedTransaction) -> bool { +impl Eq for TransactionOrder {} +impl PartialEq for TransactionOrder { + fn eq(&self, other: &TransactionOrder) -> bool { self.cmp(other) == Ordering::Equal } } -impl PartialOrd for VerifiedTransaction { - fn partial_cmp(&self, other: &VerifiedTransaction) -> Option { +impl PartialOrd for TransactionOrder { + fn partial_cmp(&self, other: &TransactionOrder) -> Option { Some(self.cmp(other)) } } -impl Ord for VerifiedTransaction { - fn cmp(&self, b: &VerifiedTransaction) -> Ordering { +impl Ord for TransactionOrder { + fn cmp(&self, b: &TransactionOrder) -> Ordering { // First check nonce_height if self.nonce_height != b.nonce_height { return self.nonce_height.cmp(&b.nonce_height); } // Then compare gas_prices - let a_gas = self.tx.gas_price; - let b_gas = b.tx.gas_price; + let a_gas = self.gas_price; + let b_gas = b.gas_price; if a_gas != b_gas { return a_gas.cmp(&b_gas); } - // Compare nonce - let a_nonce = self.tx.nonce; - let b_nonce = b.tx.nonce; - if a_nonce != b_nonce { - return a_nonce.cmp(&b_nonce); - } - - // and senders - let a_sender = self.sender(); - let b_sender = b.sender(); - a_sender.cmp(&b_sender) + // Compare hashes + self.hash.cmp(&b.hash) } } -struct TransactionsByPriorityAndAddress { - priority: BTreeSet, - address: Table, +struct VerifiedTransaction { + transaction: SignedTransaction +} +impl VerifiedTransaction { + fn new(transaction: SignedTransaction) -> Self { + VerifiedTransaction { + transaction: transaction + } + } + + fn hash(&self) -> H256 { + self.transaction.hash() + } + + fn nonce(&self) -> U256 { + self.transaction.nonce + } + + fn sender(&self) -> Address { + self.transaction.sender().unwrap() + } +} + +struct TransactionSet { + by_priority: BTreeSet, + by_address: Table, limit: usize, } -impl TransactionsByPriorityAndAddress { - fn insert(&mut self, address: Address, nonce: U256, verified_tx: VerifiedTransaction) { - self.priority.insert(verified_tx.clone()); - self.address.insert(address, nonce, verified_tx); +impl TransactionSet { + fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) { + self.by_priority.insert(order.clone()); + self.by_address.insert(sender, nonce, order); } - fn enforce_limit(&mut self) { - let len = self.priority.len(); + fn enforce_limit(&mut self, by_hash: &HashMap) { + let len = self.by_priority.len(); if len <= self.limit { return; } - let to_remove : Vec = { - self.priority + let to_drop : Vec<&VerifiedTransaction> = { + self.by_priority .iter() .skip(self.limit) - .map(|v_tx| v_tx.tx.clone()) + .map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected.")) .collect() }; - for tx in to_remove { - self.remove(&tx); + for tx in to_drop { + self.drop(&tx.sender(), &tx.nonce()); } } - fn remove_by_address(&mut self, sender: &Address, nonce: &U256) -> Option { - if let Some(verified_tx) = self.address.remove(sender, nonce) { - self.priority.remove(&verified_tx); - return Some(verified_tx); + fn drop(&mut self, sender: &Address, nonce: &U256) -> Option { + if let Some(tx_order) = self.by_address.remove(sender, nonce) { + self.by_priority.remove(&tx_order); + return Some(tx_order); } None } - fn remove(&mut self, tx: &SignedTransaction) -> Option { - // First find the transaction by address - let address = tx.sender().unwrap(); - self.remove_by_address(&address, &tx.nonce) - } - fn clear(&mut self) { - self.priority.clear(); - self.address.clear(); + self.by_priority.clear(); + self.by_address.clear(); } } @@ -148,9 +153,11 @@ pub struct TransactionQueueStatus { /// TransactionQueue implementation pub struct TransactionQueue { /// Priority queue for transactions that can go to block - current: TransactionsByPriorityAndAddress, + current: TransactionSet, /// Priority queue for transactions that has been received but are not yet valid to go to block - future: TransactionsByPriorityAndAddress, + future: TransactionSet, + /// All transactions managed by queue indexed by hash + by_hash: HashMap, /// Last nonce of transaction in current last_nonces: HashMap, /// First nonce of transaction in current (used to determine priority) @@ -165,20 +172,21 @@ impl TransactionQueue { /// Create new instance of this Queue with specified limits pub fn with_limits(current_limit: usize, future_limit: usize) -> Self { - let current = TransactionsByPriorityAndAddress { - address: Table::new(), - priority: BTreeSet::new(), + let current = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), limit: current_limit, }; - let future = TransactionsByPriorityAndAddress { - address: Table::new(), - priority: BTreeSet::new(), + let future = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), limit: future_limit, }; TransactionQueue { current: current, future: future, + by_hash: HashMap::new(), last_nonces: HashMap::new(), first_nonces: HashMap::new(), } @@ -187,8 +195,8 @@ impl TransactionQueue { /// Returns current status for this queue pub fn status(&self) -> TransactionQueueStatus { TransactionQueueStatus { - pending: self.current.priority.len(), - future: self.future.priority.len(), + pending: self.current.by_priority.len(), + future: self.future.by_priority.len(), } } @@ -203,101 +211,107 @@ impl TransactionQueue { /// Add signed transaction to queue to be verified and imported pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - self.import_tx(tx, fetch_nonce); + self.import_tx(VerifiedTransaction::new(tx), fetch_nonce); } - /// Removes all transactions in given slice + /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[SignedTransaction]) { + pub fn remove_all(&mut self, txs: &[H256]) { for tx in txs { self.remove(&tx); } } - /// Removes transaction from queue. + /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, tx: &SignedTransaction) { + pub fn remove(&mut self, hash: &H256) { + let transaction = self.by_hash.remove(hash); + if transaction.is_none() { + // We don't know this transaction + return; + } + let transaction = transaction.unwrap(); + let sender = transaction.sender(); + let nonce = transaction.nonce(); + + // Remove from future + self.future.drop(&sender, &nonce); + // Remove from current - let removed = self.current.remove(tx); - if let Some(verified_tx) = removed { - let sender = verified_tx.sender(); - - // Are there any other transactions from this sender? - if !self.current.address.has_row(&sender) { - // Clear last & first nonces - self.last_nonces.remove(&sender); - self.first_nonces.remove(&sender); - return; - } - - // Let's find those with higher nonce (TODO [todr] optimize?) - let to_move_to_future = { - let row_map = self.current.address.row(&sender).unwrap(); - let tx_nonce = verified_tx.tx.nonce; - let mut to_future = Vec::new(); - let mut highest = U256::zero(); - let mut lowest = tx_nonce.clone(); - - // Search nonces to remove and track lowest and highest - for (nonce, _) in row_map.iter() { - if nonce > &tx_nonce { - to_future.push(nonce.clone()); - } else if nonce > &highest { - highest = nonce.clone(); - } else if nonce < &lowest { - lowest = nonce.clone(); - } - } - - // Update first_nonces and last_nonces - if highest == U256::zero() { - self.last_nonces.remove(&sender); - } else { - self.last_nonces.insert(sender.clone(), highest); - } - - if lowest == tx_nonce { - self.first_nonces.remove(&sender); - } else { - self.first_nonces.insert(sender.clone(), lowest); - } - - // return to future - to_future - }; - - for k in to_move_to_future { - if let Some(v) = self.current.remove_by_address(&sender, &k) { - self.future.insert(sender.clone(), v.tx.nonce, v); - } - } - self.future.enforce_limit(); + let order = self.current.drop(&sender, &nonce); + if order.is_none() { return; } - // Remove from future - { - let sender = tx.sender().unwrap(); - if let Some(_) = self.future.remove_by_address(&sender, &tx.nonce) { - return; + // Are there any other transactions from this sender? + if !self.current.by_address.has_row(&sender) { + // Clear last & first nonces + self.last_nonces.remove(&sender); + self.first_nonces.remove(&sender); + return; + } + + // Let's find those with higher nonce (TODO [todr] optimize?) + let to_move_to_future = { + let row_map = self.current.by_address.row(&sender).unwrap(); + let mut to_future = Vec::new(); + let mut highest = U256::zero(); + let mut lowest = nonce.clone(); + + // Search nonces to remove and track lowest and highest + for (current_nonce, _) in row_map.iter() { + if current_nonce > &nonce { + to_future.push(current_nonce.clone()); + } else if current_nonce > &highest { + highest = current_nonce.clone(); + } else if current_nonce < &lowest { + lowest = current_nonce.clone(); + } + } + + // Update first_nonces and last_nonces + if highest == U256::zero() { + self.last_nonces.remove(&sender); + } else { + self.last_nonces.insert(sender.clone(), highest); + } + + if lowest == nonce { + self.first_nonces.remove(&sender); + } else { + self.first_nonces.insert(sender.clone(), lowest); + } + + // return to future + to_future + }; + + for k in to_move_to_future { + if let Some(v) = self.current.drop(&sender, &k) { + // TODO [todr] Recalculate height? + self.future.insert(sender.clone(), k, v); } } + self.future.enforce_limit(&self.by_hash); } /// Returns top transactions from the queue pub fn top_transactions(&self, size: usize) -> Vec { - self.current.priority + self.current.by_priority .iter() .take(size) - .map(|t| t.tx.clone()).collect() + .map(|t| self.by_hash.get(&t.hash).expect("Transaction Queue Inconsistency")) + .map(|t| t.transaction.clone()) + .collect() } /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); self.future.clear(); + self.by_hash.clear(); self.last_nonces.clear(); self.first_nonces.clear(); } @@ -305,31 +319,30 @@ impl TransactionQueue { fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { let mut current_nonce = current_nonce + U256::one(); { - let txs_by_nonce = self.future.address.row_mut(&address); - if let None = txs_by_nonce { + let by_nonce = self.future.by_address.row_mut(&address); + if let None = by_nonce { return None; } - let mut txs_by_nonce = txs_by_nonce.unwrap(); - - while let Some(tx) = txs_by_nonce.remove(¤t_nonce) { - // remove also from priority - self.future.priority.remove(&tx); + let mut by_nonce = by_nonce.unwrap(); + while let Some(order) = by_nonce.remove(¤t_nonce) { + // remove also from priority and hash + self.future.by_priority.remove(&order); // Put to current - let height = current_nonce - first_nonce; - let verified_tx = VerifiedTransaction::new(tx.tx, U256::from(height)); - self.current.insert(address.clone(), verified_tx.tx.nonce, verified_tx); + let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency"); + let order = TransactionOrder::for_transaction(transaction, first_nonce); + self.current.insert(address.clone(), transaction.nonce(), order); current_nonce = current_nonce + U256::one(); } } - self.future.address.clear_if_empty(&address); + self.future.by_address.clear_if_empty(&address); // Returns last inserted nonce Some(current_nonce - U256::one()) } - fn import_tx(&mut self, tx: SignedTransaction, fetch_nonce: &T) + fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - let nonce = tx.nonce; - let address = tx.sender().unwrap(); + let nonce = tx.nonce(); + let address = tx.sender(); let next_nonce = U256::one() + self.last_nonces .get(&address) @@ -338,11 +351,12 @@ impl TransactionQueue { // Check height if nonce > next_nonce { - let height = nonce - next_nonce; - let verified_tx = VerifiedTransaction::new(tx, height); + let order = TransactionOrder::for_transaction(&tx, next_nonce); + // Insert to by_hash + self.by_hash.insert(tx.hash(), tx); // We have a gap - put to future - self.future.insert(address, nonce, verified_tx); - self.future.enforce_limit(); + self.future.insert(address, nonce, order); + self.future.enforce_limit(&self.by_hash); return; } else if next_nonce > nonce { // Droping transaction @@ -354,29 +368,34 @@ impl TransactionQueue { .cloned() .unwrap_or_else(|| nonce.clone()); - let height = nonce - first_nonce; - let verified_tx = VerifiedTransaction::new(tx, height); + let order = TransactionOrder::for_transaction(&tx, first_nonce); + // Insert to by_hash + self.by_hash.insert(tx.hash(), tx); + // Insert to current - self.current.insert(address.clone(), nonce, verified_tx); + self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); self.first_nonces.insert(address.clone(), first_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit - self.current.enforce_limit(); + self.current.enforce_limit(&self.by_hash); } } + #[cfg(test)] mod test { extern crate rustc_serialize; use self::rustc_serialize::hex::FromHex; - + use std::collections::{HashMap, BTreeSet}; use util::crypto::KeyPair; use util::numbers::{U256, Uint}; use util::hash::{Address}; + use util::table::*; use ethcore::transaction::*; use super::*; + use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; fn new_unsigned_tx(nonce: U256) -> Transaction { Transaction { @@ -408,6 +427,46 @@ mod test { (tx.sign(secret), tx2.sign(secret)) } + #[test] + fn should_create_transaction_set() { + // given + let mut set = TransactionSet { + by_priority: BTreeSet::new(), + by_address: Table::new(), + limit: 1 + }; + let (tx1, tx2) = new_txs(U256::from(1)); + let tx1 = VerifiedTransaction::new(tx1); + let tx2 = VerifiedTransaction::new(tx2); + let by_hash = { + let mut x = HashMap::new(); + let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); + let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); + x.insert(tx1.hash(), tx1); + x.insert(tx2.hash(), tx2); + x + }; + // Insert both transactions + let order1 = TransactionOrder::for_transaction(&tx1, U256::zero()); + set.insert(tx1.sender(), tx1.nonce(), order1.clone()); + let order2 = TransactionOrder::for_transaction(&tx2, U256::zero()); + set.insert(tx2.sender(), tx2.nonce(), order2.clone()); + assert_eq!(set.by_priority.len(), 2); + assert_eq!(set.by_address.len(), 2); + + // when + set.enforce_limit(&by_hash); + + // then + assert_eq!(set.by_priority.len(), 1); + assert_eq!(set.by_address.len(), 1); + assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1); + set.clear(); + assert_eq!(set.by_priority.len(), 0); + assert_eq!(set.by_address.len(), 0); + } + + #[test] fn should_import_tx() { // given @@ -495,8 +554,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx); - txq2.remove(&tx2); + txq2.remove(&tx.hash()); + txq2.remove(&tx2.hash()); // then @@ -518,7 +577,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx); + txq.remove(&tx.hash()); // then let stats = txq.status(); @@ -608,14 +667,15 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1); + txq.remove(&tx1.hash()); + assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce); // then let stats = txq.status(); - assert_eq!(stats.pending, 2); assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); } From b320ff46020ff8a3f517901969d17a28fd040de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 15:02:11 +0100 Subject: [PATCH 4/5] Getting rid of first_nonces (we can fetch it from state) --- sync/src/transaction_queue.rs | 76 ++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 1bbfbe36d..a12d75ed4 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -158,10 +158,8 @@ pub struct TransactionQueue { future: TransactionSet, /// All transactions managed by queue indexed by hash by_hash: HashMap, - /// Last nonce of transaction in current + /// Last nonce of transaction in current (to quickly check next expected transaction) last_nonces: HashMap, - /// First nonce of transaction in current (used to determine priority) - first_nonces: HashMap, } impl TransactionQueue { @@ -188,7 +186,6 @@ impl TransactionQueue { future: future, by_hash: HashMap::new(), last_nonces: HashMap::new(), - first_nonces: HashMap::new(), } } @@ -217,16 +214,18 @@ impl TransactionQueue { /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[H256]) { + pub fn remove_all(&mut self, txs: &[H256], fetch_nonce: T) + where T: Fn(&Address) -> U256 { for tx in txs { - self.remove(&tx); + self.remove(&tx, &fetch_nonce); } } /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, hash: &H256) { + pub fn remove(&mut self, hash: &H256, fetch_nonce: &T) + where T: Fn(&Address) -> U256 { let transaction = self.by_hash.remove(hash); if transaction.is_none() { // We don't know this transaction @@ -249,11 +248,10 @@ impl TransactionQueue { if !self.current.by_address.has_row(&sender) { // Clear last & first nonces self.last_nonces.remove(&sender); - self.first_nonces.remove(&sender); return; } - // Let's find those with higher nonce (TODO [todr] optimize?) + // Let's find those with higher nonce that should be moved to future (TODO [todr] optimize?) let to_move_to_future = { let row_map = self.current.by_address.row(&sender).unwrap(); let mut to_future = Vec::new(); @@ -271,23 +269,17 @@ impl TransactionQueue { } } - // Update first_nonces and last_nonces + // Update last_nonces if highest == U256::zero() { self.last_nonces.remove(&sender); } else { self.last_nonces.insert(sender.clone(), highest); } - if lowest == nonce { - self.first_nonces.remove(&sender); - } else { - self.first_nonces.insert(sender.clone(), lowest); - } - - // return to future to_future }; + // Move to future for k in to_move_to_future { if let Some(v) = self.current.drop(&sender, &k) { // TODO [todr] Recalculate height? @@ -295,6 +287,8 @@ impl TransactionQueue { } } self.future.enforce_limit(&self.by_hash); + + // But maybe some transactions } /// Returns top transactions from the queue @@ -313,7 +307,6 @@ impl TransactionQueue { self.future.clear(); self.by_hash.clear(); self.last_nonces.clear(); - self.first_nonces.clear(); } fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { @@ -344,9 +337,10 @@ impl TransactionQueue { let nonce = tx.nonce(); let address = tx.sender(); - let next_nonce = U256::one() + self.last_nonces + let next_nonce = self.last_nonces .get(&address) .cloned() + .map(|n| n + U256::one()) .unwrap_or_else(|| fetch_nonce(&address)); // Check height @@ -363,20 +357,15 @@ impl TransactionQueue { return; } - let first_nonce = self.first_nonces - .get(&address) - .cloned() - .unwrap_or_else(|| nonce.clone()); - - let order = TransactionOrder::for_transaction(&tx, first_nonce); + let base_nonce = fetch_nonce(&address); + let order = TransactionOrder::for_transaction(&tx, base_nonce); // Insert to by_hash self.by_hash.insert(tx.hash(), tx); // Insert to current self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); - self.first_nonces.insert(address.clone(), first_nonce); + let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit self.current.enforce_limit(&self.by_hash); @@ -414,7 +403,7 @@ mod test { } fn default_nonce(_address: &Address) -> U256 { - U256::from(122) + U256::from(123) } fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) { @@ -554,8 +543,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx.hash()); - txq2.remove(&tx2.hash()); + txq2.remove(&tx.hash(), &default_nonce); + txq2.remove(&tx2.hash(), &default_nonce); // then @@ -577,7 +566,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx.hash()); + txq.remove(&tx.hash(), &default_nonce); // then let stats = txq.status(); @@ -645,7 +634,7 @@ mod test { fn should_drop_transactions_with_old_nonces() { let mut txq = TransactionQueue::new(); let tx = new_tx(); - let last_nonce = tx.nonce.clone(); + let last_nonce = tx.nonce.clone() + U256::one(); let fetch_last_nonce = |_a: &Address| last_nonce; // when @@ -667,7 +656,7 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1.hash()); + txq.remove(&tx1.hash(), &default_nonce); assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce); @@ -676,7 +665,28 @@ mod test { let stats = txq.status(); assert_eq!(stats.future, 0); assert_eq!(stats.pending, 2); + } + #[test] + fn should_not_move_to_future_if_state_nonce_is_higher() { + // given + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::from(1)); + let tx3 = new_tx(); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx3.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce); + assert_eq!(txq.status().pending, 3); + + // when + txq.remove(&tx.hash(), &next_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); } } From 677c3996b9aca31debd3059b1e3e575dce99ffb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 16:09:05 +0100 Subject: [PATCH 5/5] Taking expected nonce from state into consideration when removing txs --- sync/src/transaction_queue.rs | 79 ++++++++++++++++------------------- 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index a12d75ed4..4f5622a2f 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -34,13 +34,18 @@ struct TransactionOrder { } impl TransactionOrder { - pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { + fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { TransactionOrder { nonce_height: tx.nonce() - base_nonce, gas_price: tx.transaction.gas_price, hash: tx.hash(), } } + + fn update_height(mut self, nonce: U256, base_nonce: U256) -> Self { + self.nonce_height = nonce - base_nonce; + self + } } impl Eq for TransactionOrder {} @@ -235,6 +240,7 @@ impl TransactionQueue { let sender = transaction.sender(); let nonce = transaction.nonce(); + println!("Removing tx: {:?}", transaction.transaction); // Remove from future self.future.drop(&sender, &nonce); @@ -244,51 +250,35 @@ impl TransactionQueue { return; } - // Are there any other transactions from this sender? - if !self.current.by_address.has_row(&sender) { - // Clear last & first nonces - self.last_nonces.remove(&sender); - return; - } + // Let's remove transactions where tx.nonce < current_nonce + // and if there are any future transactions matching current_nonce+1 - move to current + let current_nonce = fetch_nonce(&sender); + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); - // Let's find those with higher nonce that should be moved to future (TODO [todr] optimize?) - let to_move_to_future = { - let row_map = self.current.by_address.row(&sender).unwrap(); - let mut to_future = Vec::new(); - let mut highest = U256::zero(); - let mut lowest = nonce.clone(); - - // Search nonces to remove and track lowest and highest - for (current_nonce, _) in row_map.iter() { - if current_nonce > &nonce { - to_future.push(current_nonce.clone()); - } else if current_nonce > &highest { - highest = current_nonce.clone(); - } else if current_nonce < &lowest { - lowest = current_nonce.clone(); - } - } - - // Update last_nonces - if highest == U256::zero() { - self.last_nonces.remove(&sender); - } else { - self.last_nonces.insert(sender.clone(), highest); - } - - to_future + let all_nonces_from_sender = match self.current.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], }; - // Move to future - for k in to_move_to_future { - if let Some(v) = self.current.drop(&sender, &k) { - // TODO [todr] Recalculate height? - self.future.insert(sender.clone(), k, v); + for k in all_nonces_from_sender { + // Goes to future or is removed + let order = self.current.drop(&sender, &k).unwrap(); + if k >= current_nonce { + println!("Moving to future: {:?}", order); + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } else { + self.by_hash.remove(&order.hash); } } self.future.enforce_limit(&self.by_hash); - // But maybe some transactions + // And now lets check if there is some chain of transactions in future + // that should be placed in current + if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) { + self.last_nonces.insert(sender, new_current_top); + } } /// Returns top transactions from the queue @@ -310,6 +300,7 @@ impl TransactionQueue { } fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { + println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce); let mut current_nonce = current_nonce + U256::one(); { let by_nonce = self.future.by_address.row_mut(&address); @@ -321,9 +312,9 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency"); - let order = TransactionOrder::for_transaction(transaction, first_nonce); - self.current.insert(address.clone(), transaction.nonce(), order); + println!("Moved: {:?}", order); + let order = order.update_height(current_nonce.clone(), first_nonce); + self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); } } @@ -340,9 +331,9 @@ impl TransactionQueue { let next_nonce = self.last_nonces .get(&address) .cloned() - .map(|n| n + U256::one()) - .unwrap_or_else(|| fetch_nonce(&address)); + .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); + println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { let order = TransactionOrder::for_transaction(&tx, next_nonce);