Prevent sync restart if import queue full (#9381)

This commit is contained in:
Andrew Jones 2018-08-24 09:42:24 +01:00 committed by GitHub
parent 31291ebd35
commit 0b34579b04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 26 deletions

View File

@ -16,7 +16,6 @@
use std::collections::{HashSet, BTreeMap, VecDeque}; use std::collections::{HashSet, BTreeMap, VecDeque};
use std::cmp; use std::cmp;
use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
@ -50,13 +49,16 @@ use client::{
}; };
use encoded; use encoded;
use engines::{EthEngine, EpochTransition, ForkChoice}; use engines::{EthEngine, EpochTransition, ForkChoice};
use error::{ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError}; use error::{
ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult,
QueueError, QueueErrorKind, Error as EthcoreError
};
use vm::{EnvInfo, LastHashes}; use vm::{EnvInfo, LastHashes};
use evm::Schedule; use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address}; use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory}; use factory::{Factories, VmFactory};
use header::{BlockNumber, Header, ExtendedHeader}; use header::{BlockNumber, Header, ExtendedHeader};
use io::{IoChannel, IoError}; use io::IoChannel;
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService}; use miner::{Miner, MinerService};
use ethcore_miner::pool::VerifiedTransaction; use ethcore_miner::pool::VerifiedTransaction;
@ -2095,7 +2097,7 @@ impl IoClient for Client {
let queued = self.queued_ancient_blocks.clone(); let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone(); let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| { self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block"); trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order. // Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing. // We use separate lock, cause we don't want to block queueing.
@ -2119,10 +2121,9 @@ impl IoClient for Client {
break; break;
} }
} }
}) { })?;
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), Ok(hash)
}
} }
fn queue_consensus_message(&self, message: Bytes) { fn queue_consensus_message(&self, message: Bytes) {
@ -2538,21 +2539,6 @@ mod tests {
} }
} }
#[derive(Debug)]
enum QueueError {
Channel(IoError),
Full(usize),
}
impl fmt::Display for QueueError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),
}
}
}
/// Queue some items to be processed by IO client. /// Queue some items to be processed by IO client.
struct IoChannelQueue { struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>, currently_queued: Arc<AtomicUsize>,
@ -2571,7 +2557,7 @@ impl IoChannelQueue {
F: Fn(&Client) + Send + Sync + 'static, F: Fn(&Client) + Send + Sync + 'static,
{ {
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit)); ensure!(queue_size < self.limit, QueueErrorKind::Full(self.limit));
let currently_queued = self.currently_queued.clone(); let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| { let result = channel.send(ClientIoMessage::execute(move |client| {
@ -2584,7 +2570,7 @@ impl IoChannelQueue {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(()) Ok(())
}, },
Err(e) => Err(QueueError::Channel(e)), Err(e) => bail!(QueueErrorKind::Channel(e)),
} }
} }
} }

View File

@ -150,6 +150,24 @@ impl error::Error for BlockError {
} }
} }
error_chain! {
types {
QueueError, QueueErrorKind, QueueErrorResultExt, QueueErrorResult;
}
errors {
#[doc = "Queue is full"]
Full(limit: usize) {
description("Queue is full")
display("The queue is full ({})", limit)
}
}
foreign_links {
Channel(IoError) #[doc = "Io channel error"];
}
}
error_chain! { error_chain! {
types { types {
ImportError, ImportErrorKind, ImportErrorResultExt, ImportErrorResult; ImportError, ImportErrorKind, ImportErrorResultExt, ImportErrorResult;
@ -183,6 +201,7 @@ error_chain! {
links { links {
Import(ImportError, ImportErrorKind) #[doc = "Import error"]; Import(ImportError, ImportErrorKind) #[doc = "Import error"];
Queue(QueueError, QueueErrorKind) #[doc = "Io channel queue error"];
} }
foreign_links { foreign_links {

View File

@ -25,7 +25,7 @@ use ethereum_types::H256;
use rlp::{self, Rlp}; use rlp::{self, Rlp};
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind}; use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
use ethcore::error::{ImportErrorKind, BlockError}; use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError};
use sync_io::SyncIo; use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader}; use blocks::{BlockCollection, SyncBody, SyncHeader};
@ -513,6 +513,10 @@ impl BlockDownloader {
debug!(target: "sync", "Block temporarily invalid, restarting sync"); debug!(target: "sync", "Block temporarily invalid, restarting sync");
break; break;
}, },
Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
debug!(target: "sync", "Block import queue full ({}), restarting sync", limit);
break;
},
Err(e) => { Err(e) => {
debug!(target: "sync", "Bad block {:?} : {:?}", h, e); debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
bad = true; bad = true;