Backport sealing fixes to beta (#1583)

* Update sealing just once when  externally importing many blocks (#1541)

Fixes Issue #1372

* Fixing deadlock in miner (#1569)

* Fixing deadlock in miner
* Adding more comments
This commit is contained in:
Arkadiy Paronyan 2016-07-12 09:52:46 +02:00 committed by Gav Wood
parent aece120e77
commit ef124fa3ef
7 changed files with 88 additions and 81 deletions

View File

@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use util::*; use util::*;
use util::panics::*; use util::panics::*;
use views::BlockView; use views::BlockView;
use error::{Error, ImportError, ExecutionError, BlockError, ImportResult}; use error::{ImportError, ExecutionError, BlockError, ImportResult};
use header::{BlockNumber, Header}; use header::{BlockNumber, Header};
use state::State; use state::State;
use spec::Spec; use spec::Spec;
@ -38,7 +38,8 @@ use filter::Filter;
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
use block_queue::{BlockQueue, BlockQueueInfo}; use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics}; use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile,
BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics };
use client::Error as ClientError; use client::Error as ClientError;
use env_info::EnvInfo; use env_info::EnvInfo;
use executive::{Executive, Executed, TransactOptions, contract_address}; use executive::{Executive, Executed, TransactOptions, contract_address};
@ -49,7 +50,7 @@ use trace;
pub use types::blockchain_info::BlockChainInfo; pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus; pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory; use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, TransactionImportResult, AccountDetails}; use miner::{Miner, MinerService};
const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_TX_QUEUE_SIZE: usize = 4096;
@ -384,12 +385,8 @@ impl Client {
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions"); let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let fetch_account = |a: &Address| AccountDetails { let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
nonce: self.latest_nonce(a), let results = self.miner.import_external_transactions(self, txs);
balance: self.latest_balance(a),
};
let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_transactions(self, tx, fetch_account);
results.len() results.len()
} }
@ -772,14 +769,6 @@ impl BlockChainClient for Client {
self.build_last_hashes(self.chain.best_block_hash()) self.build_last_hashes(self.chain.best_block_hash())
} }
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, Error>> {
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
self.miner.import_transactions(self, transactions, fetch_account)
}
fn queue_transactions(&self, transactions: Vec<Bytes>) { fn queue_transactions(&self, transactions: Vec<Bytes>) {
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len()); debug!("Ignoring {} transactions: queue is full", transactions.len());

View File

@ -47,8 +47,6 @@ use error::{ImportResult, ExecutionError};
use receipt::LocalizedReceipt; use receipt::LocalizedReceipt;
use trace::LocalizedTrace; use trace::LocalizedTrace;
use evm::Factory as EvmFactory; use evm::Factory as EvmFactory;
use miner::{TransactionImportResult};
use error::Error as EthError;
/// Options concerning what analytics we run on the call. /// Options concerning what analytics we run on the call.
#[derive(Eq, PartialEq, Default, Clone, Copy, Debug)] #[derive(Eq, PartialEq, Default, Clone, Copy, Debug)]
@ -187,9 +185,6 @@ pub trait BlockChainClient : Sync + Send {
/// Get last hashes starting from best block. /// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes; fn last_hashes(&self) -> LastHashes;
/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>>;
/// Queue transactions for importing. /// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>); fn queue_transactions(&self, transactions: Vec<Bytes>);

View File

@ -20,7 +20,8 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder};
use util::*; use util::*;
use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action}; use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute; use blockchain::TreeRoute;
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics}; use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID,
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics };
use header::{Header as BlockHeader, BlockNumber}; use header::{Header as BlockHeader, BlockNumber};
use filter::Filter; use filter::Filter;
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
@ -37,9 +38,6 @@ use executive::Executed;
use error::{ExecutionError}; use error::{ExecutionError};
use trace::LocalizedTrace; use trace::LocalizedTrace;
use miner::{TransactionImportResult, AccountDetails};
use error::Error as EthError;
/// Test client. /// Test client.
pub struct TestBlockChainClient { pub struct TestBlockChainClient {
/// Blocks. /// Blocks.
@ -274,6 +272,10 @@ impl BlockChainClient for TestBlockChainClient {
} }
} }
fn latest_nonce(&self, address: &Address) -> U256 {
self.nonce(address, BlockID::Latest).unwrap()
}
fn code(&self, address: &Address) -> Option<Bytes> { fn code(&self, address: &Address) -> Option<Bytes> {
self.code.read().unwrap().get(address).cloned() self.code.read().unwrap().get(address).cloned()
} }
@ -286,6 +288,10 @@ impl BlockChainClient for TestBlockChainClient {
} }
} }
fn latest_balance(&self, address: &Address) -> U256 {
self.balance(address, BlockID::Latest).unwrap()
}
fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option<H256> { fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option<H256> {
if let BlockID::Latest = id { if let BlockID::Latest = id {
Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new)) Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new))
@ -487,21 +493,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!(); unimplemented!();
} }
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>> {
let nonces = self.nonces.read().unwrap();
let balances = self.balances.read().unwrap();
let fetch_account = |a: &Address| AccountDetails {
nonce: nonces[a],
balance: balances[a],
};
self.miner.import_transactions(self, transactions, &fetch_account)
}
fn queue_transactions(&self, transactions: Vec<Bytes>) { fn queue_transactions(&self, transactions: Vec<Bytes>) {
// import right here // import right here
let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.import_transactions(tx); self.miner.import_external_transactions(self, txs);
} }
fn pending_transactions(&self) -> Vec<SignedTransaction> { fn pending_transactions(&self) -> Vec<SignedTransaction> {

View File

@ -299,6 +299,10 @@ impl Miner {
let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some(); let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some();
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
if !have_work { if !have_work {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
self.prepare_sealing(chain); self.prepare_sealing(chain);
} }
@ -313,6 +317,19 @@ impl Miner {
!have_work !have_work
} }
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
Vec<Result<TransactionImportResult, Error>> {
let fetch_account = |a: &Address| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
};
transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, origin))
.collect()
}
/// Are we allowed to do a non-mandatory reseal? /// Are we allowed to do a non-mandatory reseal?
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() } fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() }
} }
@ -323,6 +340,10 @@ impl MinerService for Miner {
fn clear_and_reset(&self, chain: &MiningBlockChainClient) { fn clear_and_reset(&self, chain: &MiningBlockChainClient) {
self.transaction_queue.lock().unwrap().clear(); self.transaction_queue.lock().unwrap().clear();
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain); self.update_sealing(chain);
} }
@ -476,27 +497,31 @@ impl MinerService for Miner {
self.gas_range_target.read().unwrap().1 self.gas_range_target.read().unwrap().1
} }
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) -> fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> Vec<Result<TransactionImportResult, Error>> {
where T: Fn(&Address) -> AccountDetails {
let results: Vec<Result<TransactionImportResult, Error>> = { let results = {
let mut transaction_queue = self.transaction_queue.lock().unwrap(); let mut transaction_queue = self.transaction_queue.lock().unwrap();
transactions.into_iter() self.add_transactions_to_queue(
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External)) chain, transactions, TransactionOrigin::External, &mut transaction_queue
.collect() )
}; };
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() { if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain); self.update_sealing(chain);
} }
results results
} }
fn import_own_transaction<T>( fn import_own_transaction(
&self, &self,
chain: &MiningBlockChainClient, chain: &MiningBlockChainClient,
transaction: SignedTransaction, transaction: SignedTransaction,
fetch_account: T ) -> Result<TransactionImportResult, Error> {
) -> Result<TransactionImportResult, Error> where T: Fn(&Address) -> AccountDetails {
let hash = transaction.hash(); let hash = transaction.hash();
trace!(target: "own_tx", "Importing transaction: {:?}", transaction); trace!(target: "own_tx", "Importing transaction: {:?}", transaction);
@ -504,7 +529,7 @@ impl MinerService for Miner {
let imported = { let imported = {
// Be sure to release the lock before we call enable_and_prepare_sealing // Be sure to release the lock before we call enable_and_prepare_sealing
let mut transaction_queue = self.transaction_queue.lock().unwrap(); let mut transaction_queue = self.transaction_queue.lock().unwrap();
let import = transaction_queue.add(transaction, &fetch_account, TransactionOrigin::Local); let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap();
match import { match import {
Ok(ref res) => { Ok(ref res) => {
@ -520,6 +545,10 @@ impl MinerService for Miner {
import import
}; };
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() { if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() {
// Make sure to do it after transaction is imported and lock is droped. // Make sure to do it after transaction is imported and lock is droped.
// We need to create pending block and enable sealing // We need to create pending block and enable sealing
@ -614,6 +643,10 @@ impl MinerService for Miner {
self.sealing_work.lock().unwrap().reset(); self.sealing_work.lock().unwrap().reset();
} else { } else {
*self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period; *self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period;
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.prepare_sealing(chain); self.prepare_sealing(chain);
} }
} }
@ -655,7 +688,12 @@ impl MinerService for Miner {
// Client should send message after commit to db and inserting to chain. // Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks."); .expect("Expected in-chain blocks.");
let block = BlockView::new(&block); let block = BlockView::new(&block);
block.transactions() let txs = block.transactions();
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
txs
} }
// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions // 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
@ -672,14 +710,9 @@ impl MinerService for Miner {
.par_iter() .par_iter()
.map(|h| fetch_transactions(chain, h)); .map(|h| fetch_transactions(chain, h));
out_of_chain.for_each(|txs| { out_of_chain.for_each(|txs| {
// populate sender let mut transaction_queue = self.transaction_queue.lock().unwrap();
for tx in &txs { let _ = self.add_transactions_to_queue(chain, txs, TransactionOrigin::External,
let _sender = tx.sender(); &mut transaction_queue);
}
let _ = self.import_transactions(chain, txs, |a| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
});
}); });
} }
@ -703,6 +736,10 @@ impl MinerService for Miner {
}); });
} }
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain); self.update_sealing(chain);
} }
} }

