Fix parallel transactions race-condition (#10995)
* WiP : clear pending txs cache & tick in Miner * Fixed pending transactions * Revert debugging code * Add ToDo comment * Remove commented-out code * Reverse LTO setting * WiP * Try to seal a new block if there are pending transactions * Try resealing only for internal imports * Remove logging * Use AtomicU64 instead of Mutex<BlockNumber> * Remove TxQueue cache clear // Update AtomicUint logic * Update comments in Miner * Revert import of `parking_lot` * Update `transaction-pool` dependency * Call directly `update_sealing` * Call `update_sealing` directly
This commit is contained in:
@@ -14,6 +14,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use common_types::{
|
||||
header::Header,
|
||||
engines::{
|
||||
@@ -51,6 +53,7 @@ impl From<ethjson::spec::InstantSealParams> for InstantSealParams {
|
||||
pub struct InstantSeal {
|
||||
params: InstantSealParams,
|
||||
machine: Machine,
|
||||
last_sealed_block: AtomicU64,
|
||||
}
|
||||
|
||||
impl InstantSeal {
|
||||
@@ -59,6 +62,7 @@ impl InstantSeal {
|
||||
InstantSeal {
|
||||
params,
|
||||
machine,
|
||||
last_sealed_block: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,11 +75,19 @@ impl Engine for InstantSeal {
|
||||
fn sealing_state(&self) -> SealingState { SealingState::Ready }
|
||||
|
||||
fn generate_seal(&self, block: &ExecutedBlock, _parent: &Header) -> Seal {
|
||||
if block.transactions.is_empty() {
|
||||
Seal::None
|
||||
} else {
|
||||
Seal::Regular(Vec::new())
|
||||
if !block.transactions.is_empty() {
|
||||
let block_number = block.header.number();
|
||||
let last_sealed_block = self.last_sealed_block.load(Ordering::SeqCst);
|
||||
// Return a regular seal if the given block is _higher_ than
|
||||
// the last sealed one
|
||||
if block_number > last_sealed_block {
|
||||
let prev_last_sealed_block = self.last_sealed_block.compare_and_swap(last_sealed_block, block_number, Ordering::SeqCst);
|
||||
if prev_last_sealed_block == last_sealed_block {
|
||||
return Seal::Regular(Vec::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
Seal::None
|
||||
}
|
||||
|
||||
fn verify_local_seal(&self, _header: &Header) -> Result<(), Error> {
|
||||
|
||||
@@ -47,7 +47,7 @@ state-db = { path = "../state-db" }
|
||||
time-utils = { path = "../../util/time-utils" }
|
||||
tiny-keccak = "1.4"
|
||||
trace = { path = "../trace" }
|
||||
transaction-pool = "2.0"
|
||||
transaction-pool = "2.0.1"
|
||||
url = "1"
|
||||
vm = { path = "../vm" }
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ use futures::sync::mpsc;
|
||||
use io::IoChannel;
|
||||
use miner::filter_options::{FilterOptions, FilterOperator};
|
||||
use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
|
||||
use miner;
|
||||
use miner::{self, MinerService};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rayon::prelude::*;
|
||||
use types::{
|
||||
@@ -58,7 +58,7 @@ use using_queue::{UsingQueue, GetAction};
|
||||
|
||||
use block::{ClosedBlock, SealedBlock};
|
||||
use client::{BlockProducer, SealedBlockImporter, Client};
|
||||
use client_traits::{BlockChain, ChainInfo, Nonce, TransactionInfo};
|
||||
use client_traits::{BlockChain, ChainInfo, EngineClient, Nonce, TransactionInfo};
|
||||
use engine::{Engine, signer::EngineSigner};
|
||||
use machine::executive::contract_address;
|
||||
use spec::Spec;
|
||||
@@ -859,9 +859,9 @@ impl Miner {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare pending block, check whether sealing is needed, and then update sealing.
|
||||
fn prepare_and_update_sealing<C: miner::BlockChainClient>(&self, chain: &C) {
|
||||
use miner::MinerService;
|
||||
match self.engine.sealing_state() {
|
||||
SealingState::Ready => {
|
||||
self.maybe_enable_sealing();
|
||||
@@ -1429,6 +1429,9 @@ impl miner::MinerService for Miner {
|
||||
service_transaction_checker.as_ref(),
|
||||
);
|
||||
queue.cull(client);
|
||||
if is_internal_import {
|
||||
chain.update_sealing();
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = channel.send(ClientIoMessage::<Client>::execute(cull)) {
|
||||
@@ -1436,8 +1439,12 @@ impl miner::MinerService for Miner {
|
||||
}
|
||||
} else {
|
||||
self.transaction_queue.cull(client);
|
||||
if is_internal_import {
|
||||
self.update_sealing(chain);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref service_transaction_checker) = self.service_transaction_checker {
|
||||
match service_transaction_checker.refresh_cache(chain) {
|
||||
Ok(true) => {
|
||||
|
||||
Reference in New Issue
Block a user