Combine mining queue and enabled into single locked datum (#1749)
* Combine mining queue and enabled into single locked datum Additional tracing. * Fix bug uncovered by test. * Fix typo * Remove unneeded log initialisation in test. [ci:skip]
This commit is contained in:
parent
11cb544c24
commit
297d25dd65
@ -342,7 +342,7 @@ impl Client {
|
||||
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
||||
pub fn import_verified_blocks(&self) -> usize {
|
||||
let max_blocks_to_import = 64;
|
||||
let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = {
|
||||
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
|
||||
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
|
||||
let mut invalid_blocks = HashSet::new();
|
||||
let mut import_results = Vec::with_capacity(max_blocks_to_import);
|
||||
@ -352,8 +352,6 @@ impl Client {
|
||||
let start = precise_time_ns();
|
||||
let blocks = self.block_queue.drain(max_blocks_to_import);
|
||||
|
||||
let original_best = self.chain_info().best_block_hash;
|
||||
|
||||
for block in blocks {
|
||||
let header = &block.header;
|
||||
if invalid_blocks.contains(&header.parent_hash) {
|
||||
@ -387,7 +385,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
let duration_ns = precise_time_ns() - start;
|
||||
(imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns)
|
||||
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
|
||||
};
|
||||
|
||||
{
|
||||
@ -411,10 +409,6 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
if self.chain_info().best_block_hash != original_best {
|
||||
self.miner.update_sealing(self);
|
||||
}
|
||||
|
||||
imported
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use rayon::prelude::*;
|
||||
use std::sync::atomic::{self, AtomicBool};
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use util::*;
|
||||
@ -159,15 +158,20 @@ impl GasPricer {
|
||||
}
|
||||
}
|
||||
|
||||
struct SealingWork {
|
||||
queue: UsingQueue<ClosedBlock>,
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
/// Keeps track of transactions using priority queue and holds currently mined block.
|
||||
pub struct Miner {
|
||||
// NOTE [ToDr] When locking always lock in this order!
|
||||
transaction_queue: Arc<Mutex<TransactionQueue>>,
|
||||
sealing_work: Mutex<UsingQueue<ClosedBlock>>,
|
||||
sealing_work: Mutex<SealingWork>,
|
||||
|
||||
// for sealing...
|
||||
options: MinerOptions,
|
||||
sealing_enabled: AtomicBool,
|
||||
|
||||
next_allowed_reseal: Mutex<Instant>,
|
||||
sealing_block_last_request: Mutex<u64>,
|
||||
gas_range_target: RwLock<(U256, U256)>,
|
||||
@ -186,10 +190,9 @@ impl Miner {
|
||||
Miner {
|
||||
transaction_queue: Arc::new(Mutex::new(TransactionQueue::new())),
|
||||
options: Default::default(),
|
||||
sealing_enabled: AtomicBool::new(false),
|
||||
next_allowed_reseal: Mutex::new(Instant::now()),
|
||||
sealing_block_last_request: Mutex::new(0),
|
||||
sealing_work: Mutex::new(UsingQueue::new(20)),
|
||||
sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(20), enabled: false}),
|
||||
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
|
||||
author: RwLock::new(Address::default()),
|
||||
extra_data: RwLock::new(Vec::new()),
|
||||
@ -206,10 +209,9 @@ impl Miner {
|
||||
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)));
|
||||
Arc::new(Miner {
|
||||
transaction_queue: txq,
|
||||
sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()),
|
||||
next_allowed_reseal: Mutex::new(Instant::now()),
|
||||
sealing_block_last_request: Mutex::new(0),
|
||||
sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)),
|
||||
sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(options.work_queue_size), enabled: options.force_sealing || !options.new_work_notify.is_empty()}),
|
||||
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
|
||||
author: RwLock::new(Address::default()),
|
||||
extra_data: RwLock::new(Vec::new()),
|
||||
@ -231,12 +233,12 @@ impl Miner {
|
||||
|
||||
/// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing.
|
||||
pub fn pending_state(&self) -> Option<State> {
|
||||
self.sealing_work.lock().peek_last_ref().map(|b| b.block().fields().state.clone())
|
||||
self.sealing_work.lock().queue.peek_last_ref().map(|b| b.block().fields().state.clone())
|
||||
}
|
||||
|
||||
/// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing.
|
||||
pub fn pending_block(&self) -> Option<Block> {
|
||||
self.sealing_work.lock().peek_last_ref().map(|b| b.base().clone())
|
||||
self.sealing_work.lock().queue.peek_last_ref().map(|b| b.base().clone())
|
||||
}
|
||||
|
||||
/// Prepares new block for sealing including top transactions from queue.
|
||||
@ -258,7 +260,7 @@ impl Miner {
|
||||
let (transactions, mut open_block, original_work_hash) = {
|
||||
let transactions = {self.transaction_queue.lock().top_transactions()};
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash());
|
||||
let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash());
|
||||
let best_hash = chain.best_block_header().sha3();
|
||||
/*
|
||||
// check to see if last ClosedBlock in would_seals is actually same parent block.
|
||||
@ -268,7 +270,7 @@ impl Miner {
|
||||
// otherwise, leave everything alone.
|
||||
// otherwise, author a fresh block.
|
||||
*/
|
||||
let open_block = match sealing_work.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
|
||||
let open_block = match sealing_work.queue.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
|
||||
Some(old_block) => {
|
||||
trace!(target: "miner", "Already have previous work; updating and returning");
|
||||
// add transactions to old_block
|
||||
@ -359,7 +361,7 @@ impl Miner {
|
||||
|
||||
let (work, is_new) = {
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash());
|
||||
let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash());
|
||||
trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash());
|
||||
let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) {
|
||||
trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash());
|
||||
@ -367,16 +369,16 @@ impl Miner {
|
||||
let number = block.block().fields().header.number();
|
||||
let difficulty = *block.block().fields().header.difficulty();
|
||||
let is_new = original_work_hash.map_or(true, |h| block.block().fields().header.hash() != h);
|
||||
sealing_work.push(block);
|
||||
sealing_work.queue.push(block);
|
||||
// If push notifications are enabled we assume all work items are used.
|
||||
if self.work_poster.is_some() && is_new {
|
||||
sealing_work.use_last_ref();
|
||||
sealing_work.queue.use_last_ref();
|
||||
}
|
||||
(Some((pow_hash, difficulty, number)), is_new)
|
||||
} else {
|
||||
(None, false)
|
||||
};
|
||||
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash()));
|
||||
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.queue.peek_last_ref().map(|b| b.block().fields().header.hash()));
|
||||
(work, is_new)
|
||||
};
|
||||
if is_new {
|
||||
@ -393,14 +395,22 @@ impl Miner {
|
||||
/// Returns true if we had to prepare new pending block
|
||||
fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool {
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: entering");
|
||||
let have_work = self.sealing_work.lock().peek_last_ref().is_some();
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
|
||||
if !have_work {
|
||||
let prepare_new = {
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let have_work = sealing_work.queue.peek_last_ref().is_some();
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
|
||||
if !have_work {
|
||||
sealing_work.enabled = true;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
if prepare_new {
|
||||
// --------------------------------------------------------------------------
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
|
||||
self.prepare_sealing(chain);
|
||||
}
|
||||
let mut sealing_block_last_request = self.sealing_block_last_request.lock();
|
||||
@ -410,8 +420,8 @@ impl Miner {
|
||||
*sealing_block_last_request = best_number;
|
||||
}
|
||||
|
||||
// Return if
|
||||
!have_work
|
||||
// Return if we restarted
|
||||
prepare_new
|
||||
}
|
||||
|
||||
fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
|
||||
@ -450,13 +460,13 @@ impl MinerService for Miner {
|
||||
MinerStatus {
|
||||
transactions_in_pending_queue: status.pending,
|
||||
transactions_in_future_queue: status.future,
|
||||
transactions_in_pending_block: sealing_work.peek_last_ref().map_or(0, |b| b.transactions().len()),
|
||||
transactions_in_pending_block: sealing_work.queue.peek_last_ref().map_or(0, |b| b.transactions().len()),
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction, analytics: CallAnalytics) -> Result<Executed, ExecutionError> {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
match sealing_work.peek_last_ref() {
|
||||
match sealing_work.queue.peek_last_ref() {
|
||||
Some(work) => {
|
||||
let block = work.block();
|
||||
|
||||
@ -503,7 +513,7 @@ impl MinerService for Miner {
|
||||
|
||||
fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
sealing_work.peek_last_ref().map_or_else(
|
||||
sealing_work.queue.peek_last_ref().map_or_else(
|
||||
|| chain.latest_balance(address),
|
||||
|b| b.block().fields().state.balance(address)
|
||||
)
|
||||
@ -511,7 +521,7 @@ impl MinerService for Miner {
|
||||
|
||||
fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
sealing_work.peek_last_ref().map_or_else(
|
||||
sealing_work.queue.peek_last_ref().map_or_else(
|
||||
|| chain.latest_storage_at(address, position),
|
||||
|b| b.block().fields().state.storage_at(address, position)
|
||||
)
|
||||
@ -519,12 +529,12 @@ impl MinerService for Miner {
|
||||
|
||||
fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
|
||||
sealing_work.queue.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
|
||||
}
|
||||
|
||||
fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes> {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address))
|
||||
sealing_work.queue.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address))
|
||||
}
|
||||
|
||||
fn set_author(&self, author: Address) {
|
||||
@ -673,8 +683,8 @@ impl MinerService for Miner {
|
||||
let queue = self.transaction_queue.lock();
|
||||
let sw = self.sealing_work.lock();
|
||||
// TODO: should only use the sealing_work when it's current (it could be an old block)
|
||||
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
|
||||
true => sw.peek_last_ref(),
|
||||
let sealing_set = match sw.enabled {
|
||||
true => sw.queue.peek_last_ref(),
|
||||
false => None,
|
||||
};
|
||||
match (&self.options.pending_set, sealing_set) {
|
||||
@ -686,8 +696,8 @@ impl MinerService for Miner {
|
||||
fn pending_transactions_hashes(&self) -> Vec<H256> {
|
||||
let queue = self.transaction_queue.lock();
|
||||
let sw = self.sealing_work.lock();
|
||||
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
|
||||
true => sw.peek_last_ref(),
|
||||
let sealing_set = match sw.enabled {
|
||||
true => sw.queue.peek_last_ref(),
|
||||
false => None,
|
||||
};
|
||||
match (&self.options.pending_set, sealing_set) {
|
||||
@ -699,8 +709,8 @@ impl MinerService for Miner {
|
||||
fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
|
||||
let queue = self.transaction_queue.lock();
|
||||
let sw = self.sealing_work.lock();
|
||||
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
|
||||
true => sw.peek_last_ref(),
|
||||
let sealing_set = match sw.enabled {
|
||||
true => sw.queue.peek_last_ref(),
|
||||
false => None,
|
||||
};
|
||||
match (&self.options.pending_set, sealing_set) {
|
||||
@ -710,7 +720,8 @@ impl MinerService for Miner {
|
||||
}
|
||||
|
||||
fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
|
||||
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().peek_last_ref()) {
|
||||
let sealing_work = self.sealing_work.lock();
|
||||
match (sealing_work.enabled, sealing_work.queue.peek_last_ref()) {
|
||||
(true, Some(pending)) => {
|
||||
let hashes = pending.transactions()
|
||||
.iter()
|
||||
@ -729,27 +740,43 @@ impl MinerService for Miner {
|
||||
}
|
||||
|
||||
fn update_sealing(&self, chain: &MiningBlockChainClient) {
|
||||
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
|
||||
let current_no = chain.chain_info().best_block_number;
|
||||
let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions();
|
||||
let last_request = *self.sealing_block_last_request.lock();
|
||||
let should_disable_sealing = !self.forced_sealing()
|
||||
&& !has_local_transactions
|
||||
&& current_no > last_request
|
||||
&& current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS;
|
||||
trace!(target: "miner", "update_sealing");
|
||||
let requires_reseal = {
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
if sealing_work.enabled {
|
||||
trace!(target: "miner", "update_sealing: sealing enabled");
|
||||
let current_no = chain.chain_info().best_block_number;
|
||||
let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions();
|
||||
let last_request = *self.sealing_block_last_request.lock();
|
||||
let should_disable_sealing = !self.forced_sealing()
|
||||
&& !has_local_transactions
|
||||
&& current_no > last_request
|
||||
&& current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS;
|
||||
|
||||
if should_disable_sealing {
|
||||
trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request);
|
||||
self.sealing_enabled.store(false, atomic::Ordering::Relaxed);
|
||||
self.sealing_work.lock().reset();
|
||||
trace!(target: "miner", "update_sealing: should_disable_sealing={}; current_no={}, last_request={}", should_disable_sealing, current_no, last_request);
|
||||
|
||||
if should_disable_sealing {
|
||||
trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request);
|
||||
sealing_work.enabled = false;
|
||||
sealing_work.queue.reset();
|
||||
false
|
||||
} else {
|
||||
// sealing enabled and we don't want to sleep.
|
||||
*self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period;
|
||||
true
|
||||
}
|
||||
} else {
|
||||
*self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period;
|
||||
// --------------------------------------------------------------------------
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.prepare_sealing(chain);
|
||||
// sealing is disabled.
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if requires_reseal {
|
||||
// --------------------------------------------------------------------------
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.prepare_sealing(chain);
|
||||
}
|
||||
}
|
||||
|
||||
@ -758,13 +785,13 @@ impl MinerService for Miner {
|
||||
self.enable_and_prepare_sealing(chain);
|
||||
trace!(target: "miner", "map_sealing_work: sealing prepared");
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let ret = sealing_work.use_last_ref();
|
||||
let ret = sealing_work.queue.use_last_ref();
|
||||
trace!(target: "miner", "map_sealing_work: leaving use_last_ref={:?}", ret.as_ref().map(|b| b.block().fields().header.hash()));
|
||||
ret.map(f)
|
||||
}
|
||||
|
||||
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
|
||||
let result = if let Some(b) = self.sealing_work.lock().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) {
|
||||
let result = if let Some(b) = self.sealing_work.lock().queue.get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) {
|
||||
b.lock().try_seal(self.engine(), seal).or_else(|_| {
|
||||
warn!(target: "miner", "Mined solution rejected: Invalid.");
|
||||
Err(Error::PowInvalid)
|
||||
@ -783,6 +810,8 @@ impl MinerService for Miner {
|
||||
}
|
||||
|
||||
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
|
||||
trace!(target: "miner", "chain_new_blocks");
|
||||
|
||||
fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
|
||||
let block = chain
|
||||
.block(BlockID::Hash(*hash))
|
||||
@ -838,11 +867,13 @@ impl MinerService for Miner {
|
||||
});
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.update_sealing(chain);
|
||||
if enacted.len() > 0 {
|
||||
// --------------------------------------------------------------------------
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.update_sealing(chain);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user