View File

@ -106,14 +106,12 @@ pub trait MinerService : Send + Sync {
fn set_tx_gas_limit(&self, limit: U256); fn set_tx_gas_limit(&self, limit: U256);
/// Imports transactions to transaction queue. /// Imports transactions to transaction queue.
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) -> fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> Vec<Result<TransactionImportResult, Error>>;
where T: Fn(&Address) -> AccountDetails, Self: Sized;
/// Imports own (node owner) transaction to queue. /// Imports own (node owner) transaction to queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) -> fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error> Result<TransactionImportResult, Error>;
where T: Fn(&Address) -> AccountDetails, Self: Sized;
/// Returns hashes of transactions currently in pending /// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256>; fn pending_transactions_hashes(&self) -> Vec<H256>;

View File

@ -55,7 +55,7 @@ pub use self::rpc::RpcClient;
use v1::types::TransactionRequest; use v1::types::TransactionRequest;
use ethcore::error::Error as EthcoreError; use ethcore::error::Error as EthcoreError;
use ethcore::miner::{AccountDetails, MinerService}; use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient; use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{Action, SignedTransaction, Transaction}; use ethcore::transaction::{Action, SignedTransaction, Transaction};
use ethcore::account_provider::{AccountProvider, Error as AccountError}; use ethcore::account_provider::{AccountProvider, Error as AccountError};
@ -79,12 +79,7 @@ fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: SignedT
where C: MiningBlockChainClient, M: MinerService { where C: MiningBlockChainClient, M: MinerService {
let hash = signed_transaction.hash(); let hash = signed_transaction.hash();
let import = miner.import_own_transaction(client, signed_transaction, |a: &Address| { let import = miner.import_own_transaction(client, signed_transaction);
AccountDetails {
nonce: client.latest_nonce(&a),
balance: client.latest_balance(&a),
}
});
import import
.map_err(transaction_error) .map_err(transaction_error)

View File

@ -23,7 +23,7 @@ use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics};
use ethcore::block::{ClosedBlock, IsBlock}; use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
use ethcore::miner::{MinerService, MinerStatus, AccountDetails, TransactionImportResult}; use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult};
/// Test miner service. /// Test miner service.
pub struct TestMinerService { pub struct TestMinerService {
@ -130,14 +130,13 @@ impl MinerService for TestMinerService {
} }
/// Imports transactions to transaction queue. /// Imports transactions to transaction queue.
fn import_transactions<T>(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) -> fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> Vec<Result<TransactionImportResult, Error>> {
where T: Fn(&Address) -> AccountDetails {
// lets assume that all txs are valid // lets assume that all txs are valid
self.imported_transactions.lock().unwrap().extend_from_slice(&transactions); self.imported_transactions.lock().unwrap().extend_from_slice(&transactions);
for sender in transactions.iter().filter_map(|t| t.sender().ok()) { for sender in transactions.iter().filter_map(|t| t.sender().ok()) {
let nonce = self.last_nonce(&sender).unwrap_or(fetch_account(&sender).nonce); let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests");
self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1)); self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1));
} }
transactions transactions
@ -147,9 +146,8 @@ impl MinerService for TestMinerService {
} }
/// Imports transactions to transaction queue. /// Imports transactions to transaction queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, _fetch_account: T) -> fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error> Result<TransactionImportResult, Error> {
where T: Fn(&Address) -> AccountDetails {
// keep the pending nonces up to date // keep the pending nonces up to date
if let Ok(ref sender) = transaction.sender() { if let Ok(ref sender) = transaction.sender() {