From ef124fa3ef4ff5a007b692315d62364fdc234bfa Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 12 Jul 2016 09:52:46 +0200 Subject: [PATCH] Backport sealing fixes to beta (#1583) * Update sealing just once when externally importing many blocks (#1541) Fixes Issue #1372 * Fixing deadlock in miner (#1569) * Fixing deadlock in miner * Adding more comments --- ethcore/src/client/client.rs | 23 ++----- ethcore/src/client/mod.rs | 5 -- ethcore/src/client/test_client.rs | 29 ++++----- ethcore/src/miner/miner.rs | 79 +++++++++++++++++------ ethcore/src/miner/mod.rs | 10 ++- rpc/src/v1/impls/mod.rs | 9 +-- rpc/src/v1/tests/helpers/miner_service.rs | 14 ++-- 7 files changed, 88 insertions(+), 81 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index d5e509e62..84bfd1e96 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use util::*; use util::panics::*; use views::BlockView; -use error::{Error, ImportError, ExecutionError, BlockError, ImportResult}; +use error::{ImportError, ExecutionError, BlockError, ImportResult}; use header::{BlockNumber, Header}; use state::State; use spec::Spec; @@ -38,7 +38,8 @@ use filter::Filter; use log_entry::LocalizedLogEntry; use block_queue::{BlockQueue, BlockQueueInfo}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; -use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics}; +use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, + BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics }; use client::Error as ClientError; use env_info::EnvInfo; use executive::{Executive, Executed, TransactOptions, contract_address}; @@ -49,7 +50,7 @@ use trace; pub use types::blockchain_info::BlockChainInfo; pub use types::block_status::BlockStatus; use evm::Factory as EvmFactory; -use miner::{Miner, MinerService, TransactionImportResult, AccountDetails}; +use miner::{Miner, MinerService}; const MAX_TX_QUEUE_SIZE: usize = 4096; @@ -384,12 +385,8 @@ impl Client { pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { let _timer = PerfTimer::new("import_queued_transactions"); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); - let fetch_account = |a: &Address| AccountDetails { - nonce: self.latest_nonce(a), - balance: self.latest_balance(a), - }; - let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); - let results = self.miner.import_transactions(self, tx, fetch_account); + let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + let results = self.miner.import_external_transactions(self, txs); results.len() } @@ -772,14 +769,6 @@ impl BlockChainClient for Client { self.build_last_hashes(self.chain.best_block_hash()) } - fn import_transactions(&self, transactions: Vec) -> Vec> { - let fetch_account = |a: &Address| AccountDetails { - nonce: self.latest_nonce(a), - balance: self.latest_balance(a), - }; - self.miner.import_transactions(self, transactions, fetch_account) - } - fn queue_transactions(&self, transactions: Vec) { if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 7f3c3bb3a..d820ed4a7 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -47,8 +47,6 @@ use error::{ImportResult, ExecutionError}; use receipt::LocalizedReceipt; use trace::LocalizedTrace; use evm::Factory as EvmFactory; -use miner::{TransactionImportResult}; -use error::Error as EthError; /// Options concerning what analytics we run on the call. #[derive(Eq, PartialEq, Default, Clone, Copy, Debug)] @@ -187,9 +185,6 @@ pub trait BlockChainClient : Sync + Send { /// Get last hashes starting from best block. fn last_hashes(&self) -> LastHashes; - /// import transactions from network/other 3rd party - fn import_transactions(&self, transactions: Vec) -> Vec>; - /// Queue transactions for importing. fn queue_transactions(&self, transactions: Vec); diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index f51f978de..c5ade9b55 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -20,7 +20,8 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder}; use util::*; use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action}; use blockchain::TreeRoute; -use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics}; +use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, + TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics }; use header::{Header as BlockHeader, BlockNumber}; use filter::Filter; use log_entry::LocalizedLogEntry; @@ -37,9 +38,6 @@ use executive::Executed; use error::{ExecutionError}; use trace::LocalizedTrace; -use miner::{TransactionImportResult, AccountDetails}; -use error::Error as EthError; - /// Test client. pub struct TestBlockChainClient { /// Blocks. @@ -274,6 +272,10 @@ impl BlockChainClient for TestBlockChainClient { } } + fn latest_nonce(&self, address: &Address) -> U256 { + self.nonce(address, BlockID::Latest).unwrap() + } + fn code(&self, address: &Address) -> Option { self.code.read().unwrap().get(address).cloned() } @@ -286,6 +288,10 @@ impl BlockChainClient for TestBlockChainClient { } } + fn latest_balance(&self, address: &Address) -> U256 { + self.balance(address, BlockID::Latest).unwrap() + } + fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option { if let BlockID::Latest = id { Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) @@ -487,21 +493,10 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn import_transactions(&self, transactions: Vec) -> Vec> { - let nonces = self.nonces.read().unwrap(); - let balances = self.balances.read().unwrap(); - let fetch_account = |a: &Address| AccountDetails { - nonce: nonces[a], - balance: balances[a], - }; - - self.miner.import_transactions(self, transactions, &fetch_account) - } - fn queue_transactions(&self, transactions: Vec) { // import right here - let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); - self.import_transactions(tx); + let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + self.miner.import_external_transactions(self, txs); } fn pending_transactions(&self) -> Vec { diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 8719e6658..d9f24e657 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -299,6 +299,10 @@ impl Miner { let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some(); trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); if !have_work { + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.prepare_sealing(chain); } @@ -313,6 +317,19 @@ impl Miner { !have_work } + fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) -> + Vec> { + + let fetch_account = |a: &Address| AccountDetails { + nonce: chain.latest_nonce(a), + balance: chain.latest_balance(a), + }; + + transactions.into_iter() + .map(|tx| transaction_queue.add(tx, &fetch_account, origin)) + .collect() + } + /// Are we allowed to do a non-mandatory reseal? fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() } } @@ -323,6 +340,10 @@ impl MinerService for Miner { fn clear_and_reset(&self, chain: &MiningBlockChainClient) { self.transaction_queue.lock().unwrap().clear(); + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- self.update_sealing(chain); } @@ -476,27 +497,31 @@ impl MinerService for Miner { self.gas_range_target.read().unwrap().1 } - fn import_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> - Vec> - where T: Fn(&Address) -> AccountDetails { - let results: Vec> = { + fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec) -> + Vec> { + + let results = { let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transactions.into_iter() - .map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External)) - .collect() + self.add_transactions_to_queue( + chain, transactions, TransactionOrigin::External, &mut transaction_queue + ) }; - if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() { + + if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() { + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- self.update_sealing(chain); } results } - fn import_own_transaction( + fn import_own_transaction( &self, chain: &MiningBlockChainClient, transaction: SignedTransaction, - fetch_account: T - ) -> Result where T: Fn(&Address) -> AccountDetails { + ) -> Result { let hash = transaction.hash(); trace!(target: "own_tx", "Importing transaction: {:?}", transaction); @@ -504,7 +529,7 @@ impl MinerService for Miner { let imported = { // Be sure to release the lock before we call enable_and_prepare_sealing let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let import = transaction_queue.add(transaction, &fetch_account, TransactionOrigin::Local); + let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap(); match import { Ok(ref res) => { @@ -520,6 +545,10 @@ impl MinerService for Miner { import }; + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() { // Make sure to do it after transaction is imported and lock is droped. // We need to create pending block and enable sealing @@ -614,6 +643,10 @@ impl MinerService for Miner { self.sealing_work.lock().unwrap().reset(); } else { *self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period; + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- self.prepare_sealing(chain); } } @@ -655,7 +688,12 @@ impl MinerService for Miner { // Client should send message after commit to db and inserting to chain. .expect("Expected in-chain blocks."); let block = BlockView::new(&block); - block.transactions() + let txs = block.transactions(); + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + txs } // 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions @@ -672,14 +710,9 @@ impl MinerService for Miner { .par_iter() .map(|h| fetch_transactions(chain, h)); out_of_chain.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let _ = self.import_transactions(chain, txs, |a| AccountDetails { - nonce: chain.latest_nonce(a), - balance: chain.latest_balance(a), - }); + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let _ = self.add_transactions_to_queue(chain, txs, TransactionOrigin::External, + &mut transaction_queue); }); } @@ -703,6 +736,10 @@ impl MinerService for Miner { }); } + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- self.update_sealing(chain); } } diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 152bd1a61..a8b7ec1aa 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -106,14 +106,12 @@ pub trait MinerService : Send + Sync { fn set_tx_gas_limit(&self, limit: U256); /// Imports transactions to transaction queue. - fn import_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> - Vec> - where T: Fn(&Address) -> AccountDetails, Self: Sized; + fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec) -> + Vec>; /// Imports own (node owner) transaction to queue. - fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) -> - Result - where T: Fn(&Address) -> AccountDetails, Self: Sized; + fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) -> + Result; /// Returns hashes of transactions currently in pending fn pending_transactions_hashes(&self) -> Vec; diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index 1a6b1c398..86ab1ae02 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -55,7 +55,7 @@ pub use self::rpc::RpcClient; use v1::types::TransactionRequest; use ethcore::error::Error as EthcoreError; -use ethcore::miner::{AccountDetails, MinerService}; +use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; use ethcore::transaction::{Action, SignedTransaction, Transaction}; use ethcore::account_provider::{AccountProvider, Error as AccountError}; @@ -79,12 +79,7 @@ fn dispatch_transaction(client: &C, miner: &M, signed_transaction: SignedT where C: MiningBlockChainClient, M: MinerService { let hash = signed_transaction.hash(); - let import = miner.import_own_transaction(client, signed_transaction, |a: &Address| { - AccountDetails { - nonce: client.latest_nonce(&a), - balance: client.latest_balance(&a), - } - }); + let import = miner.import_own_transaction(client, signed_transaction); import .map_err(transaction_error) diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 6329be7bd..deb12d14d 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -23,7 +23,7 @@ use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics}; use ethcore::block::{ClosedBlock, IsBlock}; use ethcore::transaction::SignedTransaction; use ethcore::receipt::Receipt; -use ethcore::miner::{MinerService, MinerStatus, AccountDetails, TransactionImportResult}; +use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult}; /// Test miner service. pub struct TestMinerService { @@ -130,14 +130,13 @@ impl MinerService for TestMinerService { } /// Imports transactions to transaction queue. - fn import_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> - Vec> - where T: Fn(&Address) -> AccountDetails { + fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec) -> + Vec> { // lets assume that all txs are valid self.imported_transactions.lock().unwrap().extend_from_slice(&transactions); for sender in transactions.iter().filter_map(|t| t.sender().ok()) { - let nonce = self.last_nonce(&sender).unwrap_or(fetch_account(&sender).nonce); + let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests"); self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1)); } transactions @@ -147,9 +146,8 @@ impl MinerService for TestMinerService { } /// Imports transactions to transaction queue. - fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, _fetch_account: T) -> - Result - where T: Fn(&Address) -> AccountDetails { + fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) -> + Result { // keep the pending nonces up to date if let Ok(ref sender) = transaction.sender() {