// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of OpenEthereum. // OpenEthereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // OpenEthereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with OpenEthereum. If not, see . use deque; use futures::future::{self, Loop}; use service_mio::{HandlerId, IoChannel, IoContext}; use std::{ sync::{ atomic::{AtomicBool, Ordering as AtomicOrdering}, Arc, }, thread::{self, JoinHandle}, }; use tokio::{self}; use IoHandler; use LOCAL_STACK_SIZE; use parking_lot::{Condvar, Mutex}; const STACK_SIZE: usize = 16 * 1024 * 1024; pub enum WorkType { Readable, Writable, Hup, Timeout, Message(Arc), } pub struct Work { pub work_type: WorkType, pub token: usize, pub handler_id: HandlerId, pub handler: Arc>, } /// An IO worker thread /// Sorts them ready for blockchain insertion. pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, wait_mutex: Arc>, } impl Worker { /// Creates a new worker instance. pub fn new( name: &str, stealer: deque::Stealer>, channel: IoChannel, wait: Arc, wait_mutex: Arc>, ) -> Worker where Message: Send + Sync + 'static, { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { thread: None, wait: wait.clone(), deleting: deleting.clone(), wait_mutex: wait_mutex.clone(), }; worker.thread = Some( thread::Builder::new() .stack_size(STACK_SIZE) .name(format!("Worker {}", name)) .spawn(move || { LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE)); let ini = (stealer, channel.clone(), wait, wait_mutex.clone(), deleting); let future = future::loop_fn(ini, |(stealer, channel, wait, wait_mutex, deleting)| { { let mut lock = wait_mutex.lock(); if deleting.load(AtomicOrdering::SeqCst) { return Ok(Loop::Break(())); } wait.wait(&mut lock); } while !deleting.load(AtomicOrdering::SeqCst) { match stealer.steal() { deque::Steal::Data(work) => { Worker::do_work(work, channel.clone()) } deque::Steal::Retry => {} deque::Steal::Empty => break, } } Ok(Loop::Continue(( stealer, channel, wait, wait_mutex, deleting, ))) }); if let Err(()) = tokio::runtime::current_thread::block_on_all(future) { error!(target: "ioworker", "error while executing future") } }) .expect("Error creating worker thread"), ); worker } fn do_work(work: Work, channel: IoChannel) where Message: Send + Sync + 'static, { match work.work_type { WorkType::Readable => { work.handler .stream_readable(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Writable => { work.handler .stream_writable(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Hup => { work.handler .stream_hup(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Timeout => { work.handler .timeout(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Message(message) => { work.handler .message(&IoContext::new(channel, work.handler_id), &*message); } } } } impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); let _ = self.wait_mutex.lock(); self.deleting.store(true, AtomicOrdering::SeqCst); self.wait.notify_all(); if let Some(thread) = self.thread.take() { thread.join().ok(); } trace!(target: "shutdown", "[IoWorker] Closed"); } }