removed QueueError type (#10852)

This commit is contained in:
Marek Kotewicz 2019-07-06 20:40:56 +02:00 committed by Andronik Ordian
parent 0a9095626d
commit c4c5d79a0f
3 changed files with 38 additions and 63 deletions

View File

@ -63,7 +63,7 @@ use engines::{MAX_UNCLE_AGE, Engine, EpochTransition, ForkChoice, EngineError, S
use engines::epoch::PendingTransition;
use error::{
ImportError, ExecutionError, CallError, BlockError,
QueueError, Error as EthcoreError, EthcoreResult,
Error as EthcoreError, EthcoreResult,
};
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
@ -2604,6 +2604,39 @@ fn transaction_receipt(
}
}
/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}
impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}
pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> EthcoreResult<()> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
if queue_size >= self.limit {
return Err(EthcoreError::FullQueue(self.limit))
};
let currently_queued = self.currently_queued.clone();
let _ok = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}))?;
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
}
}
#[cfg(test)]
mod tests {
use ethereum_types::{H256, Address};
@ -2761,41 +2794,3 @@ mod tests {
});
}
}
/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}
impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}
pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
if queue_size >= self.limit {
return Err(QueueError::Full(self.limit))
};
let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}));
match result {
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => return Err(QueueError::Channel(e)),
}
}
}

View File

@ -150,26 +150,6 @@ impl error::Error for BlockError {
}
}
/// Queue error
#[derive(Debug, Display, From)]
pub enum QueueError {
/// Queue is full
#[display(fmt = "Queue is full ({})", _0)]
Full(usize),
/// Io channel error
#[display(fmt = "Io channel error: {}", _0)]
Channel(::io::IoError)
}
impl error::Error for QueueError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
QueueError::Channel(e) => Some(e),
_ => None,
}
}
}
/// Block import Error
#[derive(Debug, Display)]
pub enum ImportError {
@ -196,8 +176,8 @@ pub enum Error {
#[display(fmt = "Import error: {}", _0)]
Import(ImportError),
/// Io channel queue error
#[display(fmt = "Queue error: {}", _0)]
Queue(QueueError),
#[display(fmt = "Queue is full: {}", _0)]
FullQueue(usize),
/// Io create error
#[display(fmt = "Io error: {}", _0)]
Io(::io::IoError),

View File

@ -25,7 +25,7 @@ use ethereum_types::H256;
use rlp::{self, Rlp};
use types::BlockNumber;
use ethcore::client::{BlockStatus, BlockId};
use ethcore::error::{ImportError, QueueError, BlockError, Error as EthcoreError};
use ethcore::error::{ImportError, BlockError, Error as EthcoreError};
use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader};
use chain::BlockSet;
@ -582,7 +582,7 @@ impl BlockDownloader {
debug_sync!(self, "Block temporarily invalid: {:?}, restarting sync", h);
break;
},
Err(EthcoreError::Queue(QueueError::Full(limit))) => {
Err(EthcoreError::FullQueue(limit)) => {
debug_sync!(self, "Block import queue full ({}), restarting sync", limit);
download_action = DownloadAction::Reset;
break;