Merge branch 'tx_queue_integration' into tx_queue_rpc

This commit is contained in:
Tomasz Drwięga
2016-03-04 12:34:34 +01:00
66 changed files with 1379 additions and 710 deletions

View File

@@ -487,19 +487,19 @@ impl ChainSync {
// TODO: Decompose block and add to self.headers and self.bodies instead
if header.number == From::from(self.current_base_block() + 1) {
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(ImportError::AlreadyInChain) => {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
Err(ImportError::AlreadyQueued) => {
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
self.last_imported_block = Some(header.number);
trace!(target: "sync", "New block queued {:?}", h);
},
Err(ImportError::UnknownParent) => {
Err(Error::Block(BlockError::UnknownParent(p))) => {
unknown = true;
trace!(target: "sync", "New block with unknown parent {:?}", h);
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
@@ -645,16 +645,7 @@ impl ChainSync {
match self.last_imported_block { None => 0, Some(x) => x }
}
/// Find some headers or blocks to download for a peer.
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
self.clear_peer_download(peer_id);
if io.chain().queue_info().is_full() {
self.pause_sync();
return;
}
// check to see if we need to download any block bodies first
fn find_block_bodies_hashes_to_request(&self, ignore_others: bool) -> (Vec<H256>, Vec<BlockNumber>) {
let mut needed_bodies: Vec<H256> = Vec::new();
let mut needed_numbers: Vec<BlockNumber> = Vec::new();
@@ -674,74 +665,88 @@ impl ChainSync {
}
}
}
(needed_bodies, needed_numbers)
}
/// Find some headers or blocks to download for a peer.
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
self.clear_peer_download(peer_id);
if io.chain().queue_info().is_full() {
self.pause_sync();
return;
}
// check to see if we need to download any block bodies first
let (needed_bodies, needed_numbers) = self.find_block_bodies_hashes_to_request(ignore_others);
if !needed_bodies.is_empty() {
let (head, _) = self.headers.range_iter().next().unwrap();
if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head);
self.request_blocks(io, peer_id, true);
return;
} else {
self.downloading_bodies.extend(needed_numbers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
self.request_bodies(io, peer_id, needed_bodies);
}
return;
}
// check if need to download headers
let mut start = 0;
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info();
start = chain_info.best_block_number;
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
}
if start == 0 {
self.have_common_block = true; //reached genesis
self.last_imported_hash = Some(chain_info.genesis_hash);
self.last_imported_block = Some(0);
}
}
if self.have_common_block {
let mut headers: Vec<BlockNumber> = Vec::new();
let mut prev = self.current_base_block() + 1;
let head = self.headers.range_iter().next().map(|(h, _)| h);
for (next, ref items) in self.headers.range_iter() {
if !headers.is_empty() {
break;
}
if next <= prev {
prev = next + items.len() as BlockNumber;
continue;
}
let mut block = prev;
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
headers.push(block as BlockNumber);
}
block += 1;
}
prev = next + items.len() as BlockNumber;
}
if !headers.is_empty() {
start = headers[0];
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
self.request_blocks(io, peer_id, true);
return;
}
let count = headers.len();
self.downloading_headers.extend(headers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
assert!(!self.headers.have_item(&start));
self.request_headers_by_number(io, peer_id, start, count, 0, false);
}
self.downloading_bodies.extend(needed_numbers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
self.request_bodies(io, peer_id, needed_bodies);
}
else {
// check if need to download headers
let mut start = 0;
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info();
start = chain_info.best_block_number;
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
}
if start == 0 {
self.have_common_block = true; //reached genesis
self.last_imported_hash = Some(chain_info.genesis_hash);
self.last_imported_block = Some(0);
}
}
if self.have_common_block {
let mut headers: Vec<BlockNumber> = Vec::new();
let mut prev = self.current_base_block() + 1;
let head = self.headers.range_iter().next().map(|(h, _)| h);
for (next, ref items) in self.headers.range_iter() {
if !headers.is_empty() {
break;
}
if next <= prev {
prev = next + items.len() as BlockNumber;
continue;
}
let mut block = prev;
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
headers.push(block as BlockNumber);
}
block += 1;
}
prev = next + items.len() as BlockNumber;
}
if !headers.is_empty() {
start = headers[0];
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
self.request_blocks(io, peer_id, true);
return;
}
let count = headers.len();
self.downloading_headers.extend(headers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
assert!(!self.headers.have_item(&start));
self.request_headers_by_number(io, peer_id, start, count, 0, false);
}
}
else {
// continue search for common block
self.downloading_headers.insert(start);
self.request_headers_by_number(io, peer_id, start, 1, 0, false);
}
// continue search for common block
self.downloading_headers.insert(start);
self.request_headers_by_number(io, peer_id, start, 1, 0, false);
}
}
@@ -791,12 +796,12 @@ impl ChainSync {
}
match io.chain().import_block(block_rlp.out()) {
Err(ImportError::AlreadyInChain) => {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "Block already in chain {:?}", h);
self.last_imported_block = Some(headers.0 + i as BlockNumber);
self.last_imported_hash = Some(h.clone());
},
Err(ImportError::AlreadyQueued) => {
Err(Error::Import(ImportError::AlreadyQueued)) => {
trace!(target: "sync", "Block already queued {:?}", h);
self.last_imported_block = Some(headers.0 + i as BlockNumber);
self.last_imported_hash = Some(h.clone());
@@ -1189,8 +1194,8 @@ impl ChainSync {
.collect::<Vec<_>>()
}
/// propagades latest block to lagging peers
fn propagade_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = {
let lagging_peers = self.get_lagging_peers(io);
@@ -1216,8 +1221,8 @@ impl ChainSync {
sent
}
/// propagades new known hashes to all peers
fn propagade_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
/// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(io);
let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash();
@@ -1252,8 +1257,8 @@ impl ChainSync {
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
let chain = io.chain().chain_info();
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagade_blocks(&chain.best_block_hash, chain.best_block_number, io);
let hashes = self.propagade_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
let blocks = self.propagate_blocks(&chain.best_block_hash, chain.best_block_number, io);
let hashes = self.propagate_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
@@ -1277,7 +1282,8 @@ impl ChainSync {
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.remove_all(&txs);
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes);
});
bad.for_each(|txs| {
// populate sender
@@ -1465,7 +1471,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_new_hashes(&best_hash, best_number, &mut io);
let peer_count = sync.propagate_new_hashes(&best_hash, best_number, &mut io);
// 1 message should be send
assert_eq!(1, io.queue.len());
@@ -1485,7 +1491,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_blocks(&best_hash, best_number, &mut io);
let peer_count = sync.propagate_blocks(&best_hash, best_number, &mut io);
// 1 message should be send
assert_eq!(1, io.queue.len());
@@ -1591,7 +1597,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_new_hashes(&best_hash, best_number, &mut io);
sync.propagate_new_hashes(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data));
@@ -1610,7 +1616,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_blocks(&best_hash, best_number, &mut io);
sync.propagate_blocks(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));

View File

@@ -122,7 +122,7 @@ fn status_packet() {
}
#[test]
fn propagade_hashes() {
fn propagate_hashes() {
let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle);
net.sync();
@@ -148,7 +148,7 @@ fn propagade_hashes() {
}
#[test]
fn propagade_blocks() {
fn propagate_blocks() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle);
net.sync();

View File

@@ -18,121 +18,126 @@
//! Transaction Queue
use std::vec::Vec;
use std::cmp::{Ordering};
use std::collections::{HashMap, BTreeSet};
use util::uint::{Uint, U256};
use util::hash::{Address};
use util::numbers::{Uint, U256};
use util::hash::{Address, H256};
use util::table::*;
use ethcore::transaction::*;
#[derive(Clone, Debug)]
struct VerifiedTransaction {
tx: SignedTransaction,
nonce_height: U256
struct TransactionOrder {
nonce_height: U256,
gas_price: U256,
hash: H256,
}
impl VerifiedTransaction {
pub fn new(tx: SignedTransaction, nonce_height: U256) -> VerifiedTransaction {
VerifiedTransaction {
tx: tx,
nonce_height: nonce_height
impl TransactionOrder {
pub fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self {
TransactionOrder {
nonce_height: tx.nonce() - base_nonce,
gas_price: tx.transaction.gas_price,
hash: tx.hash(),
}
}
pub fn sender(&self) -> Address {
self.tx.sender().unwrap()
}
}
impl Eq for VerifiedTransaction {}
impl PartialEq for VerifiedTransaction {
fn eq(&self, other: &VerifiedTransaction) -> bool {
impl Eq for TransactionOrder {}
impl PartialEq for TransactionOrder {
fn eq(&self, other: &TransactionOrder) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl PartialOrd for VerifiedTransaction {
fn partial_cmp(&self, other: &VerifiedTransaction) -> Option<Ordering> {
impl PartialOrd for TransactionOrder {
fn partial_cmp(&self, other: &TransactionOrder) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for VerifiedTransaction {
fn cmp(&self, b: &VerifiedTransaction) -> Ordering {
impl Ord for TransactionOrder {
fn cmp(&self, b: &TransactionOrder) -> Ordering {
// First check nonce_height
if self.nonce_height != b.nonce_height {
return self.nonce_height.cmp(&b.nonce_height);
}
// Then compare gas_prices
let a_gas = self.tx.gas_price;
let b_gas = b.tx.gas_price;
let a_gas = self.gas_price;
let b_gas = b.gas_price;
if a_gas != b_gas {
return a_gas.cmp(&b_gas);
}
// Compare nonce
let a_nonce = self.tx.nonce;
let b_nonce = b.tx.nonce;
if a_nonce != b_nonce {
return a_nonce.cmp(&b_nonce);
}
// and senders
let a_sender = self.sender();
let b_sender = b.sender();
a_sender.cmp(&b_sender)
// Compare hashes
self.hash.cmp(&b.hash)
}
}
struct TransactionsByPriorityAndAddress {
priority: BTreeSet<VerifiedTransaction>,
address: Table<Address, U256, VerifiedTransaction>,
struct VerifiedTransaction {
transaction: SignedTransaction
}
impl VerifiedTransaction {
fn new(transaction: SignedTransaction) -> Self {
VerifiedTransaction {
transaction: transaction
}
}
fn hash(&self) -> H256 {
self.transaction.hash()
}
fn nonce(&self) -> U256 {
self.transaction.nonce
}
fn sender(&self) -> Address {
self.transaction.sender().unwrap()
}
}
struct TransactionSet {
by_priority: BTreeSet<TransactionOrder>,
by_address: Table<Address, U256, TransactionOrder>,
limit: usize,
}
impl TransactionsByPriorityAndAddress {
fn insert(&mut self, address: Address, nonce: U256, verified_tx: VerifiedTransaction) {
self.priority.insert(verified_tx.clone());
self.address.insert(address, nonce, verified_tx);
impl TransactionSet {
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) {
self.by_priority.insert(order.clone());
self.by_address.insert(sender, nonce, order);
}
fn enforce_limit(&mut self) {
let len = self.priority.len();
fn enforce_limit(&mut self, by_hash: &HashMap<H256, VerifiedTransaction>) {
let len = self.by_priority.len();
if len <= self.limit {
return;
}
let to_remove : Vec<SignedTransaction> = {
self.priority
let to_drop : Vec<&VerifiedTransaction> = {
self.by_priority
.iter()
.skip(self.limit)
.map(|v_tx| v_tx.tx.clone())
.map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected."))
.collect()
};
for tx in to_remove {
self.remove(&tx);
for tx in to_drop {
self.drop(&tx.sender(), &tx.nonce());
}
}
fn remove_by_address(&mut self, sender: &Address, nonce: &U256) -> Option<VerifiedTransaction> {
if let Some(verified_tx) = self.address.remove(sender, nonce) {
self.priority.remove(&verified_tx);
return Some(verified_tx);
fn drop(&mut self, sender: &Address, nonce: &U256) -> Option<TransactionOrder> {
if let Some(tx_order) = self.by_address.remove(sender, nonce) {
self.by_priority.remove(&tx_order);
return Some(tx_order);
}
None
}
fn remove(&mut self, tx: &SignedTransaction) -> Option<VerifiedTransaction> {
// First find the transaction by address
let address = tx.sender().unwrap();
self.remove_by_address(&address, &tx.nonce)
}
fn clear(&mut self) {
self.priority.clear();
self.address.clear();
self.by_priority.clear();
self.by_address.clear();
}
}
@@ -148,9 +153,11 @@ pub struct TransactionQueueStatus {
/// TransactionQueue implementation
pub struct TransactionQueue {
/// Priority queue for transactions that can go to block
current: TransactionsByPriorityAndAddress,
current: TransactionSet,
/// Priority queue for transactions that has been received but are not yet valid to go to block
future: TransactionsByPriorityAndAddress,
future: TransactionSet,
/// All transactions managed by queue indexed by hash
by_hash: HashMap<H256, VerifiedTransaction>,
/// Last nonce of transaction in current
last_nonces: HashMap<Address, U256>,
/// First nonce of transaction in current (used to determine priority)
@@ -165,20 +172,21 @@ impl TransactionQueue {
/// Create new instance of this Queue with specified limits
pub fn with_limits(current_limit: usize, future_limit: usize) -> Self {
let current = TransactionsByPriorityAndAddress {
address: Table::new(),
priority: BTreeSet::new(),
let current = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
limit: current_limit,
};
let future = TransactionsByPriorityAndAddress {
address: Table::new(),
priority: BTreeSet::new(),
let future = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
limit: future_limit,
};
TransactionQueue {
current: current,
future: future,
by_hash: HashMap::new(),
last_nonces: HashMap::new(),
first_nonces: HashMap::new(),
}
@@ -187,8 +195,8 @@ impl TransactionQueue {
/// Returns current status for this queue
pub fn status(&self) -> TransactionQueueStatus {
TransactionQueueStatus {
pending: self.current.priority.len(),
future: self.future.priority.len(),
pending: self.current.by_priority.len(),
future: self.future.by_priority.len(),
}
}
@@ -203,101 +211,107 @@ impl TransactionQueue {
/// Add signed transaction to queue to be verified and imported
pub fn add<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T)
where T: Fn(&Address) -> U256 {
self.import_tx(tx, fetch_nonce);
self.import_tx(VerifiedTransaction::new(tx), fetch_nonce);
}
/// Removes all transactions in given slice
/// Removes all transactions identified by hashes given in slice
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove_all(&mut self, txs: &[SignedTransaction]) {
for tx in txs {
self.remove(&tx);
pub fn remove_all(&mut self, transaction_hashes: &[H256]) {
for transaction_hash in transaction_hashes {
self.remove(&transaction_hash);
}
}
/// Removes transaction from queue.
/// Removes transaction identified by hashes from queue.
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove(&mut self, tx: &SignedTransaction) {
pub fn remove(&mut self, transaction_hash: &H256) {
let transaction = self.by_hash.remove(transaction_hash);
if transaction.is_none() {
// We don't know this transaction
return;
}
let transaction = transaction.unwrap();
let sender = transaction.sender();
let nonce = transaction.nonce();
// Remove from future
self.future.drop(&sender, &nonce);
// Remove from current
let removed = self.current.remove(tx);
if let Some(verified_tx) = removed {
let sender = verified_tx.sender();
// Are there any other transactions from this sender?
if !self.current.address.has_row(&sender) {
// Clear last & first nonces
self.last_nonces.remove(&sender);
self.first_nonces.remove(&sender);
return;
}
// Let's find those with higher nonce (TODO [todr] optimize?)
let to_move_to_future = {
let row_map = self.current.address.row(&sender).unwrap();
let tx_nonce = verified_tx.tx.nonce;
let mut to_future = Vec::new();
let mut highest = U256::zero();
let mut lowest = tx_nonce.clone();
// Search nonces to remove and track lowest and highest
for (nonce, _) in row_map.iter() {
if nonce > &tx_nonce {
to_future.push(nonce.clone());
} else if nonce > &highest {
highest = nonce.clone();
} else if nonce < &lowest {
lowest = nonce.clone();
}
}
// Update first_nonces and last_nonces
if highest == U256::zero() {
self.last_nonces.remove(&sender);
} else {
self.last_nonces.insert(sender.clone(), highest);
}
if lowest == tx_nonce {
self.first_nonces.remove(&sender);
} else {
self.first_nonces.insert(sender.clone(), lowest);
}
// return to future
to_future
};
for k in to_move_to_future {
if let Some(v) = self.current.remove_by_address(&sender, &k) {
self.future.insert(sender.clone(), v.tx.nonce, v);
}
}
self.future.enforce_limit();
let order = self.current.drop(&sender, &nonce);
if order.is_none() {
return;
}
// Remove from future
{
let sender = tx.sender().unwrap();
if let Some(_) = self.future.remove_by_address(&sender, &tx.nonce) {
return;
// Are there any other transactions from this sender?
if !self.current.by_address.has_row(&sender) {
// Clear last & first nonces
self.last_nonces.remove(&sender);
self.first_nonces.remove(&sender);
return;
}
// Let's find those with higher nonce (TODO [todr] optimize?)
let to_move_to_future = {
let row_map = self.current.by_address.row(&sender).unwrap();
let mut to_future = Vec::new();
let mut highest = U256::zero();
let mut lowest = nonce.clone();
// Search nonces to remove and track lowest and highest
for (current_nonce, _) in row_map.iter() {
if current_nonce > &nonce {
to_future.push(current_nonce.clone());
} else if current_nonce > &highest {
highest = current_nonce.clone();
} else if current_nonce < &lowest {
lowest = current_nonce.clone();
}
}
// Update first_nonces and last_nonces
if highest == U256::zero() {
self.last_nonces.remove(&sender);
} else {
self.last_nonces.insert(sender.clone(), highest);
}
if lowest == nonce {
self.first_nonces.remove(&sender);
} else {
self.first_nonces.insert(sender.clone(), lowest);
}
// return to future
to_future
};
for k in to_move_to_future {
if let Some(v) = self.current.drop(&sender, &k) {
// TODO [todr] Recalculate height?
self.future.insert(sender.clone(), k, v);
}
}
self.future.enforce_limit(&self.by_hash);
}
/// Returns top transactions from the queue
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
self.current.priority
self.current.by_priority
.iter()
.take(size)
.map(|t| t.tx.clone()).collect()
.map(|t| self.by_hash.get(&t.hash).expect("Transaction Queue Inconsistency"))
.map(|t| t.transaction.clone())
.collect()
}
/// Removes all elements (in any state) from the queue
pub fn clear(&mut self) {
self.current.clear();
self.future.clear();
self.by_hash.clear();
self.last_nonces.clear();
self.first_nonces.clear();
}
@@ -305,31 +319,30 @@ impl TransactionQueue {
fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option<U256> {
let mut current_nonce = current_nonce + U256::one();
{
let txs_by_nonce = self.future.address.row_mut(&address);
if let None = txs_by_nonce {
let by_nonce = self.future.by_address.row_mut(&address);
if let None = by_nonce {
return None;
}
let mut txs_by_nonce = txs_by_nonce.unwrap();
while let Some(tx) = txs_by_nonce.remove(&current_nonce) {
// remove also from priority
self.future.priority.remove(&tx);
let mut by_nonce = by_nonce.unwrap();
while let Some(order) = by_nonce.remove(&current_nonce) {
// remove also from priority and hash
self.future.by_priority.remove(&order);
// Put to current
let height = current_nonce - first_nonce;
let verified_tx = VerifiedTransaction::new(tx.tx, U256::from(height));
self.current.insert(address.clone(), verified_tx.tx.nonce, verified_tx);
let transaction = self.by_hash.get(&order.hash).expect("TransactionQueue Inconsistency");
let order = TransactionOrder::for_transaction(transaction, first_nonce);
self.current.insert(address.clone(), transaction.nonce(), order);
current_nonce = current_nonce + U256::one();
}
}
self.future.address.clear_if_empty(&address);
self.future.by_address.clear_if_empty(&address);
// Returns last inserted nonce
Some(current_nonce - U256::one())
}
fn import_tx<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T)
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
where T: Fn(&Address) -> U256 {
let nonce = tx.nonce;
let address = tx.sender().unwrap();
let nonce = tx.nonce();
let address = tx.sender();
let next_nonce = U256::one() + self.last_nonces
.get(&address)
@@ -338,11 +351,12 @@ impl TransactionQueue {
// Check height
if nonce > next_nonce {
let height = nonce - next_nonce;
let verified_tx = VerifiedTransaction::new(tx, height);
let order = TransactionOrder::for_transaction(&tx, next_nonce);
// Insert to by_hash
self.by_hash.insert(tx.hash(), tx);
// We have a gap - put to future
self.future.insert(address, nonce, verified_tx);
self.future.enforce_limit();
self.future.insert(address, nonce, order);
self.future.enforce_limit(&self.by_hash);
return;
} else if next_nonce > nonce {
// Droping transaction
@@ -355,29 +369,34 @@ impl TransactionQueue {
.cloned()
.unwrap_or_else(|| nonce.clone());
let height = nonce - first_nonce;
let verified_tx = VerifiedTransaction::new(tx, height);
let order = TransactionOrder::for_transaction(&tx, first_nonce);
// Insert to by_hash
self.by_hash.insert(tx.hash(), tx);
// Insert to current
self.current.insert(address.clone(), nonce, verified_tx);
self.current.insert(address.clone(), nonce, order);
// But maybe there are some more items waiting in future?
let new_last_nonce = self.move_future_txs(address.clone(), nonce, first_nonce);
self.first_nonces.insert(address.clone(), first_nonce);
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
// Enforce limit
self.current.enforce_limit();
self.current.enforce_limit(&self.by_hash);
}
}
#[cfg(test)]
mod test {
extern crate rustc_serialize;
use self::rustc_serialize::hex::FromHex;
use std::collections::{HashMap, BTreeSet};
use util::crypto::KeyPair;
use util::uint::{U256, Uint};
use util::numbers::{U256, Uint};
use util::hash::{Address};
use util::table::*;
use ethcore::transaction::*;
use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction};
fn new_unsigned_tx(nonce: U256) -> Transaction {
Transaction {
@@ -409,6 +428,46 @@ mod test {
(tx.sign(secret), tx2.sign(secret))
}
#[test]
fn should_create_transaction_set() {
// given
let mut set = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
limit: 1
};
let (tx1, tx2) = new_txs(U256::from(1));
let tx1 = VerifiedTransaction::new(tx1);
let tx2 = VerifiedTransaction::new(tx2);
let by_hash = {
let mut x = HashMap::new();
let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
x.insert(tx1.hash(), tx1);
x.insert(tx2.hash(), tx2);
x
};
// Insert both transactions
let order1 = TransactionOrder::for_transaction(&tx1, U256::zero());
set.insert(tx1.sender(), tx1.nonce(), order1.clone());
let order2 = TransactionOrder::for_transaction(&tx2, U256::zero());
set.insert(tx2.sender(), tx2.nonce(), order2.clone());
assert_eq!(set.by_priority.len(), 2);
assert_eq!(set.by_address.len(), 2);
// when
set.enforce_limit(&by_hash);
// then
assert_eq!(set.by_priority.len(), 1);
assert_eq!(set.by_address.len(), 1);
assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1);
set.clear();
assert_eq!(set.by_priority.len(), 0);
assert_eq!(set.by_address.len(), 0);
}
#[test]
fn should_import_tx() {
// given
@@ -496,8 +555,8 @@ mod test {
assert_eq!(txq2.status().future, 1);
// when
txq2.remove(&tx);
txq2.remove(&tx2);
txq2.remove(&tx.hash());
txq2.remove(&tx2.hash());
// then
@@ -519,7 +578,7 @@ mod test {
assert_eq!(txq.status().pending, 3);
// when
txq.remove(&tx);
txq.remove(&tx.hash());
// then
let stats = txq.status();
@@ -609,14 +668,15 @@ mod test {
assert_eq!(txq.status().pending, 2);
// when
txq.remove(&tx1);
txq.remove(&tx1.hash());
assert_eq!(txq.status().pending, 0);
assert_eq!(txq.status().future, 1);
txq.add(tx1.clone(), &default_nonce);
// then
let stats = txq.status();
assert_eq!(stats.pending, 2);
assert_eq!(stats.future, 0);
assert_eq!(stats.pending, 2);
}