Fix issues during block sync (#11265)

This commit is contained in:
rakita
2020-09-05 19:45:31 +02:00
committed by Artem Vorotnikov
parent c58b52c21c
commit 506cee52e8
12 changed files with 259 additions and 52 deletions

View File

@@ -316,6 +316,7 @@ const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const CONTINUE_SYNC_TIMER: TimerToken = 2;
const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;
const DELAYED_PROCESSING_TIMER: TimerToken = 5;
pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);
@@ -341,6 +342,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
.expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300))
.expect("Error registering transactions timer");
io.register_timer(DELAYED_PROCESSING_TIMER, Duration::from_millis(2100))
.expect("Error registering delayed processing timer");
io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL)
.expect("Error registering peers timer");
@@ -388,6 +391,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
DELAYED_PROCESSING_TIMER => self.sync.process_delayed_requests(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}

View File

@@ -223,6 +223,18 @@ impl BlockDownloader {
self.blocks.heap_size() + self.round_parents.heap_size_of_children()
}
fn reset_to_block(&mut self, start_hash: &H256, start_number: BlockNumber) {
self.reset();
self.last_imported_block = start_number;
self.last_imported_hash = start_hash.clone();
self.last_round_start = start_number;
self.last_round_start_hash = start_hash.clone();
self.imported_this_round = None;
self.round_parents = VecDeque::new();
self.target_hash = None;
self.retract_step = 1;
}
/// Returns best imported block number.
pub fn last_imported_block_number(&self) -> BlockNumber {
self.last_imported_block
@@ -491,6 +503,7 @@ impl BlockDownloader {
);
} else {
let best = io.chain().chain_info().best_block_number;
let best_hash = io.chain().chain_info().best_block_hash;
let oldest_reorg = io.chain().pruning_info().earliest_state;
if self.block_set == BlockSet::NewBlocks && best > start && start < oldest_reorg
{
@@ -500,29 +513,34 @@ impl BlockDownloader {
start,
start_hash
);
self.reset();
self.reset_to_block(&best_hash, best);
} else {
let n = start - cmp::min(self.retract_step, start);
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(
self,
"Searching common header in the blockchain {} ({})",
start,
self.last_imported_hash
);
}
None => {
debug_sync!(
self,
"Could not revert to previous block, last: {} ({})",
start,
self.last_imported_hash
);
self.reset();
if n == 0 {
debug_sync!(self, "Header not found, bottom line reached, resetting, last imported: {}", self.last_imported_hash);
self.reset_to_block(&best_hash, best);
} else {
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(
self,
"Searching common header in the blockchain {} ({})",
start,
self.last_imported_hash
);
}
None => {
debug_sync!(
self,
"Could not revert to previous block, last: {} ({})",
start,
self.last_imported_hash
);
self.reset_to_block(&best_hash, best);
}
}
}
}

View File

@@ -41,7 +41,7 @@ use super::sync_packet::{
};
use super::{
BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester,
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
};
@@ -108,6 +108,8 @@ impl SyncHandler {
debug!(target: "sync", "Disconnected {}", peer_id);
sync.clear_peer_download(peer_id);
sync.peers.remove(&peer_id);
sync.delayed_requests
.retain(|(request_peer_id, _, _)| *request_peer_id != peer_id);
sync.active_peers.remove(&peer_id);
if sync.state == SyncState::SnapshotManifest {
@@ -153,12 +155,6 @@ impl SyncHandler {
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
return Ok(());
}
let difficulty: U256 = r.val_at(1)?;
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
peer.difficulty = Some(difficulty);
}
}
let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?;
let hash = block.header.hash();
let number = block.header.number();
@@ -166,10 +162,23 @@ impl SyncHandler {
if number > sync.highest_block.unwrap_or(0) {
sync.highest_block = Some(number);
}
let parent_hash = block.header.parent_hash();
let difficulty: U256 = r.val_at(1)?;
// Most probably the sent block is being imported by peer right now
// Use td and hash, that peer must have for now
let parent_td = difficulty.checked_sub(*block.header.difficulty());
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer
.difficulty
.map_or(true, |pd| parent_td.map_or(false, |td| td > pd))
{
peer.difficulty = parent_td;
}
}
let mut unknown = false;
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.latest_hash = hash;
peer.latest_hash = *parent_hash;
}
let last_imported_number = sync.new_blocks.last_imported_block_number();
@@ -755,7 +764,7 @@ impl SyncHandler {
io: &mut dyn SyncIo,
peer_id: PeerId,
r: &Rlp,
) -> Result<(), PacketDecodeError> {
) -> Result<(), PacketProcessError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty()
|| (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks)

View File

