Fix not downloading old blocks (#8642)
This commit is contained in:
parent
cdbcfaa7de
commit
d1934363e7
@ -17,7 +17,7 @@
|
|||||||
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
|
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
|
|
||||||
@ -210,6 +210,8 @@ pub struct Client {
|
|||||||
queue_transactions: IoChannelQueue,
|
queue_transactions: IoChannelQueue,
|
||||||
/// Ancient blocks import queue
|
/// Ancient blocks import queue
|
||||||
queue_ancient_blocks: IoChannelQueue,
|
queue_ancient_blocks: IoChannelQueue,
|
||||||
|
/// Hashes of pending ancient block wainting to be included
|
||||||
|
pending_ancient_blocks: RwLock<HashSet<H256>>,
|
||||||
/// Consensus messages import queue
|
/// Consensus messages import queue
|
||||||
queue_consensus_message: IoChannelQueue,
|
queue_consensus_message: IoChannelQueue,
|
||||||
|
|
||||||
@ -433,6 +435,7 @@ impl Importer {
|
|||||||
let hash = header.hash();
|
let hash = header.hash();
|
||||||
let _import_lock = self.import_lock.lock();
|
let _import_lock = self.import_lock.lock();
|
||||||
|
|
||||||
|
trace!(target: "client", "Trying to import old block #{}", header.number());
|
||||||
{
|
{
|
||||||
trace_time!("import_old_block");
|
trace_time!("import_old_block");
|
||||||
// verify the block, passing the chain for updating the epoch verifier.
|
// verify the block, passing the chain for updating the epoch verifier.
|
||||||
@ -761,6 +764,7 @@ impl Client {
|
|||||||
notify: RwLock::new(Vec::new()),
|
notify: RwLock::new(Vec::new()),
|
||||||
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
|
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
|
||||||
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
|
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
|
||||||
|
pending_ancient_blocks: RwLock::new(HashSet::new()),
|
||||||
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
|
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
|
||||||
last_hashes: RwLock::new(VecDeque::new()),
|
last_hashes: RwLock::new(VecDeque::new()),
|
||||||
factories: factories,
|
factories: factories,
|
||||||
@ -2008,7 +2012,7 @@ impl BlockChainClient for Client {
|
|||||||
impl IoClient for Client {
|
impl IoClient for Client {
|
||||||
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
|
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
|
||||||
let len = transactions.len();
|
let len = transactions.len();
|
||||||
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
|
self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| {
|
||||||
trace_time!("import_queued_transactions");
|
trace_time!("import_queued_transactions");
|
||||||
|
|
||||||
let txs: Vec<UnverifiedTransaction> = transactions
|
let txs: Vec<UnverifiedTransaction> = transactions
|
||||||
@ -2032,23 +2036,32 @@ impl IoClient for Client {
|
|||||||
|
|
||||||
{
|
{
|
||||||
// check block order
|
// check block order
|
||||||
if self.chain.read().is_known(&header.hash()) {
|
if self.chain.read().is_known(&hash) {
|
||||||
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
|
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
|
||||||
}
|
}
|
||||||
let status = self.block_status(BlockId::Hash(*header.parent_hash()));
|
|
||||||
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
|
let parent_hash = *header.parent_hash();
|
||||||
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash())));
|
let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash);
|
||||||
|
let status = self.block_status(BlockId::Hash(parent_hash));
|
||||||
|
if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) {
|
||||||
|
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
|
self.pending_ancient_blocks.write().insert(hash);
|
||||||
client.importer.import_old_block(
|
|
||||||
|
trace!(target: "client", "Queuing old block #{}", header.number());
|
||||||
|
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| {
|
||||||
|
let result = client.importer.import_old_block(
|
||||||
&header,
|
&header,
|
||||||
&block_bytes,
|
&block_bytes,
|
||||||
&receipts_bytes,
|
&receipts_bytes,
|
||||||
&**client.db.read(),
|
&**client.db.read(),
|
||||||
&*client.chain.read()
|
&*client.chain.read()
|
||||||
).map(|_| ()).unwrap_or_else(|e| {
|
);
|
||||||
|
|
||||||
|
client.pending_ancient_blocks.write().remove(&hash);
|
||||||
|
result.map(|_| ()).unwrap_or_else(|e| {
|
||||||
error!(target: "client", "Error importing ancient block: {}", e);
|
error!(target: "client", "Error importing ancient block: {}", e);
|
||||||
});
|
});
|
||||||
}) {
|
}) {
|
||||||
@ -2058,7 +2071,7 @@ impl IoClient for Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn queue_consensus_message(&self, message: Bytes) {
|
fn queue_consensus_message(&self, message: Bytes) {
|
||||||
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
|
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| {
|
||||||
if let Err(e) = client.engine().handle_message(&message) {
|
if let Err(e) = client.engine().handle_message(&message) {
|
||||||
debug!(target: "poa", "Invalid message received: {}", e);
|
debug!(target: "poa", "Invalid message received: {}", e);
|
||||||
}
|
}
|
||||||
@ -2471,35 +2484,38 @@ impl fmt::Display for QueueError {
|
|||||||
|
|
||||||
/// Queue some items to be processed by IO client.
|
/// Queue some items to be processed by IO client.
|
||||||
struct IoChannelQueue {
|
struct IoChannelQueue {
|
||||||
currently_queued: Arc<AtomicUsize>,
|
queue: Arc<Mutex<VecDeque<Box<Fn(&Client) + Send>>>>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IoChannelQueue {
|
impl IoChannelQueue {
|
||||||
pub fn new(limit: usize) -> Self {
|
pub fn new(limit: usize) -> Self {
|
||||||
IoChannelQueue {
|
IoChannelQueue {
|
||||||
currently_queued: Default::default(),
|
queue: Default::default(),
|
||||||
limit,
|
limit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
|
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, fun: F) -> Result<(), QueueError>
|
||||||
F: Fn(&Client) + Send + Sync + 'static,
|
where F: Fn(&Client) + Send + Sync + 'static
|
||||||
{
|
{
|
||||||
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
|
{
|
||||||
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
|
let mut queue = self.queue.lock();
|
||||||
|
let queue_size = queue.len();
|
||||||
|
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
|
||||||
|
|
||||||
let currently_queued = self.currently_queued.clone();
|
queue.push_back(Box::new(fun));
|
||||||
|
}
|
||||||
|
|
||||||
|
let queue = self.queue.clone();
|
||||||
let result = channel.send(ClientIoMessage::execute(move |client| {
|
let result = channel.send(ClientIoMessage::execute(move |client| {
|
||||||
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
|
while let Some(fun) = queue.lock().pop_front() {
|
||||||
fun(client);
|
fun(client);
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => {
|
Ok(_) => Ok(()),
|
||||||
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
|
|
||||||
Ok(())
|
|
||||||
},
|
|
||||||
Err(e) => Err(QueueError::Channel(e)),
|
Err(e) => Err(QueueError::Channel(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -266,7 +266,7 @@ impl BlockCollection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
|
/// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain.
|
||||||
pub fn drain(&mut self) -> Vec<BlockAndReceipts> {
|
pub fn drain(&mut self) -> Vec<BlockAndReceipts> {
|
||||||
if self.blocks.is_empty() || self.head.is_none() {
|
if self.blocks.is_empty() || self.head.is_none() {
|
||||||
return Vec::new();
|
return Vec::new();
|
||||||
|
Loading…
Reference in New Issue
Block a user