Importing transactions from hashset. Notifying about every block

This commit is contained in:
Tomasz Drwięga 2016-03-15 23:01:36 +01:00 committed by arkpar
parent df4b326cb3
commit fca8eb5810
3 changed files with 34 additions and 12 deletions

View File

@ -317,7 +317,7 @@ impl<V> Client<V> where V: Verifier {
} }
{ {
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { if !imported_blocks.is_empty() {
let (enacted, retracted) = self.calculate_enacted_retracted(import_results); let (enacted, retracted) = self.calculate_enacted_retracted(import_results);
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
imported: imported_blocks, imported: imported_blocks,

View File

@ -18,6 +18,7 @@ use rayon::prelude::*;
use std::sync::{Mutex, RwLock, Arc}; use std::sync::{Mutex, RwLock, Arc};
use std::sync::atomic; use std::sync::atomic;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::collections::HashSet;
use util::{H256, U256, Address, Bytes, Uint}; use util::{H256, U256, Address, Bytes, Uint};
use ethcore::views::{BlockView}; use ethcore::views::{BlockView};
@ -176,21 +177,10 @@ impl MinerService for Miner {
let block = BlockView::new(&block); let block = BlockView::new(&block);
block.transactions() block.transactions()
} }
{ {
let in_chain = vec![imported, enacted, invalid];
let in_chain = in_chain
.par_iter()
.flat_map(|h| h.par_iter().map(|h| fetch_transactions(chain, h)));
let out_of_chain = retracted let out_of_chain = retracted
.par_iter() .par_iter()
.map(|h| fetch_transactions(chain, h)); .map(|h| fetch_transactions(chain, h));
in_chain.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
out_of_chain.for_each(|txs| { out_of_chain.for_each(|txs| {
// populate sender // populate sender
for tx in &txs { for tx in &txs {
@ -200,6 +190,28 @@ impl MinerService for Miner {
let _ = transaction_queue.add_all(txs, |a| chain.nonce(a)); let _ = transaction_queue.add_all(txs, |a| chain.nonce(a));
}); });
} }
// First import all transactions and after that remove old ones
{
let in_chain = {
let mut in_chain = HashSet::new();
in_chain.extend(imported);
in_chain.extend(enacted);
in_chain.extend(invalid);
in_chain
.into_iter()
.collect::<Vec<H256>>()
};
let in_chain = in_chain
.par_iter()
.map(|h: &H256| fetch_transactions(chain, h));
in_chain.for_each(|txs| {
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
}
if self.sealing_enabled.load(atomic::Ordering::Relaxed) { if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing(chain); self.prepare_sealing(chain);

View File

@ -351,6 +351,8 @@ impl TransactionQueue {
/// If gap is introduced marks subsequent transactions as future /// If gap is introduced marks subsequent transactions as future
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T) pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
println!("Removing transaction: (hash: {:?})", transaction_hash);
let transaction = self.by_hash.remove(transaction_hash); let transaction = self.by_hash.remove(transaction_hash);
if transaction.is_none() { if transaction.is_none() {
// We don't know this transaction // We don't know this transaction
@ -362,6 +364,8 @@ impl TransactionQueue {
let nonce = transaction.nonce(); let nonce = transaction.nonce();
let current_nonce = fetch_nonce(&sender); let current_nonce = fetch_nonce(&sender);
println!("Removing transaction: ({:?}, {:?}, hash: {:?})", sender, nonce, transaction.hash());
// Remove from future // Remove from future
let order = self.future.drop(&sender, &nonce); let order = self.future.drop(&sender, &nonce);
if order.is_some() { if order.is_some() {
@ -489,12 +493,14 @@ impl TransactionQueue {
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
if self.by_hash.get(&tx.hash()).is_some() { if self.by_hash.get(&tx.hash()).is_some() {
// Transaction is already imported. // Transaction is already imported.
trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash()); trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash());
return; return;
} }
let address = tx.sender(); let address = tx.sender();
let nonce = tx.nonce(); let nonce = tx.nonce();
@ -506,6 +512,7 @@ impl TransactionQueue {
// Check height // Check height
if nonce > next_nonce { if nonce > next_nonce {
println!("[F] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price);
// We have a gap - put to future // We have a gap - put to future
Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
self.future.enforce_limit(&mut self.by_hash); self.future.enforce_limit(&mut self.by_hash);
@ -515,6 +522,7 @@ impl TransactionQueue {
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
return; return;
} }
println!("[C] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price);
Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash); Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash);
self.last_nonces.insert(address, nonce); self.last_nonces.insert(address, nonce);
@ -540,11 +548,13 @@ impl TransactionQueue {
let new_fee = order.gas_price; let new_fee = order.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater { if old_fee.cmp(&new_fee) == Ordering::Greater {
// Put back old transaction since it has greater priority (higher gas_price) // Put back old transaction since it has greater priority (higher gas_price)
println!("Didn't replace tx (h:{:?}, h:{:?})", hash, old.hash);
set.by_address.insert(address, nonce, old); set.by_address.insert(address, nonce, old);
// and remove new one // and remove new one
set.by_priority.remove(&order); set.by_priority.remove(&order);
by_hash.remove(&hash); by_hash.remove(&hash);
} else { } else {
println!("Replaced h:{:?} with h:{:?}, ", old.hash, hash);
// Make sure we remove old transaction entirely // Make sure we remove old transaction entirely
set.by_priority.remove(&old); set.by_priority.remove(&old);
by_hash.remove(&old.hash); by_hash.remove(&old.hash);