diff --git a/Cargo.lock b/Cargo.lock
index bca236813..e558606eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -259,6 +259,7 @@ dependencies = [
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
]
diff --git a/sync/Cargo.toml b/sync/Cargo.toml
index 26a7d463c..2ce65ca77 100644
--- a/sync/Cargo.toml
+++ b/sync/Cargo.toml
@@ -16,6 +16,7 @@ env_logger = "0.3"
time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
+rustc-serialize = "0.3"
[features]
default = []
diff --git a/sync/src/lib.rs b/sync/src/lib.rs
index 6f28fc320..74541660d 100644
--- a/sync/src/lib.rs
+++ b/sync/src/lib.rs
@@ -70,6 +70,8 @@ use io::NetSyncIo;
mod chain;
mod io;
mod range_collection;
+// TODO [todr] Made public to suppress dead code warnings
+pub mod transaction_queue;
#[cfg(test)]
mod tests;
diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs
new file mode 100644
index 000000000..341607afe
--- /dev/null
+++ b/sync/src/transaction_queue.rs
@@ -0,0 +1,622 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+// TODO [todr] - own transactions should have higher priority
+
+//! Transaction Queue
+
+use std::vec::Vec;
+use std::cmp::{Ordering};
+use std::collections::{HashMap, BTreeSet};
+use util::uint::{Uint, U256};
+use util::hash::{Address};
+use util::table::*;
+use ethcore::transaction::*;
+
+
+#[derive(Clone, Debug)]
+struct VerifiedTransaction {
+ tx: SignedTransaction,
+ nonce_height: U256
+}
+
+impl VerifiedTransaction {
+ pub fn new(tx: SignedTransaction, nonce_height: U256) -> VerifiedTransaction {
+ VerifiedTransaction {
+ tx: tx,
+ nonce_height: nonce_height
+ }
+ }
+
+ pub fn sender(&self) -> Address {
+ self.tx.sender().unwrap()
+ }
+}
+
+impl Eq for VerifiedTransaction {}
+impl PartialEq for VerifiedTransaction {
+ fn eq(&self, other: &VerifiedTransaction) -> bool {
+ self.cmp(other) == Ordering::Equal
+ }
+}
+impl PartialOrd for VerifiedTransaction {
+ fn partial_cmp(&self, other: &VerifiedTransaction) -> Option {
+ Some(self.cmp(other))
+ }
+}
+impl Ord for VerifiedTransaction {
+ fn cmp(&self, b: &VerifiedTransaction) -> Ordering {
+ // First check nonce_height
+ if self.nonce_height != b.nonce_height {
+ return self.nonce_height.cmp(&b.nonce_height);
+ }
+
+ // Then compare gas_prices
+ let a_gas = self.tx.gas_price;
+ let b_gas = b.tx.gas_price;
+ if a_gas != b_gas {
+ return a_gas.cmp(&b_gas);
+ }
+
+ // Compare nonce
+ let a_nonce = self.tx.nonce;
+ let b_nonce = b.tx.nonce;
+ if a_nonce != b_nonce {
+ return a_nonce.cmp(&b_nonce);
+ }
+
+ // and senders
+ let a_sender = self.sender();
+ let b_sender = b.sender();
+ a_sender.cmp(&b_sender)
+ }
+}
+
+struct TransactionsByPriorityAndAddress {
+ priority: BTreeSet,
+ address: Table,
+ limit: usize,
+}
+
+impl TransactionsByPriorityAndAddress {
+ fn insert(&mut self, address: Address, nonce: U256, verified_tx: VerifiedTransaction) {
+ self.priority.insert(verified_tx.clone());
+ self.address.insert(address, nonce, verified_tx);
+ }
+
+ fn enforce_limit(&mut self) {
+ let len = self.priority.len();
+ if len <= self.limit {
+ return;
+ }
+
+ let to_remove : Vec = {
+ self.priority
+ .iter()
+ .skip(self.limit)
+ .map(|v_tx| v_tx.tx.clone())
+ .collect()
+ };
+
+ for tx in to_remove {
+ self.remove(&tx);
+ }
+ }
+
+ fn remove_by_address(&mut self, sender: &Address, nonce: &U256) -> Option {
+ if let Some(verified_tx) = self.address.remove(sender, nonce) {
+ self.priority.remove(&verified_tx);
+ return Some(verified_tx);
+ }
+ None
+ }
+
+ fn remove(&mut self, tx: &SignedTransaction) -> Option {
+ // First find the transaction by address
+ let address = tx.sender().unwrap();
+ self.remove_by_address(&address, &tx.nonce)
+ }
+
+ fn clear(&mut self) {
+ self.priority.clear();
+ self.address.clear();
+ }
+}
+
+#[derive(Debug)]
+/// Current status of the queue
+pub struct TransactionQueueStatus {
+ /// Number of pending transactions (ready to go to block)
+ pub pending: usize,
+ /// Number of future transactions (waiting for transactions with lower nonces first)
+ pub future: usize,
+}
+
+/// TransactionQueue implementation
+pub struct TransactionQueue {
+ /// Priority queue for transactions that can go to block
+ current: TransactionsByPriorityAndAddress,
+ /// Priority queue for transactions that has been received but are not yet valid to go to block
+ future: TransactionsByPriorityAndAddress,
+ /// Last nonce of transaction in current
+ last_nonces: HashMap,
+ /// First nonce of transaction in current (used to determine priority)
+ first_nonces: HashMap,
+}
+
+impl TransactionQueue {
+ /// Creates new instance of this Queue
+ pub fn new() -> Self {
+ Self::with_limits(1024, 1024)
+ }
+
+ /// Create new instance of this Queue with specified limits
+ pub fn with_limits(current_limit: usize, future_limit: usize) -> Self {
+ let current = TransactionsByPriorityAndAddress {
+ address: Table::new(),
+ priority: BTreeSet::new(),
+ limit: current_limit,
+ };
+ let future = TransactionsByPriorityAndAddress {
+ address: Table::new(),
+ priority: BTreeSet::new(),
+ limit: future_limit,
+ };
+
+ TransactionQueue {
+ current: current,
+ future: future,
+ last_nonces: HashMap::new(),
+ first_nonces: HashMap::new(),
+ }
+ }
+
+ /// Returns current status for this queue
+ pub fn status(&self) -> TransactionQueueStatus {
+ TransactionQueueStatus {
+ pending: self.current.priority.len(),
+ future: self.future.priority.len(),
+ }
+ }
+
+ /// Adds all signed transactions to queue to be verified and imported
+ pub fn add_all(&mut self, txs: Vec, fetch_nonce: T)
+ where T: Fn(&Address) -> U256 {
+ for tx in txs.into_iter() {
+ self.add(tx, &fetch_nonce);
+ }
+ }
+
+ /// Add signed transaction to queue to be verified and imported
+ pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T)
+ where T: Fn(&Address) -> U256 {
+ self.import_tx(tx, fetch_nonce);
+ }
+
+ /// Removes all transactions in given slice
+ ///
+ /// If gap is introduced marks subsequent transactions as future
+ pub fn remove_all(&mut self, txs: &[SignedTransaction]) {
+ for tx in txs {
+ self.remove(&tx);
+ }
+ }
+
+ /// Removes transaction from queue.
+ ///
+ /// If gap is introduced marks subsequent transactions as future
+ pub fn remove(&mut self, tx: &SignedTransaction) {
+ // Remove from current
+ let removed = self.current.remove(tx);
+ if let Some(verified_tx) = removed {
+ let sender = verified_tx.sender();
+
+ // Are there any other transactions from this sender?
+ if !self.current.address.has_row(&sender) {
+ // Clear last & first nonces
+ self.last_nonces.remove(&sender);
+ self.first_nonces.remove(&sender);
+ return;
+ }
+
+ // Let's find those with higher nonce (TODO [todr] optimize?)
+ let to_move_to_future = {
+ let row_map = self.current.address.row(&sender).unwrap();
+ let tx_nonce = verified_tx.tx.nonce;
+ let mut to_future = Vec::new();
+ let mut highest = U256::zero();
+ let mut lowest = tx_nonce.clone();
+
+ // Search nonces to remove and track lowest and highest
+ for (nonce, _) in row_map.iter() {
+ if nonce > &tx_nonce {
+ to_future.push(nonce.clone());
+ } else if nonce > &highest {
+ highest = nonce.clone();
+ } else if nonce < &lowest {
+ lowest = nonce.clone();
+ }
+ }
+
+ // Update first_nonces and last_nonces
+ if highest == U256::zero() {
+ self.last_nonces.remove(&sender);
+ } else {
+ self.last_nonces.insert(sender.clone(), highest);
+ }
+
+ if lowest == tx_nonce {
+ self.first_nonces.remove(&sender);
+ } else {
+ self.first_nonces.insert(sender.clone(), lowest);
+ }
+
+ // return to future
+ to_future
+ };
+
+ for k in to_move_to_future {
+ if let Some(v) = self.current.remove_by_address(&sender, &k) {
+ self.future.insert(sender.clone(), v.tx.nonce, v);
+ }
+ }
+ self.future.enforce_limit();
+ return;
+ }
+
+ // Remove from future
+ {
+ let sender = tx.sender().unwrap();
+ if let Some(_) = self.future.remove_by_address(&sender, &tx.nonce) {
+ return;
+ }
+ }
+ }
+
+ /// Returns top transactions from the queue
+ pub fn top_transactions(&self, size: usize) -> Vec {
+ self.current.priority
+ .iter()
+ .take(size)
+ .map(|t| t.tx.clone()).collect()
+ }
+
+ /// Removes all elements (in any state) from the queue
+ pub fn clear(&mut self) {
+ self.current.clear();
+ self.future.clear();
+ self.last_nonces.clear();
+ self.first_nonces.clear();
+ }
+
+ fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option {
+ let mut current_nonce = current_nonce + U256::one();
+ {
+ let txs_by_nonce = self.future.address.row_mut(&address);
+ if let None = txs_by_nonce {
+ return None;
+ }
+ let mut txs_by_nonce = txs_by_nonce.unwrap();
+
+ while let Some(tx) = txs_by_nonce.remove(¤t_nonce) {
+ // remove also from priority
+ self.future.priority.remove(&tx);
+ // Put to current
+ let height = current_nonce - first_nonce;
+ let verified_tx = VerifiedTransaction::new(tx.tx, U256::from(height));
+ self.current.insert(address.clone(), verified_tx.tx.nonce, verified_tx);
+ current_nonce = current_nonce + U256::one();
+ }
+ }
+ self.future.address.clear_if_empty(&address);
+ // Returns last inserted nonce
+ Some(current_nonce - U256::one())
+ }
+
+ fn import_tx(&mut self, tx: SignedTransaction, fetch_nonce: &T)
+ where T: Fn(&Address) -> U256 {
+ let nonce = tx.nonce;
+ let address = tx.sender().unwrap();
+
+ let next_nonce = U256::one() + self.last_nonces
+ .get(&address)
+ .cloned()
+ .unwrap_or_else(|| fetch_nonce(&address));
+
+ // Check height
+ if nonce > next_nonce {
+ let height = nonce - next_nonce;
+ let verified_tx = VerifiedTransaction::new(tx, height);
+ // We have a gap - put to future
+ self.future.insert(address, nonce, verified_tx);
+ self.future.enforce_limit();
+ return;
+ } else if next_nonce > nonce {
+ // Droping transaction
+ return;
+ }
+
+ let first_nonce = self.first_nonces
+ .get(&address)
+ .cloned()
+ .unwrap_or_else(|| nonce.clone());
+
+ let height = nonce - first_nonce;
+ let verified_tx = VerifiedTransaction::new(tx, height);
+ // Insert to current
+ self.current.insert(address.clone(), nonce, verified_tx);
+ // But maybe there are some more items waiting in future?
+ let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce);
+ self.first_nonces.insert(address.clone(), first_nonce);
+ self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
+ // Enforce limit
+ self.current.enforce_limit();
+ }
+}
+
+#[cfg(test)]
+mod test {
+ extern crate rustc_serialize;
+ use self::rustc_serialize::hex::FromHex;
+
+ use util::crypto::KeyPair;
+ use util::uint::{U256, Uint};
+ use util::hash::{Address};
+ use ethcore::transaction::*;
+ use super::*;
+
+ fn new_unsigned_tx(nonce: U256) -> Transaction {
+ Transaction {
+ action: Action::Create,
+ value: U256::from(100),
+ data: "3331600055".from_hex().unwrap(),
+ gas: U256::from(100_000),
+ gas_price: U256::one(),
+ nonce: nonce
+ }
+ }
+
+ fn new_tx() -> SignedTransaction {
+ let keypair = KeyPair::create().unwrap();
+ new_unsigned_tx(U256::from(123)).sign(&keypair.secret())
+ }
+
+ fn default_nonce(_address: &Address) -> U256 {
+ U256::from(122)
+ }
+
+ fn new_txs(second_nonce: U256) -> (SignedTransaction, SignedTransaction) {
+ let keypair = KeyPair::create().unwrap();
+ let secret = &keypair.secret();
+ let nonce = U256::from(123);
+ let tx = new_unsigned_tx(nonce);
+ let tx2 = new_unsigned_tx(nonce + second_nonce);
+
+ (tx.sign(secret), tx2.sign(secret))
+ }
+
+ #[test]
+ fn should_import_tx() {
+ // given
+ let mut txq = TransactionQueue::new();
+ let tx = new_tx();
+
+ // when
+ txq.add(tx, &default_nonce);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 1);
+ }
+
+ #[test]
+ fn should_import_txs_from_same_sender() {
+ // given
+ let mut txq = TransactionQueue::new();
+
+ let (tx, tx2) = new_txs(U256::from(1));
+
+ // when
+ txq.add(tx.clone(), &default_nonce);
+ txq.add(tx2.clone(), &default_nonce);
+
+ // then
+ let top = txq.top_transactions(5);
+ assert_eq!(top[0], tx);
+ assert_eq!(top[1], tx2);
+ assert_eq!(top.len(), 2);
+ }
+
+ #[test]
+ fn should_put_transaction_to_futures_if_gap_detected() {
+ // given
+ let mut txq = TransactionQueue::new();
+
+ let (tx, tx2) = new_txs(U256::from(2));
+
+ // when
+ txq.add(tx.clone(), &default_nonce);
+ txq.add(tx2.clone(), &default_nonce);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 1);
+ assert_eq!(stats.future, 1);
+ let top = txq.top_transactions(5);
+ assert_eq!(top.len(), 1);
+ assert_eq!(top[0], tx);
+ }
+
+ #[test]
+ fn should_move_transactions_if_gap_filled() {
+ // given
+ let mut txq = TransactionQueue::new();
+ let kp = KeyPair::create().unwrap();
+ let secret = kp.secret();
+ let tx = new_unsigned_tx(U256::from(123)).sign(&secret);
+ let tx1 = new_unsigned_tx(U256::from(124)).sign(&secret);
+ let tx2 = new_unsigned_tx(U256::from(125)).sign(&secret);
+
+ txq.add(tx, &default_nonce);
+ assert_eq!(txq.status().pending, 1);
+ txq.add(tx2, &default_nonce);
+ assert_eq!(txq.status().future, 1);
+
+ // when
+ txq.add(tx1, &default_nonce);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 3);
+ assert_eq!(stats.future, 0);
+ }
+
+ #[test]
+ fn should_remove_transaction() {
+ // given
+ let mut txq2 = TransactionQueue::new();
+ let (tx, tx2) = new_txs(U256::from(3));
+ txq2.add(tx.clone(), &default_nonce);
+ txq2.add(tx2.clone(), &default_nonce);
+ assert_eq!(txq2.status().pending, 1);
+ assert_eq!(txq2.status().future, 1);
+
+ // when
+ txq2.remove(&tx);
+ txq2.remove(&tx2);
+
+
+ // then
+ let stats = txq2.status();
+ assert_eq!(stats.pending, 0);
+ assert_eq!(stats.future, 0);
+ }
+
+ #[test]
+ fn should_move_transactions_to_future_if_gap_introduced() {
+ // given
+ let mut txq = TransactionQueue::new();
+ let (tx, tx2) = new_txs(U256::from(1));
+ let tx3 = new_tx();
+ txq.add(tx2.clone(), &default_nonce);
+ assert_eq!(txq.status().future, 1);
+ txq.add(tx3.clone(), &default_nonce);
+ txq.add(tx.clone(), &default_nonce);
+ assert_eq!(txq.status().pending, 3);
+
+ // when
+ txq.remove(&tx);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.future, 1);
+ assert_eq!(stats.pending, 1);
+ }
+
+ #[test]
+ fn should_clear_queue() {
+ // given
+ let mut txq = TransactionQueue::new();
+ let (tx, tx2) = new_txs(U256::one());
+
+ // add
+ txq.add(tx2.clone(), &default_nonce);
+ txq.add(tx.clone(), &default_nonce);
+ let stats = txq.status();
+ assert_eq!(stats.pending, 2);
+
+ // when
+ txq.clear();
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 0);
+ }
+
+ #[test]
+ fn should_drop_old_transactions_when_hitting_the_limit() {
+ // given
+ let mut txq = TransactionQueue::with_limits(1, 1);
+ let (tx, tx2) = new_txs(U256::one());
+ txq.add(tx.clone(), &default_nonce);
+ assert_eq!(txq.status().pending, 1);
+
+ // when
+ txq.add(tx2.clone(), &default_nonce);
+
+ // then
+ let t = txq.top_transactions(2);
+ assert_eq!(txq.status().pending, 1);
+ assert_eq!(t.len(), 1);
+ assert_eq!(t[0], tx);
+ }
+
+ #[test]
+ fn should_limit_future_transactions() {
+ let mut txq = TransactionQueue::with_limits(10, 1);
+ let (tx1, tx2) = new_txs(U256::from(4));
+ let (tx3, tx4) = new_txs(U256::from(4));
+ txq.add(tx1.clone(), &default_nonce);
+ txq.add(tx3.clone(), &default_nonce);
+ assert_eq!(txq.status().pending, 2);
+
+ // when
+ txq.add(tx2.clone(), &default_nonce);
+ assert_eq!(txq.status().future, 1);
+ txq.add(tx4.clone(), &default_nonce);
+
+ // then
+ assert_eq!(txq.status().future, 1);
+ }
+
+ #[test]
+ fn should_drop_transactions_with_old_nonces() {
+ let mut txq = TransactionQueue::new();
+ let tx = new_tx();
+ let last_nonce = tx.nonce.clone();
+ let fetch_last_nonce = |_a: &Address| last_nonce;
+
+ // when
+ txq.add(tx, &fetch_last_nonce);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 0);
+ assert_eq!(stats.future, 0);
+ }
+
+ #[test]
+ fn should_accept_same_transaction_twice() {
+ // given
+ let mut txq = TransactionQueue::new();
+ let (tx1, tx2) = new_txs(U256::from(1));
+ txq.add(tx1.clone(), &default_nonce);
+ txq.add(tx2.clone(), &default_nonce);
+ assert_eq!(txq.status().pending, 2);
+
+ // when
+ txq.remove(&tx1);
+ assert_eq!(txq.status().future, 1);
+ txq.add(tx1.clone(), &default_nonce);
+
+ // then
+ let stats = txq.status();
+ assert_eq!(stats.pending, 2);
+ assert_eq!(stats.future, 0);
+
+ }
+
+}