Offload cull to IoWorker.

This commit is contained in:
Tomasz Drwięga 2018-07-11 17:12:00 +02:00
parent 4ba600fcc4
commit 485acc5229
No known key found for this signature in database
GPG Key ID: D066F497E62CAF66
5 changed files with 67 additions and 17 deletions

View File

@ -94,6 +94,7 @@ impl ClientService {
let pruning = config.pruning;
let client = Client::new(config, &spec, client_db.clone(), miner.clone(), io_service.channel())?;
miner.set_io_channel(io_service.channel());
let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),

View File

@ -200,7 +200,7 @@ pub struct Client {
/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
liveness: AtomicBool,
io_channel: Mutex<IoChannel<ClientIoMessage>>,
io_channel: RwLock<IoChannel<ClientIoMessage>>,
/// List of actors to be notified on certain chain events
notify: RwLock<Vec<Weak<ChainNotify>>>,
@ -712,7 +712,7 @@ impl Client {
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
io_channel: RwLock::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
@ -947,7 +947,7 @@ impl Client {
/// Replace io channel. Useful for testing.
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.lock() = io_channel;
*self.io_channel.write() = io_channel;
}
/// Get a copy of the best block's state.
@ -1952,7 +1952,7 @@ impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
trace_time!("import_queued_transactions");
let txs: Vec<UnverifiedTransaction> = transactions
@ -2001,7 +2001,7 @@ impl IoClient for Client {
let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
@ -2033,7 +2033,7 @@ impl IoClient for Client {
}
fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
@ -2142,7 +2142,14 @@ impl ImportSealedBlock for Client {
route
};
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
self.importer.miner.chain_new_blocks(
self,
&[h.clone()],
&[],
route.enacted(),
route.retracted(),
self.engine.seals_internally().is_some(),
);
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
@ -2452,7 +2459,7 @@ impl IoChannelQueue {
}
}
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);

View File

@ -27,6 +27,7 @@ use ethcore_miner::gas_pricer::GasPricer;
use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy};
use ethcore_miner::work_notify::NotifyWork;
use ethereum_types::{H256, U256, Address};
use io::IoChannel;
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use transaction::{
@ -43,7 +44,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock};
use client::{
BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce
};
use client::BlockId;
use client::{BlockId, ClientIoMessage};
use executive::contract_address;
use header::{Header, BlockNumber};
use miner;
@ -209,6 +210,7 @@ pub struct Miner {
transaction_queue: Arc<TransactionQueue>,
engine: Arc<EthEngine>,
accounts: Option<Arc<AccountProvider>>,
io_channel: RwLock<Option<IoChannel<ClientIoMessage>>>,
}
impl Miner {
@ -224,7 +226,12 @@ impl Miner {
}
/// Creates new instance of miner Arc.
pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Self {
pub fn new(
options: MinerOptions,
gas_pricer: GasPricer,
spec: &Spec,
accounts: Option<Arc<AccountProvider>>,
) -> Self {
let limits = options.pool_limits.clone();
let verifier_options = options.pool_verification_options.clone();
let tx_queue_strategy = options.tx_queue_strategy;
@ -247,6 +254,7 @@ impl Miner {
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts,
engine: spec.engine.clone(),
io_channel: RwLock::new(None),
}
}
@ -266,6 +274,11 @@ impl Miner {
}, GasPricer::new_fixed(minimal_gas_price), spec, accounts)
}
/// Sets `IoChannel`
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.write() = Some(io_channel);
}
/// Clear all pending block states
pub fn clear(&self) {
self.sealing.lock().queue.reset();
@ -1130,9 +1143,34 @@ impl miner::MinerService for Miner {
// (thanks to Ready), but culling can take significant amount of time,
// so best to leave it after we create some work for miners to prevent increased
// uncle rate.
// If the io_channel is available attempt to offload culling to a separate task
// to avoid blocking chain_new_blocks
if let Some(ref channel) = *self.io_channel.read() {
let queue = self.transaction_queue.clone();
let nonce_cache = self.nonce_cache.clone();
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let refuse_service_transactions = self.options.refuse_service_transactions;
let cull = move |chain: &::client::Client| {
let client = PoolClient::new(
chain,
&nonce_cache,
&*engine,
accounts.as_ref().map(|x| &**x),
refuse_service_transactions,
);
queue.cull(client);
};
if let Err(e) = channel.send(ClientIoMessage::execute(cull)) {
warn!(target: "miner", "Error queueing cull: {:?}", e);
}
} else {
self.transaction_queue.cull(client);
}
}
}
fn pending_state(&self, latest_block_number: BlockNumber) -> Option<Self::State> {
self.map_existing_pending_block(|b| b.state().clone(), latest_block_number)

View File

@ -16,8 +16,11 @@
//! Blockchain access for transaction pool.
use std::fmt;
use std::collections::HashMap;
use std::{
collections::HashMap,
fmt,
sync::Arc,
};
use ethereum_types::{H256, U256, Address};
use ethcore_miner::pool;
@ -37,9 +40,9 @@ use miner;
use miner::service_transaction_checker::ServiceTransactionChecker;
/// Cache for state nonces.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NonceCache {
nonces: RwLock<HashMap<Address, U256>>,
nonces: Arc<RwLock<HashMap<Address, U256>>>,
limit: usize
}
@ -47,7 +50,7 @@ impl NonceCache {
/// Create new cache with a limit of `limit` entries.
pub fn new(limit: usize) -> Self {
NonceCache {
nonces: RwLock::new(HashMap::with_capacity(limit / 2)),
nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))),
limit,
}
}

View File

@ -550,7 +550,8 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
cmd.miner_options,
cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()),
&spec,
Some(account_provider.clone())
Some(account_provider.clone()),
));
miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed");
miner.set_gas_range_target(cmd.miner_extras.gas_range_target);