diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 1bbfbe36d..a12d75ed4 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -158,10 +158,8 @@ pub struct TransactionQueue { future: TransactionSet, /// All transactions managed by queue indexed by hash by_hash: HashMap, - /// Last nonce of transaction in current + /// Last nonce of transaction in current (to quickly check next expected transaction) last_nonces: HashMap, - /// First nonce of transaction in current (used to determine priority) - first_nonces: HashMap, } impl TransactionQueue { @@ -188,7 +186,6 @@ impl TransactionQueue { future: future, by_hash: HashMap::new(), last_nonces: HashMap::new(), - first_nonces: HashMap::new(), } } @@ -217,16 +214,18 @@ impl TransactionQueue { /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[H256]) { + pub fn remove_all(&mut self, txs: &[H256], fetch_nonce: T) + where T: Fn(&Address) -> U256 { for tx in txs { - self.remove(&tx); + self.remove(&tx, &fetch_nonce); } } /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, hash: &H256) { + pub fn remove(&mut self, hash: &H256, fetch_nonce: &T) + where T: Fn(&Address) -> U256 { let transaction = self.by_hash.remove(hash); if transaction.is_none() { // We don't know this transaction @@ -249,11 +248,10 @@ impl TransactionQueue { if !self.current.by_address.has_row(&sender) { // Clear last & first nonces self.last_nonces.remove(&sender); - self.first_nonces.remove(&sender); return; } - // Let's find those with higher nonce (TODO [todr] optimize?) + // Let's find those with higher nonce that should be moved to future (TODO [todr] optimize?) let to_move_to_future = { let row_map = self.current.by_address.row(&sender).unwrap(); let mut to_future = Vec::new(); @@ -271,23 +269,17 @@ impl TransactionQueue { } } - // Update first_nonces and last_nonces + // Update last_nonces if highest == U256::zero() { self.last_nonces.remove(&sender); } else { self.last_nonces.insert(sender.clone(), highest); } - if lowest == nonce { - self.first_nonces.remove(&sender); - } else { - self.first_nonces.insert(sender.clone(), lowest); - } - - // return to future to_future }; + // Move to future for k in to_move_to_future { if let Some(v) = self.current.drop(&sender, &k) { // TODO [todr] Recalculate height? @@ -295,6 +287,8 @@ impl TransactionQueue { } } self.future.enforce_limit(&self.by_hash); + + // But maybe some transactions } /// Returns top transactions from the queue @@ -313,7 +307,6 @@ impl TransactionQueue { self.future.clear(); self.by_hash.clear(); self.last_nonces.clear(); - self.first_nonces.clear(); } fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { @@ -344,9 +337,10 @@ impl TransactionQueue { let nonce = tx.nonce(); let address = tx.sender(); - let next_nonce = U256::one() + self.last_nonces + let next_nonce = self.last_nonces .get(&address) .cloned() + .map(|n| n + U256::one()) .unwrap_or_else(|| fetch_nonce(&address)); // Check height @@ -363,20 +357,15 @@ impl TransactionQueue { return; } - let first_nonce = self.first_nonces - .get(&address) - .cloned() - .unwrap_or_else(|| nonce.clone()); - - let order = TransactionOrder::for_transaction(&tx, first_nonce); + let base_nonce = fetch_nonce(&address); + let order = TransactionOrder::for_transaction(&tx, base_nonce); // Insert to by_hash self.by_hash.insert(tx.hash(), tx); // Insert to current self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); - self.first_nonces.insert(address.clone(), first_nonce); + let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit self.current.enforce_limit(&self.by_hash); @@ -414,7 +403,7 @@ mod test { } fn default_nonce(_address: &Address) -> U256 { - U256::from(122) + U256::from(123) } fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) { @@ -554,8 +543,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx.hash()); - txq2.remove(&tx2.hash()); + txq2.remove(&tx.hash(), &default_nonce); + txq2.remove(&tx2.hash(), &default_nonce); // then @@ -577,7 +566,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx.hash()); + txq.remove(&tx.hash(), &default_nonce); // then let stats = txq.status(); @@ -645,7 +634,7 @@ mod test { fn should_drop_transactions_with_old_nonces() { let mut txq = TransactionQueue::new(); let tx = new_tx(); - let last_nonce = tx.nonce.clone(); + let last_nonce = tx.nonce.clone() + U256::one(); let fetch_last_nonce = |_a: &Address| last_nonce; // when @@ -667,7 +656,7 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1.hash()); + txq.remove(&tx1.hash(), &default_nonce); assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce); @@ -676,7 +665,28 @@ mod test { let stats = txq.status(); assert_eq!(stats.future, 0); assert_eq!(stats.pending, 2); + } + #[test] + fn should_not_move_to_future_if_state_nonce_is_higher() { + // given + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::from(1)); + let tx3 = new_tx(); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx3.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce); + assert_eq!(txq.status().pending, 3); + + // when + txq.remove(&tx.hash(), &next_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); } }