diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 73c1c9cf9..182234280 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -292,6 +292,7 @@ impl Miner { }; let mut invalid_transactions = HashSet::new(); + let mut transactions_to_penalize = HashSet::new(); let block_number = open_block.block().fields().header.number(); // TODO: push new uncles, too. for tx in transactions { @@ -299,6 +300,12 @@ impl Miner { match open_block.push_transaction(tx, None) { Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, gas })) => { debug!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?} (limit: {:?}, used: {:?}, gas: {:?})", hash, gas_limit, gas_used, gas); + + // Penalize transaction if it's above current gas limit + if gas > gas_limit { + transactions_to_penalize.insert(hash); + } + // Exit early if gas left is smaller then min_tx_gas let min_tx_gas: U256 = 21000.into(); // TODO: figure this out properly. if gas_limit - gas_used < min_tx_gas { @@ -334,6 +341,9 @@ impl Miner { for hash in invalid_transactions.into_iter() { queue.remove_invalid(&hash, &fetch_account); } + for hash in transactions_to_penalize { + queue.penalize(&hash); + } } (block, original_work_hash) } diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index af054aa98..7db65eacb 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -134,6 +134,8 @@ struct TransactionOrder { hash: H256, /// Origin of the transaction origin: TransactionOrigin, + /// Penalties + penalties: usize, } @@ -144,6 +146,7 @@ impl TransactionOrder { gas_price: tx.transaction.gas_price, hash: tx.hash(), origin: tx.origin, + penalties: 0, } } @@ -151,6 +154,11 @@ impl TransactionOrder { self.nonce_height = nonce - base_nonce; self } + + fn penalize(mut self) -> Self { + self.penalties = self.penalties.saturating_add(1); + self + } } impl Eq for TransactionOrder {} @@ -167,6 +175,11 @@ impl PartialOrd for TransactionOrder { impl Ord for TransactionOrder { fn cmp(&self, b: &TransactionOrder) -> Ordering { + // First check number of penalties + if self.penalties != b.penalties { + return self.penalties.cmp(&b.penalties); + } + // First check nonce_height if self.nonce_height != b.nonce_height { return self.nonce_height.cmp(&b.nonce_height); @@ -387,7 +400,7 @@ pub struct AccountDetails { } /// Transactions with `gas > (gas_limit + gas_limit * Factor(in percents))` are not imported to the queue. -const GAS_LIMIT_HYSTERESIS: usize = 10; // % +const GAS_LIMIT_HYSTERESIS: usize = 10; // (100/GAS_LIMIT_HYSTERESIS) % /// `TransactionQueue` implementation pub struct TransactionQueue { @@ -506,8 +519,6 @@ impl TransactionQueue { pub fn add(&mut self, tx: SignedTransaction, fetch_account: &T, origin: TransactionOrigin) -> Result where T: Fn(&Address) -> AccountDetails { - trace!(target: "txqueue", "Importing: {:?}", tx.hash()); - if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local { trace!(target: "txqueue", "Dropping transaction below minimal gas price threshold: {:?} (gp: {} < {})", @@ -593,6 +604,39 @@ impl TransactionQueue { assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len()); } + /// Penalize transactions from sender of transaction with given hash. + /// I.e. it should change the priority of the transaction in the queue. + /// + /// NOTE: We need to penalize all transactions from particular sender + /// to avoid breaking invariants in queue (ordered by nonces). + /// Consecutive transactions from this sender would fail otherwise (because of invalid nonce). + pub fn penalize(&mut self, transaction_hash: &H256) { + let transaction = match self.by_hash.get(transaction_hash) { + None => return, + Some(t) => t, + }; + let sender = transaction.sender(); + + // Penalize all transactions from this sender + let nonces_from_sender = match self.current.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], + }; + for k in nonces_from_sender { + let order = self.current.drop(&sender, &k).unwrap(); + self.current.insert(sender, k, order.penalize()); + } + // Same thing for future + let nonces_from_sender = match self.future.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], + }; + for k in nonces_from_sender { + let order = self.future.drop(&sender, &k).unwrap(); + self.current.insert(sender, k, order.penalize()); + } + } + /// Removes invalid transaction identified by hash from queue. /// Assumption is that this transaction nonce is not related to client nonce, /// so transactions left in queue are processed according to client nonce. @@ -764,6 +808,7 @@ impl TransactionQueue { let address = tx.sender(); let nonce = tx.nonce(); + let hash = tx.hash(); let next_nonce = self.last_nonces .get(&address) @@ -785,6 +830,9 @@ impl TransactionQueue { try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, &mut self.future, &mut self.by_hash))); // Return an error if this transaction is not imported because of limit. try!(check_if_removed(&address, &nonce, self.future.enforce_limit(&mut self.by_hash))); + + debug!(target: "txqueue", "Importing transaction to future: {:?}", hash); + debug!(target: "txqueue", "status: {:?}", self.status()); return Ok(TransactionImportResult::Future); } try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash))); @@ -811,7 +859,8 @@ impl TransactionQueue { // Trigger error if the transaction we are importing was removed. try!(check_if_removed(&address, &nonce, removed)); - trace!(target: "txqueue", "status: {:?}", self.status()); + debug!(target: "txqueue", "Imported transaction to current: {:?}", hash); + debug!(target: "txqueue", "status: {:?}", self.status()); Ok(TransactionImportResult::Current) } @@ -945,6 +994,17 @@ mod test { (tx1.sign(secret), tx2.sign(secret)) } + /// Returns two consecutive transactions, both with increased gas price + fn new_tx_pair_with_gas_price_increment(gas_price_increment: U256) -> (SignedTransaction, SignedTransaction) { + let gas = default_gas_price() + gas_price_increment; + let tx1 = new_unsigned_tx(default_nonce(), gas); + let tx2 = new_unsigned_tx(default_nonce() + 1.into(), gas); + + let keypair = Random.generate().unwrap(); + let secret = &keypair.secret(); + (tx1.sign(secret), tx2.sign(secret)) + } + fn new_tx_pair_default(nonce_increment: U256, gas_price_increment: U256) -> (SignedTransaction, SignedTransaction) { new_tx_pair(default_nonce(), default_gas_price(), nonce_increment, gas_price_increment) } @@ -1332,6 +1392,39 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn should_penalize_transactions_from_sender() { + // given + let mut txq = TransactionQueue::new(); + // txa, txb - slightly bigger gas price to have consistent ordering + let (txa, txb) = new_tx_pair_default(1.into(), 0.into()); + let (tx1, tx2) = new_tx_pair_with_gas_price_increment(3.into()); + + // insert everything + txq.add(txa.clone(), &default_account_details, TransactionOrigin::External).unwrap(); + txq.add(txb.clone(), &default_account_details, TransactionOrigin::External).unwrap(); + txq.add(tx1.clone(), &default_account_details, TransactionOrigin::External).unwrap(); + txq.add(tx2.clone(), &default_account_details, TransactionOrigin::External).unwrap(); + + let top = txq.top_transactions(); + assert_eq!(top[0], tx1); + assert_eq!(top[1], txa); + assert_eq!(top[2], tx2); + assert_eq!(top[3], txb); + assert_eq!(top.len(), 4); + + // when + txq.penalize(&tx1.hash()); + + // then + let top = txq.top_transactions(); + assert_eq!(top[0], txa); + assert_eq!(top[1], txb); + assert_eq!(top[2], tx1); + assert_eq!(top[3], tx2); + assert_eq!(top.len(), 4); + } + #[test] fn should_return_pending_hashes() { // given diff --git a/rpc/src/v1/impls/ethcore.rs b/rpc/src/v1/impls/ethcore.rs index b9f8c4bf5..220ead3dd 100644 --- a/rpc/src/v1/impls/ethcore.rs +++ b/rpc/src/v1/impls/ethcore.rs @@ -30,7 +30,7 @@ use ethcore::client::{MiningBlockChainClient}; use jsonrpc_core::*; use v1::traits::Ethcore; -use v1::types::{Bytes, U256, H160, H512, Peers}; +use v1::types::{Bytes, U256, H160, H512, Peers, Transaction}; use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; use v1::helpers::params::expect_no_params; @@ -226,4 +226,11 @@ impl Ethcore for EthcoreClient where M: MinerService + Ok(to_value(&Bytes::from(s))) }) } + + fn pending_transactions(&self, params: Params) -> Result { + try!(self.active()); + try!(expect_no_params(params)); + + Ok(to_value(&take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::>())) + } } diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index a9d0c4e1d..811ccced4 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -286,3 +286,18 @@ fn rpc_ethcore_unsigned_transactions_count_when_signer_disabled() { assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } + +#[test] +fn rpc_ethcore_pending_transactions() { + let miner = miner_service(); + let client = client_service(); + let sync = sync_provider(); + let net = network_service(); + let io = IoHandler::new(); + io.add_delegate(ethcore_client(&client, &miner, &sync, &net).to_delegate()); + + let request = r#"{"jsonrpc": "2.0", "method": "ethcore_pendingTransactions", "params":[], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":[],"id":1}"#; + + assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); +} diff --git a/rpc/src/v1/traits/ethcore.rs b/rpc/src/v1/traits/ethcore.rs index 90661129b..56c27534a 100644 --- a/rpc/src/v1/traits/ethcore.rs +++ b/rpc/src/v1/traits/ethcore.rs @@ -80,6 +80,9 @@ pub trait Ethcore: Sized + Send + Sync + 'static { /// First parameter is the 512-byte destination public key, second is the message. fn encrypt_message(&self, _: Params) -> Result; + /// Returns all pending (current) transactions from transaction queue. + fn pending_transactions(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); @@ -103,7 +106,7 @@ pub trait Ethcore: Sized + Send + Sync + 'static { delegate.add_method("ethcore_phraseToAddress", Ethcore::phrase_to_address); delegate.add_method("ethcore_registryAddress", Ethcore::registry_address); delegate.add_method("ethcore_encryptMessage", Ethcore::encrypt_message); - + delegate.add_method("ethcore_pendingTransactions", Ethcore::pending_transactions); delegate } }