// Copyright 2015, 2016 Ethcore (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . use std::sync::Arc; use std::mem; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; use service::{HandlerId, IoChannel, IoContext}; use IoHandler; use panics::*; use std::cell::Cell; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; const STACK_SIZE: usize = 16*1024*1024; thread_local! { /// Stack size /// Should be modified if it is changed in Rust since it is no way /// to know or get it pub static LOCAL_STACK_SIZE: Cell = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024)); } pub enum WorkType { Readable, Writable, Hup, Timeout, Message(Message) } pub struct Work { pub work_type: WorkType, pub token: usize, pub handler_id: HandlerId, pub handler: Arc>, } /// An IO worker thread /// Sorts them ready for blockchain insertion. pub struct Worker { thread: Option>, wait: Arc, deleting: Arc, wait_mutex: Arc>, } impl Worker { /// Creates a new worker instance. pub fn new(index: usize, stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, wait_mutex: Arc>, panic_handler: Arc ) -> Worker where Message: Send + Sync + Clone + 'static { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { thread: None, wait: wait.clone(), deleting: deleting.clone(), wait_mutex: wait_mutex.clone(), }; worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn( move || { LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE)); panic_handler.catch_panic(move || { Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) }).unwrap() }) .expect("Error creating worker thread")); worker } fn work_loop(stealer: chase_lev::Stealer>, channel: IoChannel, wait: Arc, wait_mutex: Arc>, deleting: Arc) where Message: Send + Sync + Clone + 'static { loop { { let lock = wait_mutex.lock().unwrap(); if deleting.load(AtomicOrdering::Acquire) { return; } let _ = wait.wait(lock); } while !deleting.load(AtomicOrdering::Acquire) { match stealer.steal() { chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()), _ => break, } } } } fn do_work(work: Work, channel: IoChannel) where Message: Send + Sync + Clone + 'static { match work.work_type { WorkType::Readable => { work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token); }, WorkType::Writable => { work.handler.stream_writable(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Hup => { work.handler.stream_hup(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Timeout => { work.handler.timeout(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Message(message) => { work.handler.message(&IoContext::new(channel, work.handler_id), &message); } } } } impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); let _ = self.wait_mutex.lock().unwrap(); self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); let thread = mem::replace(&mut self.thread, None).unwrap(); thread.join().ok(); trace!(target: "shutdown", "[IoWorker] Closed"); } }