From b320ff46020ff8a3f517901969d17a28fd040de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 15:02:11 +0100 Subject: [PATCH 1/3] Getting rid of first_nonces (we can fetch it from state) --- sync/src/transaction_queue.rs | 76 ++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 1bbfbe36d..a12d75ed4 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -158,10 +158,8 @@ pub struct TransactionQueue { future: TransactionSet, /// All transactions managed by queue indexed by hash by_hash: HashMap, - /// Last nonce of transaction in current + /// Last nonce of transaction in current (to quickly check next expected transaction) last_nonces: HashMap, - /// First nonce of transaction in current (used to determine priority) - first_nonces: HashMap, } impl TransactionQueue { @@ -188,7 +186,6 @@ impl TransactionQueue { future: future, by_hash: HashMap::new(), last_nonces: HashMap::new(), - first_nonces: HashMap::new(), } } @@ -217,16 +214,18 @@ impl TransactionQueue { /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[H256]) { + pub fn remove_all(&mut self, txs: &[H256], fetch_nonce: T) + where T: Fn(&Address) -> U256 { for tx in txs { - self.remove(&tx); + self.remove(&tx, &fetch_nonce); } } /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, hash: &H256) { + pub fn remove(&mut self, hash: &H256, fetch_nonce: &T) + where T: Fn(&Address) -> U256 { let transaction = self.by_hash.remove(hash); if transaction.is_none() { // We don't know this transaction @@ -249,11 +248,10 @@ impl TransactionQueue { if !self.current.by_address.has_row(&sender) { // Clear last & first nonces self.last_nonces.remove(&sender); - self.first_nonces.remove(&sender); return; } - // Let's find those with higher nonce (TODO [todr] optimize?) + // Let's find those with higher nonce that should be moved to future (TODO [todr] optimize?) let to_move_to_future = { let row_map = self.current.by_address.row(&sender).unwrap(); let mut to_future = Vec::new(); @@ -271,23 +269,17 @@ impl TransactionQueue { } } - // Update first_nonces and last_nonces + // Update last_nonces if highest == U256::zero() { self.last_nonces.remove(&sender); } else { self.last_nonces.insert(sender.clone(), highest); } - if lowest == nonce { - self.first_nonces.remove(&sender); - } else { - self.first_nonces.insert(sender.clone(), lowest); - } - - // return to future to_future }; + // Move to future for k in to_move_to_future { if let Some(v) = self.current.drop(&sender, &k) { // TODO [todr] Recalculate height? @@ -295,6 +287,8 @@ impl TransactionQueue { } } self.future.enforce_limit(&self.by_hash); + + // But maybe some transactions } /// Returns top transactions from the queue @@ -313,7 +307,6 @@ impl TransactionQueue { self.future.clear(); self.by_hash.clear(); self.last_nonces.clear(); - self.first_nonces.clear(); } fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { @@ -344,9 +337,10 @@ impl TransactionQueue { let nonce = tx.nonce(); let address = tx.sender(); - let next_nonce = U256::one() + self.last_nonces + let next_nonce = self.last_nonces .get(&address) .cloned() + .map(|n| n + U256::one()) .unwrap_or_else(|| fetch_nonce(&address)); // Check height @@ -363,20 +357,15 @@ impl TransactionQueue { return; } - let first_nonce = self.first_nonces - .get(&address) - .cloned() - .unwrap_or_else(|| nonce.clone()); - - let order = TransactionOrder::for_transaction(&tx, first_nonce); + let base_nonce = fetch_nonce(&address); + let order = TransactionOrder::for_transaction(&tx, base_nonce); // Insert to by_hash self.by_hash.insert(tx.hash(), tx); // Insert to current self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce); - self.first_nonces.insert(address.clone(), first_nonce); + let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit self.current.enforce_limit(&self.by_hash); @@ -414,7 +403,7 @@ mod test { } fn default_nonce(_address: &Address) -> U256 { - U256::from(122) + U256::from(123) } fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) { @@ -554,8 +543,8 @@ mod test { assert_eq!(txq2.status().future, 1); // when - txq2.remove(&tx.hash()); - txq2.remove(&tx2.hash()); + txq2.remove(&tx.hash(), &default_nonce); + txq2.remove(&tx2.hash(), &default_nonce); // then @@ -577,7 +566,7 @@ mod test { assert_eq!(txq.status().pending, 3); // when - txq.remove(&tx.hash()); + txq.remove(&tx.hash(), &default_nonce); // then let stats = txq.status(); @@ -645,7 +634,7 @@ mod test { fn should_drop_transactions_with_old_nonces() { let mut txq = TransactionQueue::new(); let tx = new_tx(); - let last_nonce = tx.nonce.clone(); + let last_nonce = tx.nonce.clone() + U256::one(); let fetch_last_nonce = |_a: &Address| last_nonce; // when @@ -667,7 +656,7 @@ mod test { assert_eq!(txq.status().pending, 2); // when - txq.remove(&tx1.hash()); + txq.remove(&tx1.hash(), &default_nonce); assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); txq.add(tx1.clone(), &default_nonce); @@ -676,7 +665,28 @@ mod test { let stats = txq.status(); assert_eq!(stats.future, 0); assert_eq!(stats.pending, 2); + } + #[test] + fn should_not_move_to_future_if_state_nonce_is_higher() { + // given + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (tx, tx2) = new_txs(U256::from(1)); + let tx3 = new_tx(); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx3.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce); + assert_eq!(txq.status().pending, 3); + + // when + txq.remove(&tx.hash(), &next_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); } } From 677c3996b9aca31debd3059b1e3e575dce99ffb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 16:09:05 +0100 Subject: [PATCH 2/3] Taking expected nonce from state into consideration when removing txs --- sync/src/transaction_queue.rs | 79 ++++++++++++++++------------------- 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index a12d75ed4..4f5622a2f 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -34,13 +34,18 @@ struct TransactionOrder { } impl TransactionOrder { - pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { + fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self { TransactionOrder { nonce_height: tx.nonce() - base_nonce, gas_price: tx.transaction.gas_price, hash: tx.hash(), } } + + fn update_height(mut self, nonce: U256, base_nonce: U256) -> Self { + self.nonce_height = nonce - base_nonce; + self + } } impl Eq for TransactionOrder {} @@ -235,6 +240,7 @@ impl TransactionQueue { let sender = transaction.sender(); let nonce = transaction.nonce(); + println!("Removing tx: {:?}", transaction.transaction); // Remove from future self.future.drop(&sender, &nonce); @@ -244,51 +250,35 @@ impl TransactionQueue { return; } - // Are there any other transactions from this sender? - if !self.current.by_address.has_row(&sender) { - // Clear last & first nonces - self.last_nonces.remove(&sender); - return; - } + // Let's remove transactions where tx.nonce < current_nonce + // and if there are any future transactions matching current_nonce+1 - move to current + let current_nonce = fetch_nonce(&sender); + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); - // Let's find those with higher nonce that should be moved to future (TODO [todr] optimize?) - let to_move_to_future = { - let row_map = self.current.by_address.row(&sender).unwrap(); - let mut to_future = Vec::new(); - let mut highest = U256::zero(); - let mut lowest = nonce.clone(); - - // Search nonces to remove and track lowest and highest - for (current_nonce, _) in row_map.iter() { - if current_nonce > &nonce { - to_future.push(current_nonce.clone()); - } else if current_nonce > &highest { - highest = current_nonce.clone(); - } else if current_nonce < &lowest { - lowest = current_nonce.clone(); - } - } - - // Update last_nonces - if highest == U256::zero() { - self.last_nonces.remove(&sender); - } else { - self.last_nonces.insert(sender.clone(), highest); - } - - to_future + let all_nonces_from_sender = match self.current.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], }; - // Move to future - for k in to_move_to_future { - if let Some(v) = self.current.drop(&sender, &k) { - // TODO [todr] Recalculate height? - self.future.insert(sender.clone(), k, v); + for k in all_nonces_from_sender { + // Goes to future or is removed + let order = self.current.drop(&sender, &k).unwrap(); + if k >= current_nonce { + println!("Moving to future: {:?}", order); + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } else { + self.by_hash.remove(&order.hash); } } self.future.enforce_limit(&self.by_hash); - // But maybe some transactions + // And now lets check if there is some chain of transactions in future + // that should be placed in current + if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) { + self.last_nonces.insert(sender, new_current_top); + } } /// Returns top transactions from the queue @@ -310,6 +300,7 @@ impl TransactionQueue { } fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { + println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce); let mut current_nonce = current_nonce + U256::one(); { let by_nonce = self.future.by_address.row_mut(&address); @@ -321,9 +312,9 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency"); - let order = TransactionOrder::for_transaction(transaction, first_nonce); - self.current.insert(address.clone(), transaction.nonce(), order); + println!("Moved: {:?}", order); + let order = order.update_height(current_nonce.clone(), first_nonce); + self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); } } @@ -340,9 +331,9 @@ impl TransactionQueue { let next_nonce = self.last_nonces .get(&address) .cloned() - .map(|n| n + U256::one()) - .unwrap_or_else(|| fetch_nonce(&address)); + .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); + println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { let order = TransactionOrder::for_transaction(&tx, next_nonce); From bcaed67eaa0257c8998925287fd2dbc40df12698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 16:48:10 +0100 Subject: [PATCH 3/3] Swapping order of inserting block to chain and commiting to DB to avoid race conditions --- ethcore/src/client.rs | 12 +++++++----- sync/src/transaction_queue.rs | 4 ---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index d442a3d88..fdcd6c057 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -329,18 +329,14 @@ impl Client { bad_blocks.insert(header.hash()); continue; } - let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { bad_blocks.insert(header.hash()); break; } - - // Insert block - let closed_block = closed_block.unwrap(); - self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone()); good_blocks.push(header.hash()); + // Are we committing an era? let ancient = if header.number() >= HISTORY { let n = header.number() - HISTORY; let chain = self.chain.read().unwrap(); @@ -350,10 +346,16 @@ impl Client { }; // Commit results + let closed_block = closed_block.unwrap(); + let receipts = closed_block.block().receipts().clone(); closed_block.drain() .commit(header.number(), &header.hash(), ancient) .expect("State DB commit failed."); + // And update the chain + self.chain.write().unwrap() + .insert_block(&block.bytes, receipts); + self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index f551fe435..83665dfda 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -240,7 +240,6 @@ impl TransactionQueue { let sender = transaction.sender(); let nonce = transaction.nonce(); - println!("Removing tx: {:?}", transaction.transaction); // Remove from future self.future.drop(&sender, &nonce); @@ -266,7 +265,6 @@ impl TransactionQueue { // Goes to future or is removed let order = self.current.drop(&sender, &k).unwrap(); if k >= current_nonce { - println!("Moving to future: {:?}", order); self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); } else { self.by_hash.remove(&order.hash); @@ -310,7 +308,6 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - println!("Moved: {:?}", order); let order = order.update_height(current_nonce.clone(), first_nonce); self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); @@ -331,7 +328,6 @@ impl TransactionQueue { .cloned() .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); - println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { let order = TransactionOrder::for_transaction(&tx, next_nonce);