141 lines
4.2 KiB
Rust
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use service::{HandlerId, IoChannel, IoContext};
use IoHandler;
use std::cell::Cell;

use std::sync::{Condvar as SCondvar, Mutex as SMutex};

const STACK_SIZE: usize = 16*1024*1024;

thread_local! {
	/// Stack size.
	/// Should be modified if it is changed in Rust, since there is no way
	/// to know or get it.
	pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
}

/// Kind of IO event that a worker dispatches to a handler.
pub enum WorkType<Message> {
	Readable,
	Writable,
	Hup,
	Timeout,
	Message(Arc<Message>)
}

/// A unit of work: an event plus the handler that should receive it.
pub struct Work<Message> {
	pub work_type: WorkType<Message>,
	pub token: usize,
	pub handler_id: HandlerId,
	pub handler: Arc<IoHandler<Message>>,
}

/// An IO worker thread.
/// Steals queued work items and dispatches them to their handlers.
pub struct Worker {
	thread: Option<JoinHandle<()>>,
	wait: Arc<SCondvar>,
	deleting: Arc<AtomicBool>,
	wait_mutex: Arc<SMutex<()>>,
}

impl Worker {
	/// Creates a new worker instance.
	pub fn new<Message>(
		index: usize,
		stealer: chase_lev::Stealer<Work<Message>>,
		channel: IoChannel<Message>,
		wait: Arc<SCondvar>,
		wait_mutex: Arc<SMutex<()>>,
	) -> Worker
	where Message: Send + Sync + 'static {
		let deleting = Arc::new(AtomicBool::new(false));
		let mut worker = Worker {
			thread: None,
			wait: wait.clone(),
			deleting: deleting.clone(),
			wait_mutex: wait_mutex.clone(),
		};
		worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn(
			move || {
				LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
				Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
			})
			.expect("Error creating worker thread"));
		worker
	}

	fn work_loop<Message>(
		stealer: chase_lev::Stealer<Work<Message>>,
		channel: IoChannel<Message>,
		wait: Arc<SCondvar>,
		wait_mutex: Arc<SMutex<()>>,
		deleting: Arc<AtomicBool>,
	)
	where Message: Send + Sync + 'static {
		loop {
			{
				let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
				if deleting.load(AtomicOrdering::Acquire) {
					return;
				}
				let _ = wait.wait(lock);
			}

			while !deleting.load(AtomicOrdering::Acquire) {
				match stealer.steal() {
					chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
					_ => break,
				}
			}
		}
	}

	fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + 'static {
		match work.work_type {
			WorkType::Readable => {
				work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token);
			},
			WorkType::Writable => {
				work.handler.stream_writable(&IoContext::new(channel, work.handler_id), work.token);
			}
			WorkType::Hup => {
				work.handler.stream_hup(&IoContext::new(channel, work.handler_id), work.token);
			}
			WorkType::Timeout => {
				work.handler.timeout(&IoContext::new(channel, work.handler_id), work.token);
			}
			WorkType::Message(message) => {
				work.handler.message(&IoContext::new(channel, work.handler_id), &*message);
			}
		}
	}
}

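impl Drop for Worker {
	fn drop(&mut self) {
		trace!(target: "shutdown", "[IoWorker] Closing...");
		{
			// Bind the guard to a name: `let _ = ...` would drop it immediately, so the
			// flag could be set and the notification sent between the worker's check of
			// `deleting` and its call to `wait`, and the wakeup would be lost.
			// The guard is released at the end of this block, before `join`, because
			// the worker must re-acquire the mutex to return from `wait`.
			let _guard = self.wait_mutex.lock().expect("Poisoned work_loop mutex");
			self.deleting.store(true, AtomicOrdering::Release);
			self.wait.notify_all();
		}
		if let Some(thread) = self.thread.take() {
			thread.join().ok();
		}
		trace!(target: "shutdown", "[IoWorker] Closed");
	}
}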
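
// Illustrative sketch, not part of the original file: the wait/notify/shutdown
// handshake used by `work_loop` and `Drop` above, reduced to the standard library.
// Assumptions: `Shared`, `worker_loop` and `run_demo` are hypothetical names, a
// mutex-protected `VecDeque` stands in for the `chase_lev` work-stealing deque,
// and the shutdown flag is kept under the same mutex as the queue rather than in
// a separate `AtomicBool`.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};

/// State shared between the demo pool and its workers.
struct Shared {
	/// Pending jobs and the shutdown flag, guarded together so a worker
	/// cannot miss a wakeup between checking them and going to sleep.
	state: Mutex<(VecDeque<usize>, bool)>,
	wake: Condvar,
	processed: AtomicUsize,
}

fn worker_loop(shared: Arc<Shared>) {
	loop {
		let job = {
			let mut guard = shared.state.lock().expect("Poisoned demo mutex");
			loop {
				if let Some(job) = guard.0.pop_front() {
					break Some(job);
				}
				if guard.1 {
					// Shutdown requested and the queue is drained.
					break None;
				}
				// Releases the mutex while sleeping and re-acquires it on wakeup.
				guard = shared.wake.wait(guard).expect("Poisoned demo mutex");
			}
		};
		match job {
			// The "work" here is just counting the job, outside the lock.
			Some(_job) => { shared.processed.fetch_add(1, Ordering::SeqCst); }
			None => return,
		}
	}
}

/// Spawns `workers` threads, queues `jobs` jobs, shuts down and returns
/// the number of jobs that were processed.
pub fn run_demo(jobs: usize, workers: usize) -> usize {
	let shared = Arc::new(Shared {
		state: Mutex::new((VecDeque::new(), false)),
		wake: Condvar::new(),
		processed: AtomicUsize::new(0),
	});
	let handles: Vec<JoinHandle<()>> = (0..workers).map(|index| {
		let shared = shared.clone();
		thread::Builder::new()
			.name(format!("Demo Worker #{}", index))
			.spawn(move || worker_loop(shared))
			.expect("Error creating worker thread")
	}).collect();

	for job in 0..jobs {
		shared.state.lock().expect("Poisoned demo mutex").0.push_back(job);
		shared.wake.notify_one();
	}

	// Set the flag under the lock, release the lock, then wake everyone and join.
	shared.state.lock().expect("Poisoned demo mutex").1 = true;
	shared.wake.notify_all();
	for handle in handles {
		handle.join().expect("Demo worker panicked");
	}
	shared.processed.load(Ordering::SeqCst)
}
```

// Usage: `run_demo(100, 4)` queues 100 jobs for 4 workers. Workers drain the queue
// before honouring the shutdown flag, so every queued job is counted as long as
// at least one worker exists.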