diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index a3a379463..02cd6678b 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -65,6 +65,10 @@ pub enum ExecutionError { #[derive(Debug)] /// Errors concerning transaction processing. pub enum TransactionError { + /// Transaction is already imported to the queue + AlreadyImported, + /// Transaction is not valid anymore (state already has higher nonce) + Old, /// Transaction's gas price is below threshold. InsufficientGasPrice { /// Minimal expected gas price diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 648e9b3b5..b79a13e24 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -79,7 +79,7 @@ pub trait MinerService : Send + Sync { fn status(&self) -> MinerStatus; /// Imports transactions to transaction queue. - fn import_transactions(&self, transactions: Vec, fetch_account: T) -> Result<(), Error> + fn import_transactions(&self, transactions: Vec, fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails; /// Returns hashes of transactions currently in pending @@ -92,7 +92,7 @@ pub trait MinerService : Send + Sync { fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]); /// New chain head event. Restart mining operation. - fn prepare_sealing(&self, chain: &BlockChainClient); + fn update_sealing(&self, chain: &BlockChainClient); /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex>; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 0307c56fc..022d436ca 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -95,42 +95,8 @@ impl Miner { self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price); } - fn update_gas_limit(&self, chain: &BlockChainClient) { - let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit(); - let mut queue = self.transaction_queue.lock().unwrap(); - queue.set_gas_limit(gas_limit); - } -} - -impl MinerService for Miner { - - fn clear_and_reset(&self, chain: &BlockChainClient) { - self.transaction_queue.lock().unwrap().clear(); - self.prepare_sealing(chain); - } - - fn status(&self) -> MinerStatus { - let status = self.transaction_queue.lock().unwrap().status(); - let block = self.sealing_block.lock().unwrap(); - MinerStatus { - transactions_in_pending_queue: status.pending, - transactions_in_future_queue: status.future, - transactions_in_pending_block: block.as_ref().map_or(0, |b| b.transactions().len()), - } - } - - fn import_transactions(&self, transactions: Vec, fetch_account: T) -> Result<(), Error> - where T: Fn(&Address) -> AccountDetails { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(transactions, fetch_account) - } - - fn pending_transactions_hashes(&self) -> Vec { - let transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.pending_hashes() - } - - fn prepare_sealing(&self, chain: &BlockChainClient) { + /// Prepares new block for sealing including top transactions from queue. + pub fn prepare_sealing(&self, chain: &BlockChainClient) { let transactions = self.transaction_queue.lock().unwrap().top_transactions(); let b = chain.prepare_sealing( self.author(), @@ -152,6 +118,47 @@ impl MinerService for Miner { }); } + fn update_gas_limit(&self, chain: &BlockChainClient) { + let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit(); + let mut queue = self.transaction_queue.lock().unwrap(); + queue.set_gas_limit(gas_limit); + } +} + +impl MinerService for Miner { + + fn clear_and_reset(&self, chain: &BlockChainClient) { + self.transaction_queue.lock().unwrap().clear(); + self.update_sealing(chain); + } + + fn status(&self) -> MinerStatus { + let status = self.transaction_queue.lock().unwrap().status(); + let block = self.sealing_block.lock().unwrap(); + MinerStatus { + transactions_in_pending_queue: status.pending, + transactions_in_future_queue: status.future, + transactions_in_pending_block: block.as_ref().map_or(0, |b| b.transactions().len()), + } + } + + fn import_transactions(&self, transactions: Vec, fetch_account: T) -> Vec> + where T: Fn(&Address) -> AccountDetails { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(transactions, fetch_account) + } + + fn pending_transactions_hashes(&self) -> Vec { + let transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.pending_hashes() + } + + fn update_sealing(&self, chain: &BlockChainClient) { + if self.sealing_enabled.load(atomic::Ordering::Relaxed) { + self.prepare_sealing(chain); + } + } + fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex> { if self.sealing_block.lock().unwrap().is_none() { self.sealing_enabled.store(true, atomic::Ordering::Relaxed); @@ -239,9 +246,6 @@ impl MinerService for Miner { }); } - // Update mined block - if self.sealing_enabled.load(atomic::Ordering::Relaxed) { - self.prepare_sealing(chain); - } + self.update_sealing(chain); } } diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index 0ac3f8be2..a07395349 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -333,12 +333,12 @@ impl TransactionQueue { } /// Adds all signed transactions to queue to be verified and imported - pub fn add_all(&mut self, txs: Vec, fetch_account: T) -> Result<(), Error> + pub fn add_all(&mut self, txs: Vec, fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails { - for tx in txs.into_iter() { - try!(self.add(tx, &fetch_account)); - } - Ok(()) + + txs.into_iter() + .map(|tx| self.add(tx, &fetch_account)) + .collect() } /// Add signed transaction to queue to be verified and imported @@ -385,8 +385,7 @@ impl TransactionQueue { })); } - self.import_tx(vtx, account.nonce); - Ok(()) + self.import_tx(vtx, account.nonce).map_err(Error::Transaction) } /// Removes all transactions identified by hashes given in slice @@ -540,12 +539,14 @@ impl TransactionQueue { /// /// It ignores transactions that has already been imported (same `hash`) and replaces the transaction /// iff `(address, nonce)` is the same but `gas_price` is higher. - fn import_tx(&mut self, tx: VerifiedTransaction, state_nonce: U256) { + /// + /// Returns `true` when transaction was imported successfuly + fn import_tx(&mut self, tx: VerifiedTransaction, state_nonce: U256) -> Result<(), TransactionError> { if self.by_hash.get(&tx.hash()).is_some() { // Transaction is already imported. trace!(target: "miner", "Dropping already imported transaction: {:?}", tx.hash()); - return; + return Err(TransactionError::AlreadyImported); } @@ -562,11 +563,11 @@ impl TransactionQueue { // We have a gap - put to future Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); self.future.enforce_limit(&mut self.by_hash); - return; + return Ok(()); } else if nonce < state_nonce { // Droping transaction trace!(target: "miner", "Dropping old transaction: {:?} (nonce: {} < {})", tx.hash(), nonce, next_nonce); - return; + return Err(TransactionError::Old); } Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash); @@ -576,6 +577,7 @@ impl TransactionQueue { self.current.enforce_limit(&mut self.by_hash); trace!(target: "miner", "status: {:?}", self.status()); + Ok(()) } /// Replaces transaction in given set (could be `future` or `current`). @@ -1008,7 +1010,7 @@ mod test { let fetch_last_nonce = |_a: &Address| AccountDetails{ nonce: last_nonce, balance: !U256::zero() }; // when - txq.add(tx, &fetch_last_nonce).unwrap(); + txq.add(tx, &fetch_last_nonce).unwrap_err(); // then let stats = txq.status(); @@ -1028,7 +1030,7 @@ mod test { assert_eq!(txq.status().pending, 0); // when - txq.add(tx2.clone(), &nonce).unwrap(); + txq.add(tx2.clone(), &nonce).unwrap_err(); // then let stats = txq.status(); diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7ecfb86de..5cd1b2966 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -385,7 +385,7 @@ impl Eth for EthClient nonce: client.nonce(a), balance: client.balance(a), }); - match import { + match import.into_iter().collect::, _>>() { Ok(_) => to_value(&hash), Err(e) => { warn!("Error sending transaction: {:?}", e); diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 44b94ac14..ad9dcaedd 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -48,7 +48,7 @@ impl MinerService for TestMinerService { } /// Imports transactions to transaction queue. - fn import_transactions(&self, _transactions: Vec, _fetch_account: T) -> Result<(), Error> + fn import_transactions(&self, _transactions: Vec, _fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails { unimplemented!(); } /// Returns hashes of transactions currently in pending @@ -61,7 +61,7 @@ impl MinerService for TestMinerService { fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { unimplemented!(); } /// New chain head event. Restart mining operation. - fn prepare_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); } + fn update_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); } /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex> { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 57eb101cb..4fd386333 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1297,7 +1297,7 @@ impl ChainSync { } pub fn chain_new_head(&mut self, io: &mut SyncIo) { - self.miner.prepare_sealing(io.chain()); + self.miner.update_sealing(io.chain()); } }