diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index e05210af2..24bb772d7 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -238,26 +238,50 @@ impl TransactionQueue { // We don't know this transaction return; } + let transaction = transaction.unwrap(); let sender = transaction.sender(); let nonce = transaction.nonce(); + let current_nonce = fetch_nonce(&sender); // Remove from future - self.future.drop(&sender, &nonce); - - // Remove from current - let order = self.current.drop(&sender, &nonce); - if order.is_none() { + let order = self.future.drop(&sender, &nonce); + if order.is_some() { + self.recalculate_future_for_sender(&sender, current_nonce); + // And now lets check if there is some chain of transactions in future + // that should be placed in current + self.move_future_txs(sender.clone(), current_nonce, current_nonce); return; } - // Let's remove transactions where tx.nonce < current_nonce - // and if there are any future transactions matching current_nonce+1 - move to current - let current_nonce = fetch_nonce(&sender); - // We will either move transaction to future or remove it completely - // so there will be no transactions from this sender in current - self.last_nonces.remove(&sender); + // Remove from current + let order = self.current.drop(&sender, &nonce); + if order.is_some() { + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); + // This should move all current transactions to future and remove old transactions + self.move_all_to_future(&sender, current_nonce); + // And now lets check if there is some chain of transactions in future + // that should be placed in current. It should also update last_nonces. + self.move_future_txs(sender.clone(), current_nonce, current_nonce); + return; + } + } + fn recalculate_future_for_sender(&mut self, sender: &Address, current_nonce: U256) { + // We need to drain all transactions for current sender from future and reinsert them with updated height + let all_nonces_from_sender = match self.future.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], + }; + for k in all_nonces_from_sender { + let order = self.future.drop(&sender, &k).unwrap(); + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } + } + + fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) { let all_nonces_from_sender = match self.current.by_address.row(&sender) { Some(row_map) => row_map.keys().cloned().collect::>(), None => vec![], @@ -273,14 +297,9 @@ impl TransactionQueue { } } self.future.enforce_limit(&mut self.by_hash); - - // And now lets check if there is some chain of transactions in future - // that should be placed in current - if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { - self.last_nonces.insert(sender, new_current_top); - } } + /// Returns top transactions from the queue pub fn top_transactions(&self, size: usize) -> Vec { self.current.by_priority @@ -299,11 +318,11 @@ impl TransactionQueue { self.last_nonces.clear(); } - fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option { + fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { { let by_nonce = self.future.by_address.row_mut(&address); if let None = by_nonce { - return None; + return; } let mut by_nonce = by_nonce.unwrap(); while let Some(order) = by_nonce.remove(¤t_nonce) { @@ -316,8 +335,8 @@ impl TransactionQueue { } } self.future.by_address.clear_if_empty(&address); - // Returns last inserted nonce - Some(current_nonce - U256::one()) + // Update last inserted nonce + self.last_nonces.insert(address, current_nonce - U256::one()); } fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) @@ -353,9 +372,9 @@ impl TransactionQueue { let base_nonce = fetch_nonce(&address); Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); + self.last_nonces.insert(address.clone(), nonce); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); - self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); + self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); self.current.enforce_limit(&mut self.by_hash); } @@ -777,6 +796,7 @@ mod test { fn should_recalculate_height_when_removing_from_future() { // given let previous_nonce = |a: &Address| default_nonce(a) - U256::one(); + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::one()); txq.add(tx1.clone(), &previous_nonce); @@ -784,7 +804,7 @@ mod test { assert_eq!(txq.status().future, 2); // when - txq.remove(&tx1.hash(), &default_nonce); + txq.remove(&tx1.hash(), &next_nonce); // then let stats = txq.status();