handle SyncHandler errors properly (#9151)

* handle SyncHandler errors properly, closes #9150

* applied review suggestions
This commit is contained in:
Marek Kotewicz 2018-07-19 12:46:33 +02:00 committed by GitHub
parent 3c27587d83
commit 073365d5d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 158 deletions

View File

@ -22,7 +22,7 @@ use std::collections::{HashSet, VecDeque};
use std::cmp;
use heapsize::HeapSizeOf;
use ethereum_types::H256;
use rlp::Rlp;
use rlp::{self, Rlp};
use ethcore::views::BlockView;
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
@ -76,12 +76,18 @@ pub enum DownloadAction {
/// Error returned when imported block data is rejected by the downloader.
#[derive(Eq, PartialEq, Debug)]
pub enum BlockDownloaderImportError {
	/// Imported data is rejected as invalid. Peer should be dropped.
	Invalid,
	/// Imported data is valid but rejected because the downloader does not need it.
	Useless,
}
// Any RLP decoding failure during import maps to `Invalid`: malformed
// data indicates a misbehaving peer (per the variant's doc, the peer
// should be dropped). This lets the downloader use `?` on RLP decoding
// results and get the error conversion for free.
impl From<rlp::DecoderError> for BlockDownloaderImportError {
	fn from(_: rlp::DecoderError) -> BlockDownloaderImportError {
		// The specific decoder error is discarded — every decode failure
		// is treated the same way.
		BlockDownloaderImportError::Invalid
	}
}
/// Block downloader strategy.
/// Manages state and block data for a block download process.
pub struct BlockDownloader {
@ -316,7 +322,7 @@ impl BlockDownloader {
}
/// Called by peer once it has new block bodies
pub fn import_bodies(&mut self, _io: &mut SyncIo, r: &Rlp) -> Result<(), BlockDownloaderImportError> {
pub fn import_bodies(&mut self, r: &Rlp) -> Result<(), BlockDownloaderImportError> {
let item_count = r.item_count().unwrap_or(0);
if item_count == 0 {
return Err(BlockDownloaderImportError::Useless);

View File

@ -14,8 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap};
use std::collections::hash_map::Entry;
use std::collections::{HashSet, HashMap, hash_map};
use smallvec::SmallVec;
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP};
use heapsize::HeapSizeOf;
@ -380,7 +379,7 @@ impl BlockCollection {
};
self.downloading_receipts.remove(&receipt_root);
match self.receipt_ids.entry(receipt_root) {
Entry::Occupied(entry) => {
hash_map::Entry::Occupied(entry) => {
for h in entry.remove() {
match self.blocks.get_mut(&h) {
Some(ref mut block) => {
@ -394,8 +393,8 @@ impl BlockCollection {
}
}
Ok(())
}
_ => {
},
hash_map::Entry::Vacant(_) => {
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
Err(network::ErrorKind::BadProtocol.into())
}

View File

@ -87,9 +87,23 @@ impl SyncHandler {
Ok(())
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id);
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer);
},
Ok(()) => {
// give a task to the same peer first
sync.sync_peer(io, peer, false);
},
}
// give tasks to other peers
sync.continue_sync(io);
}
/// Called when peer sends us new consensus packet
@ -137,7 +151,7 @@ impl SyncHandler {
}
/// Called by peer once it has new block bodies
pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
return Ok(());
@ -165,8 +179,7 @@ impl SyncHandler {
let last_imported_number = sync.new_blocks.last_imported_block_number();
if last_imported_number > header.number() && last_imported_number - header.number() > MAX_NEW_BLOCK_AGE {
trace!(target: "sync", "Ignored ancient new block {:?}", h);
io.disable_peer(peer_id);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => {
@ -187,7 +200,7 @@ impl SyncHandler {
},
Err(e) => {
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
io.disable_peer(peer_id);
return Err(DownloaderImportError::Invalid);
}
};
if unknown {
@ -199,12 +212,11 @@ impl SyncHandler {
sync.sync_peer(io, peer_id, true);
}
}
sync.continue_sync(io);
Ok(())
}
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
return Ok(());
@ -223,7 +235,6 @@ impl SyncHandler {
if max > sync.highest_block.unwrap_or(0) {
sync.highest_block = Some(max);
}
sync.continue_sync(io);
return Ok(());
}
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()?);
@ -241,8 +252,7 @@ impl SyncHandler {
}
if last_imported_number > number && last_imported_number - number > MAX_NEW_BLOCK_AGE {
trace!(target: "sync", "Ignored ancient new block hash {:?}", hash);
io.disable_peer(peer_id);
continue;
return Err(DownloaderImportError::Invalid);
}
match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain => {
@ -263,8 +273,7 @@ impl SyncHandler {
},
BlockStatus::Bad => {
debug!(target: "sync", "Bad new block hash {:?}", hash);
io.disable_peer(peer_id);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
}
};
@ -274,65 +283,46 @@ impl SyncHandler {
sync.state = SyncState::NewBlocks;
sync.sync_peer(io, peer_id, true);
}
sync.continue_sync(io);
Ok(())
}
/// Called by peer once it has new block bodies
fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
sync.clear_peer_download(peer_id);
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
let block_set = sync.peers.get(&peer_id)
.and_then(|p| p.block_set)
.unwrap_or(BlockSet::NewBlocks);
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) {
trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
sync.continue_sync(io);
return Ok(());
}
let item_count = r.item_count()?;
trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set);
if item_count == 0 {
sync.deactivate_peer(io, peer_id);
}
else if sync.state == SyncState::Waiting {
Err(DownloaderImportError::Useless)
} else if sync.state == SyncState::Waiting {
trace!(target: "sync", "Ignored block bodies while waiting");
}
else
Ok(())
} else {
{
let result = {
let downloader = match block_set {
BlockSet::NewBlocks => &mut sync.new_blocks,
BlockSet::OldBlocks => match sync.old_blocks {
None => {
trace!(target: "sync", "Ignored block headers while block download is inactive");
sync.continue_sync(io);
return Ok(());
},
Some(ref mut blocks) => blocks,
}
};
downloader.import_bodies(io, r)
};
match result {
Err(DownloaderImportError::Invalid) => {
io.disable_peer(peer_id);
sync.deactivate_peer(io, peer_id);
sync.continue_sync(io);
return Ok(());
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer_id);
},
Ok(()) => (),
downloader.import_bodies(r)?;
}
sync.collect_blocks(io, block_set);
sync.sync_peer(io, peer_id, false);
}
sync.continue_sync(io);
Ok(())
}
}
fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
{
let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
peer.asking = PeerAsking::Nothing;
@ -347,8 +337,7 @@ impl SyncHandler {
let header = r.at(0)?.as_raw();
if keccak(&header) != fork_hash {
trace!(target: "sync", "{}: Fork mismatch", peer_id);
io.disable_peer(peer_id);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
trace!(target: "sync", "{}: Confirmed peer", peer_id);
@ -361,12 +350,11 @@ impl SyncHandler {
}
}
sync.sync_peer(io, peer_id, false);
return Ok(());
}
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
let is_fork_header_request = match sync.peers.get(&peer_id) {
Some(peer) if peer.asking == PeerAsking::ForkHeader => true,
_ => false,
@ -382,19 +370,16 @@ impl SyncHandler {
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed {
trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash);
sync.continue_sync(io);
return Ok(());
}
let item_count = r.item_count()?;
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, sync.state, block_set);
if (sync.state == SyncState::Idle || sync.state == SyncState::WaitingPeers) && sync.old_blocks.is_none() {
trace!(target: "sync", "Ignored unexpected block headers");
sync.continue_sync(io);
return Ok(());
}
if sync.state == SyncState::Waiting {
trace!(target: "sync", "Ignored block headers while waiting");
sync.continue_sync(io);
return Ok(());
}
@ -405,101 +390,63 @@ impl SyncHandler {
match sync.old_blocks {
None => {
trace!(target: "sync", "Ignored block headers while block download is inactive");
sync.continue_sync(io);
return Ok(());
},
Some(ref mut blocks) => blocks,
}
}
};
downloader.import_headers(io, r, expected_hash)
downloader.import_headers(io, r, expected_hash)?
};
match result {
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer_id);
},
Err(DownloaderImportError::Invalid) => {
io.disable_peer(peer_id);
sync.deactivate_peer(io, peer_id);
sync.continue_sync(io);
return Ok(());
},
Ok(DownloadAction::Reset) => {
if let DownloadAction::Reset = result {
// mark all outstanding requests as expired
trace!("Resetting downloads for {:?}", block_set);
for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
p.reset_asking();
}
}
Ok(DownloadAction::None) => {},
}
sync.collect_blocks(io, block_set);
// give a task to the same peer first if received valuable headers.
sync.sync_peer(io, peer_id, false);
// give tasks to other peers
sync.continue_sync(io);
Ok(())
}
/// Called by peer once it has new block receipts
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
sync.clear_peer_download(peer_id);
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) {
trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
sync.continue_sync(io);
return Ok(());
}
let item_count = r.item_count()?;
trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count);
if item_count == 0 {
sync.deactivate_peer(io, peer_id);
}
else if sync.state == SyncState::Waiting {
Err(DownloaderImportError::Useless)
} else if sync.state == SyncState::Waiting {
trace!(target: "sync", "Ignored block receipts while waiting");
}
else
Ok(())
} else {
{
let result = {
let downloader = match block_set {
BlockSet::NewBlocks => &mut sync.new_blocks,
BlockSet::OldBlocks => match sync.old_blocks {
None => {
trace!(target: "sync", "Ignored block headers while block download is inactive");
sync.continue_sync(io);
return Ok(());
},
Some(ref mut blocks) => blocks,
}
};
downloader.import_receipts(io, r)
};
match result {
Err(DownloaderImportError::Invalid) => {
io.disable_peer(peer_id);
sync.deactivate_peer(io, peer_id);
sync.continue_sync(io);
return Ok(());
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer_id);
},
Ok(()) => (),
downloader.import_receipts(io, r)?;
}
sync.collect_blocks(io, block_set);
sync.sync_peer(io, peer_id, false);
}
sync.continue_sync(io);
Ok(())
}
}
/// Called when snapshot manifest is downloaded from a peer.
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
return Ok(());
@ -507,43 +454,28 @@ impl SyncHandler {
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest {
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
sync.continue_sync(io);
return Ok(());
}
let manifest_rlp = r.at(0)?;
let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) {
Err(e) => {
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
io.disable_peer(peer_id);
sync.continue_sync(io);
return Ok(());
}
Ok(manifest) => manifest,
};
let manifest = ManifestData::from_rlp(manifest_rlp.as_raw())?;
let is_supported_version = io.snapshot_service().supported_versions()
.map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h);
if !is_supported_version {
trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
io.disable_peer(peer_id);
sync.continue_sync(io);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
io.snapshot_service().begin_restore(manifest);
sync.state = SyncState::SnapshotData;
// give a task to the same peer first.
sync.sync_peer(io, peer_id, false);
// give tasks to other peers
sync.continue_sync(io);
Ok(())
}
/// Called when snapshot data is downloaded from a peer.
fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
return Ok(());
@ -551,7 +483,6 @@ impl SyncHandler {
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) {
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
sync.continue_sync(io);
return Ok(());
}
@ -569,7 +500,6 @@ impl SyncHandler {
}
sync.snapshot.clear();
sync.continue_sync(io);
return Ok(());
},
RestorationStatus::Initializing { .. } => {
@ -594,7 +524,6 @@ impl SyncHandler {
Err(()) => {
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
io.disconnect_peer(peer_id);
sync.continue_sync(io);
return Ok(());
}
}
@ -603,15 +532,12 @@ impl SyncHandler {
// wait for snapshot restoration process to complete
sync.state = SyncState::SnapshotWaiting;
}
// give a task to the same peer first.
sync.sync_peer(io, peer_id, false);
// give tasks to other peers
sync.continue_sync(io);
Ok(())
}
/// Called by peer to report status
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
sync.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = r.val_at(0)?;
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
@ -647,23 +573,20 @@ impl SyncHandler {
}
let chain_info = io.chain().chain_info();
if peer.genesis != chain_info.genesis_hash {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
if peer.network_id != sync.network_id {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
if false
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
{
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Ok(());
return Err(DownloaderImportError::Invalid);
}
if sync.sync_start_time.is_none() {
@ -676,22 +599,15 @@ impl SyncHandler {
sync.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
match sync.fork_block {
Some((fork_block, _)) => {
if let Some((fork_block, _)) = sync.fork_block {
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
},
_ => {
// when there's no `fork_block` defined we initialize the peer with
// `confirmation: ForkConfirmation::Confirmed`.
sync.sync_peer(io, peer_id, false);
}
}
Ok(())
}
/// Called when peer sends us new transactions
fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
@ -715,7 +631,7 @@ impl SyncHandler {
}
/// Called when peer sends us signed private transaction packet
fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
@ -729,7 +645,7 @@ impl SyncHandler {
}
/// Called when peer sends us new private transaction packet
fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());