diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index c9afc4a2c..e838e24ca 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -23,33 +23,233 @@ //! accounts for which they create transactions, this queue is structured in an //! address-wise manner. -use ethcore::transaction::PendingTransaction; -use util::{Address, U256}; +use std::collections::{BTreeMap, HashMap}; +use std::collections::hash_map::Entry; + +use ethcore::transaction::{Condition, PendingTransaction, SignedTransaction}; +use util::{Address, U256, H256, H256FastMap}; + +// Knowledge of an account's current nonce. +#[derive(Debug, Clone, PartialEq, Eq)] +enum CurrentNonce { + // Assumed current nonce. + Assumed(U256), + // Known current nonce. + Known(U256), +} + +impl CurrentNonce { + // whether this nonce is assumed + fn is_assumed(&self) -> bool { + match *self { + CurrentNonce::Assumed(_) => true, + CurrentNonce::Known(_) => false, + } + } + + // whether this nonce is known for certain from an external source. + fn is_known(&self) -> bool { + !self.is_assumed() + } + + // the current nonce's value. + fn value(&self) -> &U256 { + match *self { + CurrentNonce::Assumed(ref val) => val, + CurrentNonce::Known(ref val) => val, + } + } +} + +// transactions associated with a specific account. +#[derive(Debug, Clone, PartialEq, Eq)] +struct AccountTransactions { + // believed current nonce (gotten from initial given TX or `cull` calls). + cur_nonce: CurrentNonce, + current: Vec, // ordered "current" transactions (cur_nonce onwards) + future: BTreeMap, // "future" transactions. +} + +impl AccountTransactions { + fn is_empty(&self) -> bool { + self.current.is_empty() && self.future.is_empty() + } + + fn next_nonce(&self) -> U256 { + self.current.last().map(|last| last.nonce) + .unwrap_or_else(|| *self.cur_nonce.value()) + 1.into() + } + + // attempt to move transactions from the future queue into the current queue. + fn adjust_future(&mut self) { + let mut next_nonce = self.next_nonce(); + + loop { + match self.future.remove(&next_nonce) { + Some(tx) => self.current.push(tx), + None => break, + } + + next_nonce = next_nonce + 1.into(); + } + } +} /// Light transaction queue. See module docs for more details. -pub struct TransactionQueue; +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct TransactionQueue { + by_account: HashMap, + by_hash: H256FastMap, +} impl TransactionQueue { /// Insert a pending transaction to be queued. pub fn insert(&mut self, tx: PendingTransaction) { - unimplemented!() + let sender = tx.sender(); + let hash = tx.hash(); + let nonce = tx.nonce; + + match self.by_account.entry(sender) { + Entry::Vacant(entry) => { + entry.insert(AccountTransactions { + cur_nonce: CurrentNonce::Assumed(nonce), + current: vec![tx.clone()], + future: BTreeMap::new(), + }); + } + Entry::Occupied(mut entry) => { + let acct_txs = entry.get_mut(); + if &nonce < acct_txs.cur_nonce.value() { + // don't accept txs from before known current nonce. + if acct_txs.cur_nonce.is_known() { return } + + // lower our assumption until corrected later. + acct_txs.cur_nonce = CurrentNonce::Assumed(nonce); + } + + match acct_txs.current.binary_search_by(|x| x.nonce.cmp(&nonce)) { + Ok(idx) => { + trace!(target: "txqueue", "Replacing existing transaction from {} with nonce {}", + sender, nonce); + + acct_txs.current[idx] = tx.clone(); + } + Err(idx) => { + let cur_len = acct_txs.current.len(); + let incr_nonce = nonce + 1.into(); + + // current is sorted with one tx per nonce, + // so if a tx with given nonce wasn't found that means it is either + // earlier in nonce than all other "current" transactions or later. + debug_assert!(idx == 0 || idx == cur_len); + + if idx == 0 && acct_txs.current.first().map_or(false, |f| f.nonce != incr_nonce) { + let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx.clone()]); + + trace!(target: "txqueue", "Moving {} transactions with nonce > {} to future", + old_cur.len(), incr_nonce); + + for future in old_cur { + let future_nonce = future.nonce; + acct_txs.future.insert(future_nonce, future); + } + } else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) { + trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce); + let future_nonce = nonce; + acct_txs.future.insert(future_nonce, tx.clone()); + } else { + trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce); + + // insert, then check if we've filled any gaps. + acct_txs.current.insert(idx, tx.clone()); + acct_txs.adjust_future(); + } + } + } + } + } + + self.by_hash.insert(hash, tx); + } + + /// Get pending transaction by hash. + pub fn transaction(&self, hash: &H256) -> Option { + self.by_hash.get(hash).map(|tx| (&**tx).clone()) } /// Get the next nonce for a given address based on what's within the queue. - /// If the address has no queued transactions - pub fn next_nonce(&mut self, address: &Address) -> Option { - unimplemented!() + /// If the address has no queued transactions, then `None` will be returned + /// and the next nonce will have to be deduced via other means. + pub fn next_nonce(&self, address: &Address) -> Option { + self.by_account.get(address).map(AccountTransactions::next_nonce) } - /// Get pending transactions, ready to be propagated. + /// Get all transactions ready to be propagated. /// `best_block_number` and `best_block_timestamp` are used to filter out conditionally /// propagated transactions. - pub fn pending_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec { - unimplemented!() + pub fn ready_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec { + self.by_account.values().flat_map(|acct_txs| { + acct_txs.current.iter().take_while(|tx| match tx.condition { + None => true, + Some(Condition::Number(blk_num)) => blk_num >= best_block_number, + Some(Condition::Timestamp(time)) => time >= best_block_timestamp, + }).cloned() + }).collect() + } + + /// Addresses for which we store transactions. + pub fn queued_senders(&self) -> Vec
{ + self.by_account.keys().cloned().collect() } /// Cull out all transactions by the given address which are invalidated by the given nonce. - pub fn cull(&mut self, address: Address, last_nonce: U256) { - unimplemented!() + pub fn cull(&mut self, address: Address, cur_nonce: U256) { + let mut removed_hashes = vec![]; + if let Entry::Occupied(mut entry) = self.by_account.entry(address) { + { + let acct_txs = entry.get_mut(); + acct_txs.cur_nonce = CurrentNonce::Known(cur_nonce); + + // cull old "future" keys. + let old_future: Vec<_> = acct_txs.future.keys().take_while(|&&k| k < cur_nonce).cloned().collect(); + + for old in old_future { + let hash = acct_txs.future.remove(&old) + .expect("key extracted from keys iterator; known to exist; qed") + .hash(); + removed_hashes.push(hash); + } + + // then cull from "current". + let valid_pos = acct_txs.current.iter().position(|tx| tx.nonce >= cur_nonce); + match valid_pos { + None => + removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash())), + Some(valid) => + removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash())), + } + + // now try and move stuff out of future into current. + acct_txs.adjust_future(); + } + + if entry.get_mut().is_empty() { + trace!(target: "txqueue", "No more queued transactions for {} after nonce {}", + address, cur_nonce); + entry.remove(); + } + } + + trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})", + removed_hashes.len(), address, cur_nonce); + + for hash in removed_hashes { + self.by_hash.remove(&hash); + } } } + +#[cfg(test)] +mod tests { + +}