diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 50735612e..81997be07 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -94,6 +94,7 @@ impl ClientService { let pruning = config.pruning; let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?; + miner.set_io_channel(io_service.channel()); let snapshot_params = SnapServiceParams { engine: spec.engine.clone(), diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c3338ce21..3eb0a7815 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -200,7 +200,7 @@ pub struct Client { /// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`. liveness: AtomicBool, - io_channel: Mutex>, + io_channel: RwLock>, /// List of actors to be notified on certain chain events notify: RwLock>>, @@ -761,7 +761,7 @@ impl Client { db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), report: RwLock::new(Default::default()), - io_channel: Mutex::new(message_channel), + io_channel: RwLock::new(message_channel), notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), @@ -995,7 +995,7 @@ impl Client { /// Replace io channel. Useful for testing. pub fn set_io_channel(&self, io_channel: IoChannel) { - *self.io_channel.lock() = io_channel; + *self.io_channel.write() = io_channel; } /// Get a copy of the best block's state. @@ -2011,7 +2011,7 @@ impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { trace_time!("queue_transactions"); let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { + self.queue_transactions.queue(&self.io_channel.read(), len, move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -2060,7 +2060,7 @@ impl IoClient for Client { let queued = self.queued_ancient_blocks.clone(); let lock = self.ancient_blocks_import_lock.clone(); - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| { trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. // We use separate lock, cause we don't want to block queueing. @@ -2092,7 +2092,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2202,7 +2202,14 @@ impl ImportSealedBlock for Client { route }; let route = ChainRoute::from([route].as_ref()); - self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), self.engine.seals_internally().is_some()); + self.importer.miner.chain_new_blocks( + self, + &[h.clone()], + &[], + route.enacted(), + route.retracted(), + self.engine.seals_internally().is_some(), + ); self.notify(|notify| { notify.new_blocks( vec![h.clone()], @@ -2526,7 +2533,7 @@ impl IoChannelQueue { } } - pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where + pub fn queue(&self, channel: &IoChannel, count: usize, fun: F) -> Result<(), QueueError> where F: Fn(&Client) + Send + Sync + 'static, { let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index d196dc2f0..8ff29848c 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -28,6 +28,7 @@ use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStat #[cfg(feature = "work-notify")] use ethcore_miner::work_notify::NotifyWork; use ethereum_types::{H256, U256, Address}; +use io::IoChannel; use parking_lot::{Mutex, RwLock}; use rayon::prelude::*; use transaction::{ @@ -44,7 +45,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock}; use client::{ BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce }; -use client::BlockId; +use client::{BlockId, ClientIoMessage}; use executive::contract_address; use header::{Header, BlockNumber}; use miner; @@ -211,6 +212,7 @@ pub struct Miner { transaction_queue: Arc, engine: Arc, accounts: Option>, + io_channel: RwLock>>, } impl Miner { @@ -227,7 +229,12 @@ impl Miner { } /// Creates new instance of miner Arc. - pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option>) -> Self { + pub fn new( + options: MinerOptions, + gas_pricer: GasPricer, + spec: &Spec, + accounts: Option>, + ) -> Self { let limits = options.pool_limits.clone(); let verifier_options = options.pool_verification_options.clone(); let tx_queue_strategy = options.tx_queue_strategy; @@ -251,6 +258,7 @@ impl Miner { transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)), accounts, engine: spec.engine.clone(), + io_channel: RwLock::new(None), } } @@ -270,6 +278,11 @@ impl Miner { }, GasPricer::new_fixed(minimal_gas_price), spec, accounts) } + /// Sets `IoChannel` + pub fn set_io_channel(&self, io_channel: IoChannel) { + *self.io_channel.write() = Some(io_channel); + } + /// Clear all pending block states pub fn clear(&self) { self.sealing.lock().queue.reset(); @@ -1176,7 +1189,32 @@ impl miner::MinerService for Miner { // (thanks to Ready), but culling can take significant amount of time, // so best to leave it after we create some work for miners to prevent increased // uncle rate. - self.transaction_queue.cull(client); + // If the io_channel is available attempt to offload culling to a separate task + // to avoid blocking chain_new_blocks + if let Some(ref channel) = *self.io_channel.read() { + let queue = self.transaction_queue.clone(); + let nonce_cache = self.nonce_cache.clone(); + let engine = self.engine.clone(); + let accounts = self.accounts.clone(); + let refuse_service_transactions = self.options.refuse_service_transactions; + + let cull = move |chain: &::client::Client| { + let client = PoolClient::new( + chain, + &nonce_cache, + &*engine, + accounts.as_ref().map(|x| &**x), + refuse_service_transactions, + ); + queue.cull(client); + }; + + if let Err(e) = channel.send(ClientIoMessage::execute(cull)) { + warn!(target: "miner", "Error queueing cull: {:?}", e); + } + } else { + self.transaction_queue.cull(client); + } } } diff --git a/ethcore/src/miner/pool_client.rs b/ethcore/src/miner/pool_client.rs index f537a2757..25d9a809c 100644 --- a/ethcore/src/miner/pool_client.rs +++ b/ethcore/src/miner/pool_client.rs @@ -16,8 +16,11 @@ //! Blockchain access for transaction pool. -use std::fmt; -use std::collections::HashMap; +use std::{ + collections::HashMap, + fmt, + sync::Arc, +}; use ethereum_types::{H256, U256, Address}; use ethcore_miner::pool; @@ -37,9 +40,9 @@ use miner; use miner::service_transaction_checker::ServiceTransactionChecker; /// Cache for state nonces. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct NonceCache { - nonces: RwLock>, + nonces: Arc>>, limit: usize } @@ -47,7 +50,7 @@ impl NonceCache { /// Create new cache with a limit of `limit` entries. pub fn new(limit: usize) -> Self { NonceCache { - nonces: RwLock::new(HashMap::with_capacity(limit / 2)), + nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))), limit, } } diff --git a/parity/run.rs b/parity/run.rs index 176121d75..a8082b724 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -504,7 +504,8 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: cmd.miner_options, cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()), &spec, - Some(account_provider.clone()) + Some(account_provider.clone()), + )); miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed"); miner.set_gas_range_target(cmd.miner_extras.gas_range_target);