openethereum/util/io/src/worker.rs
Arkadiy Paronyan 487dfb0208 Snapshot sync part 2 (#2098)
* Split block downloader into a module

* Snapshot sync progress

* Warp sync CLI option

* Increased snapshot chunk and ping timeouts

* Fixed an issue with delayed writes

* Updated bootnodes

* Don't run pending IO tasks on shutdown

* Optional first_block; removed insert_snapshot_block

* Fixing expect calls

* Fixed stalled sync

* style and docs

* Update block_sync.rs

[ci:skip]
2016-10-18 18:16:00 +02:00

145 lines
4.3 KiB
Rust

// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::mem;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use service::{HandlerId, IoChannel, IoContext};
use IoHandler;
use panics::*;
use std::cell::Cell;
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
/// Stack size, in bytes (16 MiB), requested for every IO worker thread.
const STACK_SIZE: usize = 16*1024*1024;
thread_local! {
/// Stack size, in bytes, assumed for the current thread.
///
/// Initialised from the `RUST_MIN_STACK` environment variable when it is set
/// and parses as a number; otherwise falls back to 2 MiB. There is no way to
/// query the real value at runtime, so the fallback has to be updated by hand
/// if the default thread stack size used by Rust changes.
pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
}
/// Kind of event a `Work` item delivers to its handler.
pub enum WorkType<Message> {
/// Dispatched to `IoHandler::stream_readable`.
Readable,
/// Dispatched to `IoHandler::stream_writable`.
Writable,
/// Dispatched to `IoHandler::stream_hup`.
Hup,
/// Dispatched to `IoHandler::timeout`.
Timeout,
/// Carries a message that is passed by reference to `IoHandler::message`.
Message(Message)
}
/// A unit of work taken from the queue and executed by a worker thread.
pub struct Work<Message> {
/// Selects which handler callback is invoked.
pub work_type: WorkType<Message>,
/// Token handed to the handler callback (not used for `WorkType::Message`).
pub token: usize,
/// Id used to build the `IoContext` given to the handler.
pub handler_id: HandlerId,
/// Handler whose callback is invoked.
pub handler: Arc<IoHandler<Message>>,
}
/// An IO worker thread.
///
/// Owns the handle of one spawned thread together with the shared state used
/// to wake that thread up and to ask it to exit.
pub struct Worker {
// Handle of the spawned thread; taken out and joined when the worker is dropped.
thread: Option<JoinHandle<()>>,
// Condition variable the worker thread blocks on between batches of work.
wait: Arc<SCondvar>,
// Set to `true` on drop to tell the worker thread to leave its loop.
deleting: Arc<AtomicBool>,
// Mutex paired with `wait`.
wait_mutex: Arc<SMutex<()>>,
}
impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<SCondvar>,
wait_mutex: Arc<SMutex<()>>,
panic_handler: Arc<PanicHandler>
) -> Worker
where Message: Send + Sync + Clone + 'static {
let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker {
thread: None,
wait: wait.clone(),
deleting: deleting.clone(),
wait_mutex: wait_mutex.clone(),
};
worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn(
move || {
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
panic_handler.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()
})
.expect("Error creating worker thread"));
worker
}
fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<SCondvar>,
wait_mutex: Arc<SMutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static {
loop {
{
let lock = wait_mutex.lock().unwrap();
if deleting.load(AtomicOrdering::Acquire) {
return;
}
let _ = wait.wait(lock);
}
while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
_ => break,
}
}
}
}
fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + Clone + 'static {
match work.work_type {
WorkType::Readable => {
work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token);
},
WorkType::Writable => {
work.handler.stream_writable(&IoContext::new(channel, work.handler_id), work.token);
}
WorkType::Hup => {
work.handler.stream_hup(&IoContext::new(channel, work.handler_id), work.token);
}
WorkType::Timeout => {
work.handler.timeout(&IoContext::new(channel, work.handler_id), work.token);
}
WorkType::Message(message) => {
work.handler.message(&IoContext::new(channel, work.handler_id), &message);
}
}
}
}
impl Drop for Worker {
    /// Asks the worker thread to exit and blocks until it has finished.
    fn drop(&mut self) {
        trace!(target: "shutdown", "[IoWorker] Closing...");
        {
            // The guard must be bound to a name: `let _ = ...` would release the
            // mutex immediately, letting the worker check `deleting`, miss the
            // notification below and then sleep forever. The mutex protects no
            // data, so a poisoned lock is safe to recover instead of panicking
            // inside `drop`.
            let _guard = self.wait_mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            self.deleting.store(true, AtomicOrdering::Release);
            self.wait.notify_all();
        }
        // The guard is released before joining; the woken worker has to
        // re-acquire the mutex before it can observe the flag and return.
        if let Some(thread) = mem::replace(&mut self.thread, None) {
            thread.join().ok();
        }
        trace!(target: "shutdown", "[IoWorker] Closed");
    }
}