Offload cull to IoWorker. (#9099)

This commit is contained in:
Tomasz Drwięga 2018-07-13 12:23:57 +02:00 committed by Afri Schoedon
parent 993650f3d6
commit 5f523f6966
5 changed files with 67 additions and 17 deletions

View File

@ -94,6 +94,7 @@ impl ClientService {
let pruning = config.pruning; let pruning = config.pruning;
let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?; let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
miner.set_io_channel(io_service.channel());
let snapshot_params = SnapServiceParams { let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(), engine: spec.engine.clone(),

View File

@ -200,7 +200,7 @@ pub struct Client {
/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`. /// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
liveness: AtomicBool, liveness: AtomicBool,
io_channel: Mutex<IoChannel<ClientIoMessage>>, io_channel: RwLock<IoChannel<ClientIoMessage>>,
/// List of actors to be notified on certain chain events /// List of actors to be notified on certain chain events
notify: RwLock<Vec<Weak<ChainNotify>>>, notify: RwLock<Vec<Weak<ChainNotify>>>,
@ -761,7 +761,7 @@ impl Client {
db: RwLock::new(db.clone()), db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db), state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel), io_channel: RwLock::new(message_channel),
notify: RwLock::new(Vec::new()), notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size), queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
@ -995,7 +995,7 @@ impl Client {
/// Replace io channel. Useful for testing. /// Replace io channel. Useful for testing.
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) { pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.lock() = io_channel; *self.io_channel.write() = io_channel;
} }
/// Get a copy of the best block's state. /// Get a copy of the best block's state.
@ -2011,7 +2011,7 @@ impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) { fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions"); trace_time!("queue_transactions");
let len = transactions.len(); let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
trace_time!("import_queued_transactions"); trace_time!("import_queued_transactions");
let txs: Vec<UnverifiedTransaction> = transactions let txs: Vec<UnverifiedTransaction> = transactions
@ -2060,7 +2060,7 @@ impl IoClient for Client {
let queued = self.queued_ancient_blocks.clone(); let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone(); let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block"); trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order. // Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing. // We use separate lock, cause we don't want to block queueing.
@ -2092,7 +2092,7 @@ impl IoClient for Client {
} }
fn queue_consensus_message(&self, message: Bytes) { fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) { if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e); debug!(target: "poa", "Invalid message received: {}", e);
} }
@ -2202,7 +2202,14 @@ impl ImportSealedBlock for Client {
route route
}; };
let route = ChainRoute::from([route].as_ref()); let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), self.engine.seals_internally().is_some()); self.importer.miner.chain_new_blocks(
self,
&[h.clone()],
&[],
route.enacted(),
route.retracted(),
self.engine.seals_internally().is_some(),
);
self.notify(|notify| { self.notify(|notify| {
notify.new_blocks( notify.new_blocks(
vec![h.clone()], vec![h.clone()],
@ -2526,7 +2533,7 @@ impl IoChannelQueue {
} }
} }
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static, F: Fn(&Client) + Send + Sync + 'static,
{ {
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);

View File

@ -28,6 +28,7 @@ use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStat
#[cfg(feature = "work-notify")] #[cfg(feature = "work-notify")]
use ethcore_miner::work_notify::NotifyWork; use ethcore_miner::work_notify::NotifyWork;
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use io::IoChannel;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use rayon::prelude::*; use rayon::prelude::*;
use transaction::{ use transaction::{
@ -44,7 +45,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock};
use client::{ use client::{
BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce
}; };
use client::BlockId; use client::{BlockId, ClientIoMessage};
use executive::contract_address; use executive::contract_address;
use header::{Header, BlockNumber}; use header::{Header, BlockNumber};
use miner; use miner;
@ -211,6 +212,7 @@ pub struct Miner {
transaction_queue: Arc<TransactionQueue>, transaction_queue: Arc<TransactionQueue>,
engine: Arc<EthEngine>, engine: Arc<EthEngine>,
accounts: Option<Arc<AccountProvider>>, accounts: Option<Arc<AccountProvider>>,
io_channel: RwLock<Option<IoChannel<ClientIoMessage>>>,
} }
impl Miner { impl Miner {
@ -227,7 +229,12 @@ impl Miner {
} }
/// Creates new instance of miner Arc. /// Creates new instance of miner Arc.
pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Self { pub fn new(
options: MinerOptions,
gas_pricer: GasPricer,
spec: &Spec,
accounts: Option<Arc<AccountProvider>>,
) -> Self {
let limits = options.pool_limits.clone(); let limits = options.pool_limits.clone();
let verifier_options = options.pool_verification_options.clone(); let verifier_options = options.pool_verification_options.clone();
let tx_queue_strategy = options.tx_queue_strategy; let tx_queue_strategy = options.tx_queue_strategy;
@ -251,6 +258,7 @@ impl Miner {
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)), transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts, accounts,
engine: spec.engine.clone(), engine: spec.engine.clone(),
io_channel: RwLock::new(None),
} }
} }
@ -270,6 +278,11 @@ impl Miner {
}, GasPricer::new_fixed(minimal_gas_price), spec, accounts) }, GasPricer::new_fixed(minimal_gas_price), spec, accounts)
} }
/// Sets `IoChannel`
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.write() = Some(io_channel);
}
/// Clear all pending block states /// Clear all pending block states
pub fn clear(&self) { pub fn clear(&self) {
self.sealing.lock().queue.reset(); self.sealing.lock().queue.reset();
@ -1176,7 +1189,32 @@ impl miner::MinerService for Miner {
// (thanks to Ready), but culling can take significant amount of time, // (thanks to Ready), but culling can take significant amount of time,
// so best to leave it after we create some work for miners to prevent increased // so best to leave it after we create some work for miners to prevent increased
// uncle rate. // uncle rate.
self.transaction_queue.cull(client); // If the io_channel is available attempt to offload culling to a separate task
// to avoid blocking chain_new_blocks
if let Some(ref channel) = *self.io_channel.read() {
let queue = self.transaction_queue.clone();
let nonce_cache = self.nonce_cache.clone();
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let refuse_service_transactions = self.options.refuse_service_transactions;
let cull = move |chain: &::client::Client| {
let client = PoolClient::new(
chain,
&nonce_cache,
&*engine,
accounts.as_ref().map(|x| &**x),
refuse_service_transactions,
);
queue.cull(client);
};
if let Err(e) = channel.send(ClientIoMessage::execute(cull)) {
warn!(target: "miner", "Error queueing cull: {:?}", e);
}
} else {
self.transaction_queue.cull(client);
}
} }
} }

View File

@ -16,8 +16,11 @@
//! Blockchain access for transaction pool. //! Blockchain access for transaction pool.
use std::fmt; use std::{
use std::collections::HashMap; collections::HashMap,
fmt,
sync::Arc,
};
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use ethcore_miner::pool; use ethcore_miner::pool;
@ -37,9 +40,9 @@ use miner;
use miner::service_transaction_checker::ServiceTransactionChecker; use miner::service_transaction_checker::ServiceTransactionChecker;
/// Cache for state nonces. /// Cache for state nonces.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct NonceCache { pub struct NonceCache {
nonces: RwLock<HashMap<Address, U256>>, nonces: Arc<RwLock<HashMap<Address, U256>>>,
limit: usize limit: usize
} }
@ -47,7 +50,7 @@ impl NonceCache {
/// Create new cache with a limit of `limit` entries. /// Create new cache with a limit of `limit` entries.
pub fn new(limit: usize) -> Self { pub fn new(limit: usize) -> Self {
NonceCache { NonceCache {
nonces: RwLock::new(HashMap::with_capacity(limit / 2)), nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))),
limit, limit,
} }
} }

View File

@ -504,7 +504,8 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
cmd.miner_options, cmd.miner_options,
cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()), cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()),
&spec, &spec,
Some(account_provider.clone()) Some(account_provider.clone()),
)); ));
miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed"); miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed");
miner.set_gas_range_target(cmd.miner_extras.gas_range_target); miner.set_gas_range_target(cmd.miner_extras.gas_range_target);