Merge branch 'tx_queue' into tx_queue_integration
Conflicts: sync/src/transaction_queue.rs
This commit is contained in:
commit
bd7614f445
@ -1271,7 +1271,8 @@ impl ChainSync {
|
|||||||
.expect("Expected in-chain blocks.");
|
.expect("Expected in-chain blocks.");
|
||||||
let block = BlockView::new(&block);
|
let block = BlockView::new(&block);
|
||||||
block.transactions()
|
block.transactions()
|
||||||
};
|
}
|
||||||
|
|
||||||
|
|
||||||
let chain = io.chain();
|
let chain = io.chain();
|
||||||
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
||||||
@ -1280,7 +1281,7 @@ impl ChainSync {
|
|||||||
good.for_each(|txs| {
|
good.for_each(|txs| {
|
||||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||||
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
||||||
transaction_queue.remove_all(&hashes);
|
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
||||||
});
|
});
|
||||||
bad.for_each(|txs| {
|
bad.for_each(|txs| {
|
||||||
// populate sender
|
// populate sender
|
||||||
|
@ -87,7 +87,7 @@ impl TestBlockChainClient {
|
|||||||
data: "3331600055".from_hex().unwrap(),
|
data: "3331600055".from_hex().unwrap(),
|
||||||
gas: U256::from(100_000),
|
gas: U256::from(100_000),
|
||||||
gas_price: U256::one(),
|
gas_price: U256::one(),
|
||||||
nonce: U256::one()
|
nonce: U256::zero()
|
||||||
};
|
};
|
||||||
let signed_tx = tx.sign(&keypair.secret());
|
let signed_tx = tx.sign(&keypair.secret());
|
||||||
txs.append(&signed_tx);
|
txs.append(&signed_tx);
|
||||||
|
@ -34,13 +34,18 @@ struct TransactionOrder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionOrder {
|
impl TransactionOrder {
|
||||||
pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self {
|
fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self {
|
||||||
TransactionOrder {
|
TransactionOrder {
|
||||||
nonce_height: tx.nonce() - base_nonce,
|
nonce_height: tx.nonce() - base_nonce,
|
||||||
gas_price: tx.transaction.gas_price,
|
gas_price: tx.transaction.gas_price,
|
||||||
hash: tx.hash(),
|
hash: tx.hash(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_height(mut self, nonce: U256, base_nonce: U256) -> Self {
|
||||||
|
self.nonce_height = nonce - base_nonce;
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Eq for TransactionOrder {}
|
impl Eq for TransactionOrder {}
|
||||||
@ -158,10 +163,8 @@ pub struct TransactionQueue {
|
|||||||
future: TransactionSet,
|
future: TransactionSet,
|
||||||
/// All transactions managed by queue indexed by hash
|
/// All transactions managed by queue indexed by hash
|
||||||
by_hash: HashMap<H256, VerifiedTransaction>,
|
by_hash: HashMap<H256, VerifiedTransaction>,
|
||||||
/// Last nonce of transaction in current
|
/// Last nonce of transaction in current (to quickly check next expected transaction)
|
||||||
last_nonces: HashMap<Address, U256>,
|
last_nonces: HashMap<Address, U256>,
|
||||||
/// First nonce of transaction in current (used to determine priority)
|
|
||||||
first_nonces: HashMap<Address, U256>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionQueue {
|
impl TransactionQueue {
|
||||||
@ -188,7 +191,6 @@ impl TransactionQueue {
|
|||||||
future: future,
|
future: future,
|
||||||
by_hash: HashMap::new(),
|
by_hash: HashMap::new(),
|
||||||
last_nonces: HashMap::new(),
|
last_nonces: HashMap::new(),
|
||||||
first_nonces: HashMap::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -217,16 +219,18 @@ impl TransactionQueue {
|
|||||||
/// Removes all transactions identified by hashes given in slice
|
/// Removes all transactions identified by hashes given in slice
|
||||||
///
|
///
|
||||||
/// If gap is introduced marks subsequent transactions as future
|
/// If gap is introduced marks subsequent transactions as future
|
||||||
pub fn remove_all(&mut self, transaction_hashes: &[H256]) {
|
pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T)
|
||||||
for transaction_hash in transaction_hashes {
|
where T: Fn(&Address) -> U256 {
|
||||||
self.remove(&transaction_hash);
|
for hash in transaction_hashes {
|
||||||
|
self.remove(&hash, &fetch_nonce);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes transaction identified by hashes from queue.
|
/// Removes transaction identified by hashes from queue.
|
||||||
///
|
///
|
||||||
/// If gap is introduced marks subsequent transactions as future
|
/// If gap is introduced marks subsequent transactions as future
|
||||||
pub fn remove(&mut self, transaction_hash: &H256) {
|
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
|
||||||
|
where T: Fn(&Address) -> U256 {
|
||||||
let transaction = self.by_hash.remove(transaction_hash);
|
let transaction = self.by_hash.remove(transaction_hash);
|
||||||
if transaction.is_none() {
|
if transaction.is_none() {
|
||||||
// We don't know this transaction
|
// We don't know this transaction
|
||||||
@ -236,6 +240,7 @@ impl TransactionQueue {
|
|||||||
let sender = transaction.sender();
|
let sender = transaction.sender();
|
||||||
let nonce = transaction.nonce();
|
let nonce = transaction.nonce();
|
||||||
|
|
||||||
|
println!("Removing tx: {:?}", transaction.transaction);
|
||||||
// Remove from future
|
// Remove from future
|
||||||
self.future.drop(&sender, &nonce);
|
self.future.drop(&sender, &nonce);
|
||||||
|
|
||||||
@ -245,56 +250,35 @@ impl TransactionQueue {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Are there any other transactions from this sender?
|
// Let's remove transactions where tx.nonce < current_nonce
|
||||||
if !self.current.by_address.has_row(&sender) {
|
// and if there are any future transactions matching current_nonce+1 - move to current
|
||||||
// Clear last & first nonces
|
let current_nonce = fetch_nonce(&sender);
|
||||||
|
// We will either move transaction to future or remove it completely
|
||||||
|
// so there will be no transactions from this sender in current
|
||||||
self.last_nonces.remove(&sender);
|
self.last_nonces.remove(&sender);
|
||||||
self.first_nonces.remove(&sender);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Let's find those with higher nonce (TODO [todr] optimize?)
|
let all_nonces_from_sender = match self.current.by_address.row(&sender) {
|
||||||
let to_move_to_future = {
|
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
||||||
let row_map = self.current.by_address.row(&sender).unwrap();
|
None => vec![],
|
||||||
let mut to_future = Vec::new();
|
|
||||||
let mut highest = U256::zero();
|
|
||||||
let mut lowest = nonce.clone();
|
|
||||||
|
|
||||||
// Search nonces to remove and track lowest and highest
|
|
||||||
for (current_nonce, _) in row_map.iter() {
|
|
||||||
if current_nonce > &nonce {
|
|
||||||
to_future.push(current_nonce.clone());
|
|
||||||
} else if current_nonce > &highest {
|
|
||||||
highest = current_nonce.clone();
|
|
||||||
} else if current_nonce < &lowest {
|
|
||||||
lowest = current_nonce.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update first_nonces and last_nonces
|
|
||||||
if highest == U256::zero() {
|
|
||||||
self.last_nonces.remove(&sender);
|
|
||||||
} else {
|
|
||||||
self.last_nonces.insert(sender.clone(), highest);
|
|
||||||
}
|
|
||||||
|
|
||||||
if lowest == nonce {
|
|
||||||
self.first_nonces.remove(&sender);
|
|
||||||
} else {
|
|
||||||
self.first_nonces.insert(sender.clone(), lowest);
|
|
||||||
}
|
|
||||||
|
|
||||||
// return to future
|
|
||||||
to_future
|
|
||||||
};
|
};
|
||||||
|
|
||||||
for k in to_move_to_future {
|
for k in all_nonces_from_sender {
|
||||||
if let Some(v) = self.current.drop(&sender, &k) {
|
// Goes to future or is removed
|
||||||
// TODO [todr] Recalculate height?
|
let order = self.current.drop(&sender, &k).unwrap();
|
||||||
self.future.insert(sender.clone(), k, v);
|
if k >= current_nonce {
|
||||||
|
println!("Moving to future: {:?}", order);
|
||||||
|
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
|
||||||
|
} else {
|
||||||
|
self.by_hash.remove(&order.hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.future.enforce_limit(&self.by_hash);
|
self.future.enforce_limit(&self.by_hash);
|
||||||
|
|
||||||
|
// And now lets check if there is some chain of transactions in future
|
||||||
|
// that should be placed in current
|
||||||
|
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) {
|
||||||
|
self.last_nonces.insert(sender, new_current_top);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns top transactions from the queue
|
/// Returns top transactions from the queue
|
||||||
@ -313,11 +297,9 @@ impl TransactionQueue {
|
|||||||
self.future.clear();
|
self.future.clear();
|
||||||
self.by_hash.clear();
|
self.by_hash.clear();
|
||||||
self.last_nonces.clear();
|
self.last_nonces.clear();
|
||||||
self.first_nonces.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option<U256> {
|
fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option<U256> {
|
||||||
let mut current_nonce = current_nonce + U256::one();
|
|
||||||
{
|
{
|
||||||
let by_nonce = self.future.by_address.row_mut(&address);
|
let by_nonce = self.future.by_address.row_mut(&address);
|
||||||
if let None = by_nonce {
|
if let None = by_nonce {
|
||||||
@ -328,9 +310,9 @@ impl TransactionQueue {
|
|||||||
// remove also from priority and hash
|
// remove also from priority and hash
|
||||||
self.future.by_priority.remove(&order);
|
self.future.by_priority.remove(&order);
|
||||||
// Put to current
|
// Put to current
|
||||||
let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency");
|
println!("Moved: {:?}", order);
|
||||||
let order = TransactionOrder::for_transaction(transaction, first_nonce);
|
let order = order.update_height(current_nonce.clone(), first_nonce);
|
||||||
self.current.insert(address.clone(), transaction.nonce(), order);
|
self.current.insert(address.clone(), current_nonce, order);
|
||||||
current_nonce = current_nonce + U256::one();
|
current_nonce = current_nonce + U256::one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -344,11 +326,12 @@ impl TransactionQueue {
|
|||||||
let nonce = tx.nonce();
|
let nonce = tx.nonce();
|
||||||
let address = tx.sender();
|
let address = tx.sender();
|
||||||
|
|
||||||
let next_nonce = U256::one() + self.last_nonces
|
let next_nonce = self.last_nonces
|
||||||
.get(&address)
|
.get(&address)
|
||||||
.cloned()
|
.cloned()
|
||||||
.unwrap_or_else(|| fetch_nonce(&address));
|
.map_or_else(|| fetch_nonce(&address), |n| n + U256::one());
|
||||||
|
|
||||||
|
println!("Expected next: {:?}, got: {:?}", next_nonce, nonce);
|
||||||
// Check height
|
// Check height
|
||||||
if nonce > next_nonce {
|
if nonce > next_nonce {
|
||||||
let order = TransactionOrder::for_transaction(&tx, next_nonce);
|
let order = TransactionOrder::for_transaction(&tx, next_nonce);
|
||||||
@ -364,20 +347,15 @@ impl TransactionQueue {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let first_nonce = self.first_nonces
|
let base_nonce = fetch_nonce(&address);
|
||||||
.get(&address)
|
let order = TransactionOrder::for_transaction(&tx, base_nonce);
|
||||||
.cloned()
|
|
||||||
.unwrap_or_else(|| nonce.clone());
|
|
||||||
|
|
||||||
let order = TransactionOrder::for_transaction(&tx, first_nonce);
|
|
||||||
// Insert to by_hash
|
// Insert to by_hash
|
||||||
self.by_hash.insert(tx.hash(), tx);
|
self.by_hash.insert(tx.hash(), tx);
|
||||||
|
|
||||||
// Insert to current
|
// Insert to current
|
||||||
self.current.insert(address.clone(), nonce, order);
|
self.current.insert(address.clone(), nonce, order);
|
||||||
// But maybe there are some more items waiting in future?
|
// But maybe there are some more items waiting in future?
|
||||||
let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce);
|
let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce);
|
||||||
self.first_nonces.insert(address.clone(), first_nonce);
|
|
||||||
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
|
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
|
||||||
// Enforce limit
|
// Enforce limit
|
||||||
self.current.enforce_limit(&self.by_hash);
|
self.current.enforce_limit(&self.by_hash);
|
||||||
@ -415,7 +393,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn default_nonce(_address: &Address) -> U256 {
|
fn default_nonce(_address: &Address) -> U256 {
|
||||||
U256::from(122)
|
U256::from(123)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) {
|
fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) {
|
||||||
@ -555,8 +533,8 @@ mod test {
|
|||||||
assert_eq!(txq2.status().future, 1);
|
assert_eq!(txq2.status().future, 1);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
txq2.remove(&tx.hash());
|
txq2.remove(&tx.hash(), &default_nonce);
|
||||||
txq2.remove(&tx2.hash());
|
txq2.remove(&tx2.hash(), &default_nonce);
|
||||||
|
|
||||||
|
|
||||||
// then
|
// then
|
||||||
@ -578,7 +556,7 @@ mod test {
|
|||||||
assert_eq!(txq.status().pending, 3);
|
assert_eq!(txq.status().pending, 3);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
txq.remove(&tx.hash());
|
txq.remove(&tx.hash(), &default_nonce);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
let stats = txq.status();
|
let stats = txq.status();
|
||||||
@ -646,7 +624,7 @@ mod test {
|
|||||||
fn should_drop_transactions_with_old_nonces() {
|
fn should_drop_transactions_with_old_nonces() {
|
||||||
let mut txq = TransactionQueue::new();
|
let mut txq = TransactionQueue::new();
|
||||||
let tx = new_tx();
|
let tx = new_tx();
|
||||||
let last_nonce = tx.nonce.clone();
|
let last_nonce = tx.nonce.clone() + U256::one();
|
||||||
let fetch_last_nonce = |_a: &Address| last_nonce;
|
let fetch_last_nonce = |_a: &Address| last_nonce;
|
||||||
|
|
||||||
// when
|
// when
|
||||||
@ -668,7 +646,7 @@ mod test {
|
|||||||
assert_eq!(txq.status().pending, 2);
|
assert_eq!(txq.status().pending, 2);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
txq.remove(&tx1.hash());
|
txq.remove(&tx1.hash(), &default_nonce);
|
||||||
assert_eq!(txq.status().pending, 0);
|
assert_eq!(txq.status().pending, 0);
|
||||||
assert_eq!(txq.status().future, 1);
|
assert_eq!(txq.status().future, 1);
|
||||||
txq.add(tx1.clone(), &default_nonce);
|
txq.add(tx1.clone(), &default_nonce);
|
||||||
@ -677,7 +655,28 @@ mod test {
|
|||||||
let stats = txq.status();
|
let stats = txq.status();
|
||||||
assert_eq!(stats.future, 0);
|
assert_eq!(stats.future, 0);
|
||||||
assert_eq!(stats.pending, 2);
|
assert_eq!(stats.pending, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_not_move_to_future_if_state_nonce_is_higher() {
|
||||||
|
// given
|
||||||
|
let next_nonce = |a: &Address| default_nonce(a) + U256::one();
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
let (tx, tx2) = new_txs(U256::from(1));
|
||||||
|
let tx3 = new_tx();
|
||||||
|
txq.add(tx2.clone(), &default_nonce);
|
||||||
|
assert_eq!(txq.status().future, 1);
|
||||||
|
txq.add(tx3.clone(), &default_nonce);
|
||||||
|
txq.add(tx.clone(), &default_nonce);
|
||||||
|
assert_eq!(txq.status().pending, 3);
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.remove(&tx.hash(), &next_nonce);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let stats = txq.status();
|
||||||
|
assert_eq!(stats.future, 0);
|
||||||
|
assert_eq!(stats.pending, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user