diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index df5587719..339ae214b 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -48,6 +48,8 @@ pub struct TestBlockChainClient { pub difficulty: RwLock, /// Balances. pub balances: RwLock>, + /// Nonces. + pub nonces: RwLock>, /// Storage. pub storage: RwLock>, /// Code. @@ -90,6 +92,7 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), balances: RwLock::new(HashMap::new()), + nonces: RwLock::new(HashMap::new()), storage: RwLock::new(HashMap::new()), code: RwLock::new(HashMap::new()), execution_result: RwLock::new(None), @@ -116,6 +119,11 @@ impl TestBlockChainClient { self.balances.write().unwrap().insert(address, balance); } + /// Set nonce of account `address` to `nonce`. + pub fn set_nonce(&self, address: Address, nonce: U256) { + self.nonces.write().unwrap().insert(address, nonce); + } + /// Set `code` at `address`. pub fn set_code(&self, address: Address, code: Bytes) { self.code.write().unwrap().insert(address, code); @@ -157,6 +165,8 @@ impl TestBlockChainClient { EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { let mut txs = RlpStream::new_list(1); let keypair = KeyPair::create().unwrap(); + // Update nonces value + self.nonces.write().unwrap().insert(keypair.address(), U256::one()); let tx = Transaction { action: Action::Create, value: U256::from(100), @@ -222,8 +232,8 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn nonce(&self, _address: &Address) -> U256 { - U256::zero() + fn nonce(&self, address: &Address) -> U256 { + self.nonces.read().unwrap().get(address).cloned().unwrap_or_else(U256::zero) } fn code(&self, address: &Address) -> Option { diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 5169be2c5..af89a7345 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -133,13 +133,13 @@ impl Miner { } }; let mut queue = self.transaction_queue.lock().unwrap(); - queue.remove_all( - &invalid_transactions.into_iter().collect::>(), - |a: &Address| AccountDetails { - nonce: chain.nonce(a), - balance: chain.balance(a), - } - ); + let fetch_account = |a: &Address| AccountDetails { + nonce: chain.nonce(a), + balance: chain.balance(a), + }; + for hash in invalid_transactions.into_iter() { + queue.remove_invalid(&hash, &fetch_account); + } if let Some(block) = b { if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) { trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); @@ -299,7 +299,7 @@ impl MinerService for Miner { } } - fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) { + fn chain_new_blocks(&self, chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(*hash)) @@ -309,6 +309,11 @@ impl MinerService for Miner { block.transactions() } + // 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions + // should be still available in the queue. + // 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that + // are in those blocks + // First update gas limit in transaction queue self.update_gas_limit(chain); @@ -330,29 +335,23 @@ impl MinerService for Miner { }); } - // ...and after that remove old ones + // ...and at the end remove old ones { - let in_chain = { - let mut in_chain = HashSet::new(); - in_chain.extend(imported); - in_chain.extend(enacted); - in_chain.extend(invalid); - in_chain - .into_iter() - .collect::>() - }; - - let in_chain = in_chain + let in_chain = enacted .par_iter() .map(|h: &H256| fetch_transactions(chain, h)); - in_chain.for_each(|txs| { - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + in_chain.for_each(|mut txs| { let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.remove_all(&hashes, |a| AccountDetails { - nonce: chain.nonce(a), - balance: chain.balance(a) - }); + + let to_remove = txs.drain(..) + .map(|tx| { + tx.sender().expect("Transaction is in block, so sender has to be defined.") + }) + .collect::>(); + for sender in to_remove.into_iter() { + transaction_queue.remove_all(sender, chain.nonce(&sender)); + } }); } diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index fad864a5a..95f2056b5 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -65,8 +65,8 @@ //! assert_eq!(top[1], st2); //! //! // And when transaction is removed (but nonce haven't changed) -//! // it will move invalid transactions to future -//! txq.remove(&st1.hash(), &default_nonce); +//! // it will move subsequent transactions to future +//! txq.remove_invalid(&st1.hash(), &default_nonce); //! assert_eq!(txq.status().pending, 0); //! assert_eq!(txq.status().future, 1); //! assert_eq!(txq.top_transactions().len(), 0); @@ -76,11 +76,13 @@ //! # Maintaing valid state //! //! 1. Whenever transaction is imported to queue (to queue) all other transactions from this sender are revalidated in current. It means that they are moved to future and back again (height recalculation & gap filling). -//! 2. Whenever transaction is removed: +//! 2. Whenever invalid transaction is removed: //! - When it's removed from `future` - all `future` transactions heights are recalculated and then //! we check if the transactions should go to `current` (comparing state nonce) //! - When it's removed from `current` - all transactions from this sender (`current` & `future`) are recalculated. -//! +//! 3. `remove_all` is used to inform the queue about client (state) nonce changes. +//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce +//! - It moves matching `future` transactions to `current` use std::default::Default; use std::cmp::{Ordering}; @@ -398,22 +400,28 @@ impl TransactionQueue { self.import_tx(vtx, client_account.nonce).map_err(Error::Transaction) } - /// Removes all transactions identified by hashes given in slice - /// - /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, transaction_hashes: &[H256], fetch_account: T) - where T: Fn(&Address) -> AccountDetails { - for hash in transaction_hashes { - self.remove(&hash, &fetch_account); - } + /// Removes all transactions from particular sender up to (excluding) given client (state) nonce. + /// Client (State) Nonce = next valid nonce for this sender. + pub fn remove_all(&mut self, sender: Address, client_nonce: U256) { + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); + // First update height of transactions in future to avoid collisions + self.update_future(&sender, client_nonce); + // This should move all current transactions to future and remove old transactions + self.move_all_to_future(&sender, client_nonce); + // And now lets check if there is some batch of transactions in future + // that should be placed in current. It should also update last_nonces. + self.move_matching_future_to_current(sender, client_nonce, client_nonce); } - /// Removes transaction identified by hashes from queue. + /// Removes invalid transaction identified by hash from queue. + /// Assumption is that this transaction nonce is not related to client nonce, + /// so transactions left in queue are processed according to client nonce. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, transaction_hash: &H256, fetch_account: &T) + pub fn remove_invalid(&mut self, transaction_hash: &H256, fetch_account: &T) where T: Fn(&Address) -> AccountDetails { - let transaction = self.by_hash.remove(transaction_hash); if transaction.is_none() { // We don't know this transaction @@ -425,7 +433,6 @@ impl TransactionQueue { let nonce = transaction.nonce(); let current_nonce = fetch_account(&sender).nonce; - // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { @@ -465,7 +472,7 @@ impl TransactionQueue { if k >= current_nonce { self.future.insert(*sender, k, order.update_height(k, current_nonce)); } else { - trace!(target: "miner", "Dropping old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); + trace!(target: "miner", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); // Remove the transaction completely self.by_hash.remove(&order.hash); } @@ -486,7 +493,7 @@ impl TransactionQueue { if k >= current_nonce { self.future.insert(*sender, k, order.update_height(k, current_nonce)); } else { - trace!(target: "miner", "Dropping old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); + trace!(target: "miner", "Removing old transaction: {:?} (nonce: {} < {})", order.hash, k, current_nonce); self.by_hash.remove(&order.hash); } } @@ -665,9 +672,14 @@ mod test { new_unsigned_tx(U256::from(123)).sign(&keypair.secret()) } + + fn default_nonce_val() -> U256 { + U256::from(123) + } + fn default_nonce(_address: &Address) -> AccountDetails { AccountDetails { - nonce: U256::from(123), + nonce: default_nonce_val(), balance: !U256::zero() } } @@ -965,8 +977,7 @@ mod test { // given let prev_nonce = |a: &Address| AccountDetails{ nonce: default_nonce(a).nonce - U256::one(), balance: !U256::zero() }; - let next2_nonce = |a: &Address| AccountDetails{ nonce: default_nonce(a).nonce + U256::from(2), balance: - !U256::zero() }; + let next2_nonce = default_nonce_val() + U256::from(3); let mut txq = TransactionQueue::new(); @@ -976,7 +987,7 @@ mod test { assert_eq!(txq.status().future, 2); // when - txq.remove(&tx.hash(), &next2_nonce); + txq.remove_all(tx.sender().unwrap(), next2_nonce); // should remove both transactions since they are not valid // then @@ -1019,8 +1030,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx.hash(), &default_nonce); - txq2.remove(&tx2.hash(), &default_nonce); + txq2.remove_all(tx.sender().unwrap(), tx.nonce + U256::one()); + txq2.remove_all(tx2.sender().unwrap(), tx2.nonce + U256::one()); // then @@ -1042,7 +1053,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx.hash(), &default_nonce); + txq.remove_invalid(&tx.hash(), &default_nonce); // then let stats = txq.status(); @@ -1152,7 +1163,7 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1.hash(), &default_nonce); + txq.remove_invalid(&tx1.hash(), &default_nonce); assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce).unwrap(); @@ -1166,8 +1177,6 @@ mod test { #[test] fn should_not_move_to_future_if_state_nonce_is_higher() { // given - let next_nonce = |a: &Address| AccountDetails { nonce: default_nonce(a).nonce + U256::one(), balance: - !U256::zero() }; let mut txq = TransactionQueue::new(); let (tx, tx2) = new_txs(U256::from(1)); let tx3 = new_tx(); @@ -1178,7 +1187,8 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx.hash(), &next_nonce); + let sender = tx.sender().unwrap(); + txq.remove_all(sender, default_nonce_val() + U256::one()); // then let stats = txq.status(); @@ -1254,7 +1264,7 @@ mod test { assert_eq!(txq.status().future, 2); // when - txq.remove(&tx1.hash(), &next_nonce); + txq.remove_invalid(&tx1.hash(), &next_nonce); // then let stats = txq.status(); @@ -1286,4 +1296,22 @@ mod test { // then assert_eq!(txq.last_nonce(&from), Some(nonce)); } + + #[test] + fn should_remove_old_transaction_even_if_newer_transaction_was_not_known() { + // given + let mut txq = TransactionQueue::new(); + let (tx1, tx2) = new_txs(U256::one()); + let (nonce1, nonce2) = (tx1.nonce, tx2.nonce); + let details1 = |_a: &Address| AccountDetails { nonce: nonce1, balance: !U256::zero() }; + + // Insert first transaction + txq.add(tx1, &details1).unwrap(); + + // when + txq.remove_all(tx2.sender().unwrap(), nonce2 + U256::one()); + + // then + assert!(txq.top_transactions().is_empty()); + } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e2c11af1a..6313c7022 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1694,21 +1694,34 @@ mod tests { let good_blocks = vec![client.block_hash_delta_minus(2)]; let retracted_blocks = vec![client.block_hash_delta_minus(1)]; - // Add some balance to clients + // Add some balance to clients and reset nonces for h in &[good_blocks[0], retracted_blocks[0]] { let block = client.block(BlockId::Hash(*h)).unwrap(); let view = BlockView::new(&block); client.set_balance(view.transactions()[0].sender().unwrap(), U256::from(1_000_000_000)); + client.set_nonce(view.transactions()[0].sender().unwrap(), U256::from(0)); } - let mut queue = VecDeque::new(); - let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); - assert_eq!(sync.miner.status().transactions_in_future_queue, 0); - assert_eq!(sync.miner.status().transactions_in_pending_queue, 1); - sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks); + { + let mut queue = VecDeque::new(); + let mut io = TestIo::new(&mut client, &mut queue, None); + sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); + assert_eq!(sync.miner.status().transactions_in_future_queue, 0); + assert_eq!(sync.miner.status().transactions_in_pending_queue, 1); + } + // We need to update nonce status (because we say that the block has been imported) + for h in &[good_blocks[0]] { + let block = client.block(BlockId::Hash(*h)).unwrap(); + let view = BlockView::new(&block); + client.set_nonce(view.transactions()[0].sender().unwrap(), U256::from(1)); + } + { + let mut queue = VecDeque::new(); + let mut io = TestIo::new(&mut client, &mut queue, None); + sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks); + } // then let status = sync.miner.status(); @@ -1735,7 +1748,7 @@ mod tests { sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); assert_eq!(sync.miner.status().transactions_in_future_queue, 0); assert_eq!(sync.miner.status().transactions_in_pending_queue, 0); - sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks); + sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks); // then let status = sync.miner.status();