Transaction queue limited by gas (#2528)
This commit is contained in:
parent
5502340dea
commit
d27ecb6527
@ -211,7 +211,7 @@ impl Miner {
|
|||||||
pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Arc<Miner> {
|
pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Arc<Miner> {
|
||||||
let work_poster = if !options.new_work_notify.is_empty() { Some(WorkPoster::new(&options.new_work_notify)) } else { None };
|
let work_poster = if !options.new_work_notify.is_empty() { Some(WorkPoster::new(&options.new_work_notify)) } else { None };
|
||||||
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(
|
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(
|
||||||
options.tx_queue_strategy, options.tx_queue_size, options.tx_gas_limit
|
options.tx_queue_strategy, options.tx_queue_size, !U256::zero(), options.tx_gas_limit
|
||||||
)));
|
)));
|
||||||
Arc::new(Miner {
|
Arc::new(Miner {
|
||||||
transaction_queue: txq,
|
transaction_queue: txq,
|
||||||
@ -401,6 +401,8 @@ impl Miner {
|
|||||||
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit();
|
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit();
|
||||||
let mut queue = self.transaction_queue.lock();
|
let mut queue = self.transaction_queue.lock();
|
||||||
queue.set_gas_limit(gas_limit);
|
queue.set_gas_limit(gas_limit);
|
||||||
|
// Set total qx queue gas limit to be 2x the block gas limit.
|
||||||
|
queue.set_total_gas_limit(gas_limit << 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if we had to prepare new pending block
|
/// Returns true if we had to prepare new pending block
|
||||||
|
@ -279,6 +279,7 @@ struct TransactionSet {
|
|||||||
by_priority: BTreeSet<TransactionOrder>,
|
by_priority: BTreeSet<TransactionOrder>,
|
||||||
by_address: Table<Address, U256, TransactionOrder>,
|
by_address: Table<Address, U256, TransactionOrder>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
|
gas_limit: U256,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionSet {
|
impl TransactionSet {
|
||||||
@ -299,14 +300,18 @@ impl TransactionSet {
|
|||||||
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
|
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
|
||||||
/// Returns addresses and lowest nonces of transactions removed because of limit.
|
/// Returns addresses and lowest nonces of transactions removed because of limit.
|
||||||
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> Option<HashMap<Address, U256>> {
|
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> Option<HashMap<Address, U256>> {
|
||||||
let len = self.by_priority.len();
|
let mut count = 0;
|
||||||
if len <= self.limit {
|
let mut gas: U256 = 0.into();
|
||||||
return None;
|
|
||||||
}
|
|
||||||
let to_drop : Vec<(Address, U256)> = {
|
let to_drop : Vec<(Address, U256)> = {
|
||||||
self.by_priority
|
self.by_priority
|
||||||
.iter()
|
.iter()
|
||||||
.skip(self.limit)
|
.skip_while(|order| {
|
||||||
|
count = count + 1;
|
||||||
|
let r = gas.overflowing_add(order.gas);
|
||||||
|
if r.1 { return false }
|
||||||
|
gas = r.0;
|
||||||
|
count <= self.limit && gas <= self.gas_limit
|
||||||
|
})
|
||||||
.map(|order| by_hash.get(&order.hash)
|
.map(|order| by_hash.get(&order.hash)
|
||||||
.expect("All transactions in `self.by_priority` and `self.by_address` are kept in sync with `by_hash`."))
|
.expect("All transactions in `self.by_priority` and `self.by_address` are kept in sync with `by_hash`."))
|
||||||
.map(|tx| (tx.sender(), tx.nonce()))
|
.map(|tx| (tx.sender(), tx.nonce()))
|
||||||
@ -423,21 +428,23 @@ impl Default for TransactionQueue {
|
|||||||
impl TransactionQueue {
|
impl TransactionQueue {
|
||||||
/// Creates new instance of this Queue
|
/// Creates new instance of this Queue
|
||||||
pub fn new(strategy: PrioritizationStrategy) -> Self {
|
pub fn new(strategy: PrioritizationStrategy) -> Self {
|
||||||
Self::with_limits(strategy, 1024, !U256::zero())
|
Self::with_limits(strategy, 1024, !U256::zero(), !U256::zero())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create new instance of this Queue with specified limits
|
/// Create new instance of this Queue with specified limits
|
||||||
pub fn with_limits(strategy: PrioritizationStrategy, limit: usize, tx_gas_limit: U256) -> Self {
|
pub fn with_limits(strategy: PrioritizationStrategy, limit: usize, gas_limit: U256, tx_gas_limit: U256) -> Self {
|
||||||
let current = TransactionSet {
|
let current = TransactionSet {
|
||||||
by_priority: BTreeSet::new(),
|
by_priority: BTreeSet::new(),
|
||||||
by_address: Table::new(),
|
by_address: Table::new(),
|
||||||
limit: limit,
|
limit: limit,
|
||||||
|
gas_limit: gas_limit,
|
||||||
};
|
};
|
||||||
|
|
||||||
let future = TransactionSet {
|
let future = TransactionSet {
|
||||||
by_priority: BTreeSet::new(),
|
by_priority: BTreeSet::new(),
|
||||||
by_address: Table::new(),
|
by_address: Table::new(),
|
||||||
limit: limit,
|
limit: limit,
|
||||||
|
gas_limit: gas_limit,
|
||||||
};
|
};
|
||||||
|
|
||||||
TransactionQueue {
|
TransactionQueue {
|
||||||
@ -488,6 +495,13 @@ impl TransactionQueue {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets new total gas limit.
|
||||||
|
pub fn set_total_gas_limit(&mut self, gas_limit: U256) {
|
||||||
|
self.future.gas_limit = gas_limit;
|
||||||
|
self.current.gas_limit = gas_limit;
|
||||||
|
self.future.enforce_limit(&mut self.by_hash);
|
||||||
|
}
|
||||||
|
|
||||||
/// Set the new limit for the amount of gas any individual transaction may have.
|
/// Set the new limit for the amount of gas any individual transaction may have.
|
||||||
/// Any transaction already imported to the queue is not affected.
|
/// Any transaction already imported to the queue is not affected.
|
||||||
pub fn set_tx_gas_limit(&mut self, limit: U256) {
|
pub fn set_tx_gas_limit(&mut self, limit: U256) {
|
||||||
@ -797,6 +811,16 @@ impl TransactionQueue {
|
|||||||
let nonce = tx.nonce();
|
let nonce = tx.nonce();
|
||||||
let hash = tx.hash();
|
let hash = tx.hash();
|
||||||
|
|
||||||
|
{
|
||||||
|
// Rough size sanity check
|
||||||
|
let gas = &tx.transaction.gas;
|
||||||
|
if U256::from(tx.transaction.data.len()) > *gas {
|
||||||
|
// Droping transaction
|
||||||
|
trace!(target: "txqueue", "Dropping oversized transaction: {:?} (gas: {} < size {})", hash, gas, tx.transaction.data.len());
|
||||||
|
return Err(TransactionError::LimitReached);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// The transaction might be old, let's check that.
|
// The transaction might be old, let's check that.
|
||||||
// This has to be the first test, otherwise calculating
|
// This has to be the first test, otherwise calculating
|
||||||
// nonce height would result in overflow.
|
// nonce height would result in overflow.
|
||||||
@ -1049,7 +1073,8 @@ mod test {
|
|||||||
let mut set = TransactionSet {
|
let mut set = TransactionSet {
|
||||||
by_priority: BTreeSet::new(),
|
by_priority: BTreeSet::new(),
|
||||||
by_address: Table::new(),
|
by_address: Table::new(),
|
||||||
limit: 1
|
limit: 1,
|
||||||
|
gas_limit: !U256::zero(),
|
||||||
};
|
};
|
||||||
let (tx1, tx2) = new_txs(U256::from(1));
|
let (tx1, tx2) = new_txs(U256::from(1));
|
||||||
let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External).unwrap();
|
let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External).unwrap();
|
||||||
@ -1088,7 +1113,8 @@ mod test {
|
|||||||
let mut set = TransactionSet {
|
let mut set = TransactionSet {
|
||||||
by_priority: BTreeSet::new(),
|
by_priority: BTreeSet::new(),
|
||||||
by_address: Table::new(),
|
by_address: Table::new(),
|
||||||
limit: 1
|
limit: 1,
|
||||||
|
gas_limit: !U256::zero(),
|
||||||
};
|
};
|
||||||
// Create two transactions with same nonce
|
// Create two transactions with same nonce
|
||||||
// (same hash)
|
// (same hash)
|
||||||
@ -1668,7 +1694,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_drop_old_transactions_when_hitting_the_limit() {
|
fn should_drop_old_transactions_when_hitting_the_limit() {
|
||||||
// given
|
// given
|
||||||
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero());
|
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero(), !U256::zero());
|
||||||
let (tx, tx2) = new_txs(U256::one());
|
let (tx, tx2) = new_txs(U256::one());
|
||||||
let sender = tx.sender().unwrap();
|
let sender = tx.sender().unwrap();
|
||||||
let nonce = tx.nonce;
|
let nonce = tx.nonce;
|
||||||
@ -1690,7 +1716,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_return_correct_nonces_when_dropped_because_of_limit() {
|
fn should_return_correct_nonces_when_dropped_because_of_limit() {
|
||||||
// given
|
// given
|
||||||
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 2, !U256::zero());
|
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 2, !U256::zero(), !U256::zero());
|
||||||
let tx = new_tx();
|
let tx = new_tx();
|
||||||
let (tx1, tx2) = new_txs(U256::one());
|
let (tx1, tx2) = new_txs(U256::one());
|
||||||
let sender = tx1.sender().unwrap();
|
let sender = tx1.sender().unwrap();
|
||||||
@ -1711,7 +1737,7 @@ mod test {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_limit_future_transactions() {
|
fn should_limit_future_transactions() {
|
||||||
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero());
|
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero(), !U256::zero());
|
||||||
txq.current.set_limit(10);
|
txq.current.set_limit(10);
|
||||||
let (tx1, tx2) = new_txs_with_gas_price_diff(U256::from(4), U256::from(1));
|
let (tx1, tx2) = new_txs_with_gas_price_diff(U256::from(4), U256::from(1));
|
||||||
let (tx3, tx4) = new_txs_with_gas_price_diff(U256::from(4), U256::from(2));
|
let (tx3, tx4) = new_txs_with_gas_price_diff(U256::from(4), U256::from(2));
|
||||||
@ -1728,6 +1754,16 @@ mod test {
|
|||||||
assert_eq!(txq.status().future, 1);
|
assert_eq!(txq.status().future, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_limit_by_gas() {
|
||||||
|
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 100, default_gas_val() * U256::from(2), !U256::zero());
|
||||||
|
let (tx1, _) = new_txs_with_gas_price_diff(U256::from(4), U256::from(1));
|
||||||
|
let (tx3, _) = new_txs_with_gas_price_diff(U256::from(4), U256::from(2));
|
||||||
|
txq.add(tx1.clone(), &default_nonce, TransactionOrigin::External).unwrap();
|
||||||
|
txq.add(tx3.clone(), &default_nonce, TransactionOrigin::External).unwrap();
|
||||||
|
assert_eq!(txq.status().pending, 2);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_drop_transactions_with_old_nonces() {
|
fn should_drop_transactions_with_old_nonces() {
|
||||||
let mut txq = TransactionQueue::default();
|
let mut txq = TransactionQueue::default();
|
||||||
@ -1971,7 +2007,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_keep_right_order_in_future() {
|
fn should_keep_right_order_in_future() {
|
||||||
// given
|
// given
|
||||||
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero());
|
let mut txq = TransactionQueue::with_limits(PrioritizationStrategy::GasPriceOnly, 1, !U256::zero(), !U256::zero());
|
||||||
let (tx1, tx2) = new_txs(U256::from(1));
|
let (tx1, tx2) = new_txs(U256::from(1));
|
||||||
let prev_nonce = |a: &Address| AccountDetails { nonce: default_nonce(a).nonce - U256::one(), balance:
|
let prev_nonce = |a: &Address| AccountDetails { nonce: default_nonce(a).nonce - U256::one(), balance:
|
||||||
default_nonce(a).balance };
|
default_nonce(a).balance };
|
||||||
|
@ -121,6 +121,7 @@ const MAX_ROUND_PARENTS: usize = 32;
|
|||||||
const MAX_NEW_HASHES: usize = 64;
|
const MAX_NEW_HASHES: usize = 64;
|
||||||
const MAX_TX_TO_IMPORT: usize = 512;
|
const MAX_TX_TO_IMPORT: usize = 512;
|
||||||
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
|
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
|
||||||
|
const MAX_TRANSACTION_SIZE: usize = 300*1024;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
@ -1079,7 +1080,12 @@ impl ChainSync {
|
|||||||
item_count = min(item_count, MAX_TX_TO_IMPORT);
|
item_count = min(item_count, MAX_TX_TO_IMPORT);
|
||||||
let mut transactions = Vec::with_capacity(item_count);
|
let mut transactions = Vec::with_capacity(item_count);
|
||||||
for i in 0 .. item_count {
|
for i in 0 .. item_count {
|
||||||
let tx = try!(r.at(i)).as_raw().to_vec();
|
let rlp = try!(r.at(i));
|
||||||
|
if rlp.as_raw().len() > MAX_TRANSACTION_SIZE {
|
||||||
|
debug!("Skipped oversized transaction of {} bytes", rlp.as_raw().len());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let tx = rlp.as_raw().to_vec();
|
||||||
transactions.push(tx);
|
transactions.push(tx);
|
||||||
}
|
}
|
||||||
io.chain().queue_transactions(transactions);
|
io.chain().queue_transactions(transactions);
|
||||||
|
Loading…
Reference in New Issue
Block a user