diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 6ee1b88d5..71823ca00 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -317,7 +317,7 @@ impl Client where V: Verifier { } { - if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { + if !imported_blocks.is_empty() { let (enacted, retracted) = self.calculate_enacted_retracted(import_results); io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { imported: imported_blocks, diff --git a/miner/src/miner.rs b/miner/src/miner.rs index ccf45f61a..8272339b9 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -18,6 +18,7 @@ use rayon::prelude::*; use std::sync::{Mutex, RwLock, Arc}; use std::sync::atomic; use std::sync::atomic::AtomicBool; +use std::collections::HashSet; use util::{H256, U256, Address, Bytes, Uint}; use ethcore::views::{BlockView}; @@ -176,21 +177,10 @@ impl MinerService for Miner { let block = BlockView::new(&block); block.transactions() } - { - let in_chain = vec![imported, enacted, invalid]; - let in_chain = in_chain - .par_iter() - .flat_map(|h| h.par_iter().map(|h| fetch_transactions(chain, h))); let out_of_chain = retracted .par_iter() .map(|h| fetch_transactions(chain, h)); - - in_chain.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); out_of_chain.for_each(|txs| { // populate sender for tx in &txs { @@ -200,6 +190,28 @@ impl MinerService for Miner { let _ = transaction_queue.add_all(txs, |a| chain.nonce(a)); }); } + // First import all transactions and after that remove old ones + { + let in_chain = { + let mut in_chain = HashSet::new(); + in_chain.extend(imported); + in_chain.extend(enacted); + in_chain.extend(invalid); + in_chain + .into_iter() + .collect::>() + }; + + let in_chain = in_chain + .par_iter() + .map(|h: &H256| fetch_transactions(chain, h)); + + in_chain.for_each(|txs| { + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + } if self.sealing_enabled.load(atomic::Ordering::Relaxed) { self.prepare_sealing(chain); diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index 324a46364..04febb84d 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -351,6 +351,8 @@ impl TransactionQueue { /// If gap is introduced marks subsequent transactions as future pub fn remove(&mut self, transaction_hash: &H256, fetch_nonce: &T) where T: Fn(&Address) -> U256 { + + println!("Removing transaction: (hash: {:?})", transaction_hash); let transaction = self.by_hash.remove(transaction_hash); if transaction.is_none() { // We don't know this transaction @@ -362,6 +364,8 @@ impl TransactionQueue { let nonce = transaction.nonce(); let current_nonce = fetch_nonce(&sender); + println!("Removing transaction: ({:?}, {:?}, hash: {:?})", sender, nonce, transaction.hash()); + // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { @@ -489,12 +493,14 @@ impl TransactionQueue { fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { + if self.by_hash.get(&tx.hash()).is_some() { // Transaction is already imported. trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash()); return; } + let address = tx.sender(); let nonce = tx.nonce(); @@ -506,6 +512,7 @@ impl TransactionQueue { // Check height if nonce > next_nonce { + println!("[F] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price); // We have a gap - put to future Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); self.future.enforce_limit(&mut self.by_hash); @@ -515,6 +522,7 @@ impl TransactionQueue { trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } + println!("[C] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price); Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash); self.last_nonces.insert(address, nonce); @@ -540,11 +548,13 @@ impl TransactionQueue { let new_fee = order.gas_price; if old_fee.cmp(&new_fee) == Ordering::Greater { // Put back old transaction since it has greater priority (higher gas_price) + println!("Didn't replace tx (h:{:?}, h:{:?})", hash, old.hash); set.by_address.insert(address, nonce, old); // and remove new one set.by_priority.remove(&order); by_hash.remove(&hash); } else { + println!("Replaced h:{:?} with h:{:?}, ", old.hash, hash); // Make sure we remove old transaction entirely set.by_priority.remove(&old); by_hash.remove(&old.hash);