Importing transactions from hashset. Notifying about every block
This commit is contained in:
parent
eac73a4e54
commit
188e325b20
@ -317,7 +317,7 @@ impl<V> Client<V> where V: Verifier {
|
||||
}
|
||||
|
||||
{
|
||||
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
|
||||
if !imported_blocks.is_empty() {
|
||||
let (enacted, retracted) = self.calculate_enacted_retracted(import_results);
|
||||
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
|
||||
imported: imported_blocks,
|
||||
|
@ -18,6 +18,7 @@ use rayon::prelude::*;
|
||||
use std::sync::{Mutex, RwLock, Arc};
|
||||
use std::sync::atomic;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use util::{H256, U256, Address, Bytes, Uint};
|
||||
use ethcore::views::{BlockView};
|
||||
@ -174,21 +175,10 @@ impl MinerService for Miner {
|
||||
let block = BlockView::new(&block);
|
||||
block.transactions()
|
||||
}
|
||||
|
||||
{
|
||||
let in_chain = vec![imported, enacted, invalid];
|
||||
let in_chain = in_chain
|
||||
.par_iter()
|
||||
.flat_map(|h| h.par_iter().map(|h| fetch_transactions(chain, h)));
|
||||
let out_of_chain = retracted
|
||||
.par_iter()
|
||||
.map(|h| fetch_transactions(chain, h));
|
||||
|
||||
in_chain.for_each(|txs| {
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
||||
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
||||
});
|
||||
out_of_chain.for_each(|txs| {
|
||||
// populate sender
|
||||
for tx in &txs {
|
||||
@ -198,6 +188,28 @@ impl MinerService for Miner {
|
||||
let _ = transaction_queue.add_all(txs, |a| chain.nonce(a));
|
||||
});
|
||||
}
|
||||
// First import all transactions and after that remove old ones
|
||||
{
|
||||
let in_chain = {
|
||||
let mut in_chain = HashSet::new();
|
||||
in_chain.extend(imported);
|
||||
in_chain.extend(enacted);
|
||||
in_chain.extend(invalid);
|
||||
in_chain
|
||||
.into_iter()
|
||||
.collect::<Vec<H256>>()
|
||||
};
|
||||
|
||||
let in_chain = in_chain
|
||||
.par_iter()
|
||||
.map(|h: &H256| fetch_transactions(chain, h));
|
||||
|
||||
in_chain.for_each(|txs| {
|
||||
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
||||
});
|
||||
}
|
||||
|
||||
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
|
||||
self.prepare_sealing(chain);
|
||||
|
@ -351,6 +351,8 @@ impl TransactionQueue {
|
||||
/// If gap is introduced marks subsequent transactions as future
|
||||
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
|
||||
where T: Fn(&Address) -> U256 {
|
||||
|
||||
println!("Removing transaction: (hash: {:?})", transaction_hash);
|
||||
let transaction = self.by_hash.remove(transaction_hash);
|
||||
if transaction.is_none() {
|
||||
// We don't know this transaction
|
||||
@ -362,6 +364,8 @@ impl TransactionQueue {
|
||||
let nonce = transaction.nonce();
|
||||
let current_nonce = fetch_nonce(&sender);
|
||||
|
||||
println!("Removing transaction: ({:?}, {:?}, hash: {:?})", sender, nonce, transaction.hash());
|
||||
|
||||
// Remove from future
|
||||
let order = self.future.drop(&sender, &nonce);
|
||||
if order.is_some() {
|
||||
@ -489,12 +493,14 @@ impl TransactionQueue {
|
||||
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
|
||||
where T: Fn(&Address) -> U256 {
|
||||
|
||||
|
||||
if self.by_hash.get(&tx.hash()).is_some() {
|
||||
// Transaction is already imported.
|
||||
trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash());
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
let address = tx.sender();
|
||||
let nonce = tx.nonce();
|
||||
|
||||
@ -506,6 +512,7 @@ impl TransactionQueue {
|
||||
|
||||
// Check height
|
||||
if nonce > next_nonce {
|
||||
println!("[F] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price);
|
||||
// We have a gap - put to future
|
||||
Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
|
||||
self.future.enforce_limit(&mut self.by_hash);
|
||||
@ -515,6 +522,7 @@ impl TransactionQueue {
|
||||
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
|
||||
return;
|
||||
}
|
||||
println!("[C] Importing transaction: ({:?}, {:?}, hash: {:?}, gas: {:?})", tx.sender(), tx.nonce(), tx.hash(), tx.transaction.gas_price);
|
||||
|
||||
Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash);
|
||||
self.last_nonces.insert(address, nonce);
|
||||
@ -540,11 +548,13 @@ impl TransactionQueue {
|
||||
let new_fee = order.gas_price;
|
||||
if old_fee.cmp(&new_fee) == Ordering::Greater {
|
||||
// Put back old transaction since it has greater priority (higher gas_price)
|
||||
println!("Didn't replace tx (h:{:?}, h:{:?})", hash, old.hash);
|
||||
set.by_address.insert(address, nonce, old);
|
||||
// and remove new one
|
||||
set.by_priority.remove(&order);
|
||||
by_hash.remove(&hash);
|
||||
} else {
|
||||
println!("Replaced h:{:?} with h:{:?}, ", old.hash, hash);
|
||||
// Make sure we remove old transaction entirely
|
||||
set.by_priority.remove(&old);
|
||||
by_hash.remove(&old.hash);
|
||||
|
Loading…
Reference in New Issue
Block a user