@@ -97,6 +97,7 @@ use super::{SyncConfig, WarpSync};
use api::{EthProtocolInfo as PeerInfoDigest, PriorityTask, PAR_PROTOCOL};
use block_sync::{BlockDownloader, DownloadAction};
use bytes::Bytes;
use derive_more::Display;
use ethcore::{
client::{BlockChainClient, BlockChainInfo, BlockId, BlockQueueInfo, BlockStatus},
snapshot::RestorationStatus,
@@ -105,7 +106,7 @@ use ethereum_types::{H256, U256};
use fastmap::{H256FastMap, H256FastSet};
use hash::keccak;
use heapsize::HeapSizeOf;
use network::{self, client_version::ClientVersion, PacketId, PeerId};
use network::{self, client_version::ClientVersion, PeerId};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use rand::Rng;
use rlp::{DecoderError, RlpStream};
@@ -124,7 +125,7 @@ use self::{
handler::SyncHandler,
sync_packet::{
PacketInfo,
SyncPacket::{NewBlockPacket, StatusPacket},
SyncPacket::{self, NewBlockPacket, StatusPacket},
},
};
@@ -133,7 +134,23 @@ use self::{propagator::SyncPropagator, requester::SyncRequester};
known_heap_size!(0, PeerInfo);
pub type PacketDecodeError = DecoderError;
/// Possible errors during packet's processing
#[derive(Debug, Display)]
pub enum PacketProcessError {
/// Error of RLP decoder
#[display(fmt = "Decoder Error: {}", _0)]
Decoder(DecoderError),
/// Underlying client is busy and cannot process the packet
/// The packet should be postponed for later response
#[display(fmt = "Underlying client is busy")]
ClientBusy,
}
impl From<DecoderError> for PacketProcessError {
fn from(err: DecoderError) -> Self {
PacketProcessError::Decoder(err).into()
}
}
/// 63 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11);
@@ -363,7 +380,7 @@ pub mod random {
}
}
pub type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
pub type RlpResponseResult = Result<Option<(SyncPacket, RlpStream)>, PacketProcessError>;
pub type Peers = HashMap<PeerId, PeerInfo>;
/// Thread-safe wrapper for `ChainSync`.
@@ -420,6 +437,23 @@ impl ChainSyncApi {
SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
}
/// Process the queue with requests, that were delayed with response.
pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) {
let requests = self.sync.write().retrieve_delayed_requests();
if !requests.is_empty() {
debug!(target: "sync", "Processing {} delayed requests", requests.len());
for (peer_id, packet_id, packet_data) in requests {
SyncSupplier::dispatch_delayed_request(
&self.sync,
io,
peer_id,
packet_id,
&packet_data,
);
}
}
}
/// Process a priority propagation queue.
/// This task is run from a timer and should be time constrained.
/// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded.
@@ -622,6 +656,10 @@ pub struct ChainSync {
/// Connected peers pending Status message.
/// Value is request timestamp.
handshaking_peers: HashMap<PeerId, Instant>,
/// Requests, that can not be processed at the moment
delayed_requests: Vec<(PeerId, u8, Vec<u8>)>,
/// Ids of delayed requests, used for lookup, id is composed from peer id and packet id
delayed_requests_ids: HashSet<(PeerId, u8)>,
/// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
@@ -646,6 +684,8 @@ impl ChainSync {
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
active_peers: HashSet::new(),
delayed_requests: Vec::new(),
delayed_requests_ids: HashSet::new(),
new_blocks: BlockDownloader::new(
BlockSet::NewBlocks,
&chain_info.best_block_hash,
@@ -753,6 +793,22 @@ impl ChainSync {
self.active_peers = self.peers.keys().cloned().collect();
}
/// Add a request for later processing
pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
// Ignore the request, if there is a request already in queue with the same id
if !self.delayed_requests_ids.contains(&(peer, packet_id)) {
self.delayed_requests_ids.insert((peer, packet_id));
self.delayed_requests.push((peer, packet_id, data.to_vec()));
debug!(target: "sync", "Delayed request with packet id {} from peer {} added", packet_id, peer);
}
}
/// Drain and return all delayed requests
pub fn retrieve_delayed_requests(&mut self) -> Vec<(PeerId, u8, Vec<u8>)> {
self.delayed_requests_ids.clear();
self.delayed_requests.drain(..).collect()
}
/// Restart sync
pub fn reset_and_continue(&mut self, io: &mut dyn SyncIo) {
trace!(target: "sync", "Restarting");

View File

@@ -36,7 +36,7 @@ use super::sync_packet::{
};
use super::{
ChainSync, PacketDecodeError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND,
ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND,
MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND,
};
@@ -129,14 +129,64 @@ impl SyncSupplier {
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
match result {
Err(PacketProcessError::Decoder(e)) => {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e)
}
Err(PacketProcessError::ClientBusy) => {
sync.write().add_delayed_request(peer, packet_id, data)
}
Ok(()) => {}
}
}
}
/// Dispatch delayed request
/// The main difference with dispatch packet is the direct send of the responses to the peer
pub fn dispatch_delayed_request(
sync: &RwLock<ChainSync>,
io: &mut dyn SyncIo,
peer: PeerId,
packet_id: u8,
data: &[u8],
) {
let rlp = Rlp::new(data);
if let Some(id) = SyncPacket::from_u8(packet_id) {
let result = match id {
GetBlockHeadersPacket => SyncSupplier::send_rlp(
io,
&rlp,
peer,
SyncSupplier::return_block_headers,
|e| format!("Error sending block headers: {:?}", e),
),
_ => {
debug!(target:"sync", "Unexpected packet {} was dispatched for delayed processing", packet_id);
Ok(())
}
};
match result {
Err(PacketProcessError::Decoder(e)) => {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e)
}
Err(PacketProcessError::ClientBusy) => {
sync.write().add_delayed_request(peer, packet_id, data)
}
Ok(()) => {}
}
}
}
/// Respond to GetBlockHeaders request
fn return_block_headers(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
// Cannot return blocks, if forks processing is in progress,
// The request should be postponed for later processing
if io.chain().is_processing_fork() {
return Err(PacketProcessError::ClientBusy);
}
let payload_soft_limit = io.payload_soft_limit();
// Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
@@ -161,11 +211,11 @@ impl SyncSupplier {
trace!(target:"sync", "Returning single header: {:?}", hash);
let mut rlp = RlpStream::new_list(1);
rlp.append_raw(&hdr.into_inner(), 1);
return Ok(Some((BlockHeadersPacket.id(), rlp)));
return Ok(Some((BlockHeadersPacket, rlp)));
}
number
}
None => return Ok(Some((BlockHeadersPacket.id(), RlpStream::new_list(0)))), //no such header, return nothing
None => return Ok(Some((BlockHeadersPacket, RlpStream::new_list(0)))), //no such header, return nothing
}
} else {
let number = r.val_at::<BlockNumber>(0)?;
@@ -215,7 +265,7 @@ impl SyncSupplier {
let mut rlp = RlpStream::new_list(count as usize);
rlp.append_raw(&data, count as usize);
trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count);
Ok(Some((BlockHeadersPacket.id(), rlp)))
Ok(Some((BlockHeadersPacket, rlp)))
}
/// Respond to GetBlockBodies request
@@ -242,7 +292,7 @@ impl SyncSupplier {
let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added);
trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added);
Ok(Some((BlockBodiesPacket.id(), rlp)))
Ok(Some((BlockBodiesPacket, rlp)))
}
fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
@@ -270,7 +320,7 @@ impl SyncSupplier {
}
let mut rlp_result = RlpStream::new_list(added_headers);
rlp_result.append_raw(&data, added_headers);
Ok(Some((ReceiptsPacket.id(), rlp_result)))
Ok(Some((ReceiptsPacket, rlp_result)))
}
/// Respond to GetSnapshotManifest request
@@ -293,7 +343,7 @@ impl SyncSupplier {
RlpStream::new_list(0)
}
};
Ok(Some((SnapshotManifestPacket.id(), rlp)))
Ok(Some((SnapshotManifestPacket, rlp)))
}
/// Respond to GetSnapshotData request
@@ -312,7 +362,7 @@ impl SyncSupplier {
RlpStream::new_list(0)
}
};
Ok(Some((SnapshotDataPacket.id(), rlp)))
Ok(Some((SnapshotDataPacket, rlp)))
}
fn return_rlp<FRlp, FError>(
@@ -321,7 +371,26 @@ impl SyncSupplier {
peer: PeerId,
rlp_func: FRlp,
error_func: FError,
) -> Result<(), PacketDecodeError>
) -> Result<(), PacketProcessError>
where
FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult,
FError: FnOnce(network::Error) -> String,
{
let response = rlp_func(io, rlp, peer);
if let Some((packet_id, rlp_stream)) = response? {
io.respond(packet_id.id(), rlp_stream.out())
.unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e)));
}
Ok(())
}
fn send_rlp<FRlp, FError>(
io: &mut dyn SyncIo,
rlp: &Rlp,
peer: PeerId,
rlp_func: FRlp,
error_func: FError,
) -> Result<(), PacketProcessError>
where
FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult,
FError: FnOnce(network::Error) -> String,
@@ -330,7 +399,7 @@ impl SyncSupplier {
match response {
Err(e) => Err(e),
Ok(Some((packet_id, rlp_stream))) => {
io.respond(packet_id, rlp_stream.out())
io.send(peer, packet_id, rlp_stream.out())
.unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e)));
Ok(())
}

View File

@@ -22,6 +22,7 @@
//!
extern crate common_types as types;
extern crate derive_more;
extern crate ethcore;
extern crate ethcore_io as io;
extern crate ethcore_network as network;