diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 2e508f4d6..cc04fb04d 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -22,7 +22,7 @@ use std::collections::{HashSet, VecDeque}; use std::cmp; use heapsize::HeapSizeOf; use ethereum_types::H256; -use rlp::Rlp; +use rlp::{self, Rlp}; use ethcore::views::BlockView; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind}; @@ -76,12 +76,18 @@ pub enum DownloadAction { #[derive(Eq, PartialEq, Debug)] pub enum BlockDownloaderImportError { - /// Imported data is rejected as invalid. + /// Imported data is rejected as invalid. Peer should be dropped. Invalid, /// Imported data is valid but rejected cause the downloader does not need it. Useless, } +impl From for BlockDownloaderImportError { + fn from(_: rlp::DecoderError) -> BlockDownloaderImportError { + BlockDownloaderImportError::Invalid + } +} + /// Block downloader strategy. /// Manages state and block data for a block download process. pub struct BlockDownloader { @@ -316,7 +322,7 @@ impl BlockDownloader { } /// Called by peer once it has new block bodies - pub fn import_bodies(&mut self, _io: &mut SyncIo, r: &Rlp) -> Result<(), BlockDownloaderImportError> { + pub fn import_bodies(&mut self, r: &Rlp) -> Result<(), BlockDownloaderImportError> { let item_count = r.item_count().unwrap_or(0); if item_count == 0 { return Err(BlockDownloaderImportError::Useless); diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index a275e1dcc..df7d7a3bf 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -14,8 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::collections::{HashSet, HashMap}; -use std::collections::hash_map::Entry; +use std::collections::{HashSet, HashMap, hash_map}; use smallvec::SmallVec; use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP}; use heapsize::HeapSizeOf; @@ -380,7 +379,7 @@ impl BlockCollection { }; self.downloading_receipts.remove(&receipt_root); match self.receipt_ids.entry(receipt_root) { - Entry::Occupied(entry) => { + hash_map::Entry::Occupied(entry) => { for h in entry.remove() { match self.blocks.get_mut(&h) { Some(ref mut block) => { @@ -394,8 +393,8 @@ impl BlockCollection { } } Ok(()) - } - _ => { + }, + hash_map::Entry::Vacant(_) => { trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root); Err(network::ErrorKind::BadProtocol.into()) } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 75111a4d4..136ff3395 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -87,9 +87,23 @@ impl SyncHandler { Ok(()) } }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) + + match result { + Err(DownloaderImportError::Invalid) => { + debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id); + io.disable_peer(peer); + sync.deactivate_peer(io, peer); + }, + Err(DownloaderImportError::Useless) => { + sync.deactivate_peer(io, peer); + }, + Ok(()) => { + // give a task to the same peer first + sync.sync_peer(io, peer, false); + }, + } + // give tasks to other peers + sync.continue_sync(io); } /// Called when peer sends us new consensus packet @@ -137,7 +151,7 @@ impl SyncHandler { } /// Called by peer once it has new block bodies - pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id); return Ok(()); @@ -165,8 +179,7 @@ impl SyncHandler { let last_imported_number = sync.new_blocks.last_imported_block_number(); if last_imported_number > header.number() && last_imported_number - header.number() > MAX_NEW_BLOCK_AGE { trace!(target: "sync", "Ignored ancient new block {:?}", h); - io.disable_peer(peer_id); - return Ok(()); + return Err(DownloaderImportError::Invalid); } match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => { @@ -187,7 +200,7 @@ impl SyncHandler { }, Err(e) => { debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); - io.disable_peer(peer_id); + return Err(DownloaderImportError::Invalid); } }; if unknown { @@ -199,12 +212,11 @@ impl SyncHandler { sync.sync_peer(io, peer_id, true); } } - sync.continue_sync(io); Ok(()) } /// Handles `NewHashes` packet. Initiates headers download for any unknown hashes. - pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id); return Ok(()); @@ -223,7 +235,6 @@ impl SyncHandler { if max > sync.highest_block.unwrap_or(0) { sync.highest_block = Some(max); } - sync.continue_sync(io); return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()?); @@ -241,8 +252,7 @@ impl SyncHandler { } if last_imported_number > number && last_imported_number - number > MAX_NEW_BLOCK_AGE { trace!(target: "sync", "Ignored ancient new block hash {:?}", hash); - io.disable_peer(peer_id); - continue; + return Err(DownloaderImportError::Invalid); } match io.chain().block_status(BlockId::Hash(hash.clone())) { BlockStatus::InChain => { @@ -263,8 +273,7 @@ impl SyncHandler { }, BlockStatus::Bad => { debug!(target: "sync", "Bad new block hash {:?}", hash); - io.disable_peer(peer_id); - return Ok(()); + return Err(DownloaderImportError::Invalid); } } }; @@ -274,65 +283,46 @@ impl SyncHandler { sync.state = SyncState::NewBlocks; sync.sync_peer(io, peer_id, true); } - sync.continue_sync(io); Ok(()) } /// Called by peer once it has new block bodies - fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { sync.clear_peer_download(peer_id); - let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); + let block_set = sync.peers.get(&peer_id) + .and_then(|p| p.block_set) + .unwrap_or(BlockSet::NewBlocks); if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) { trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id); - sync.continue_sync(io); return Ok(()); } let item_count = r.item_count()?; trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set); if item_count == 0 { - sync.deactivate_peer(io, peer_id); - } - else if sync.state == SyncState::Waiting { + Err(DownloaderImportError::Useless) + } else if sync.state == SyncState::Waiting { trace!(target: "sync", "Ignored block bodies while waiting"); - } - else - { - let result = { + Ok(()) + } else { + { let downloader = match block_set { BlockSet::NewBlocks => &mut sync.new_blocks, BlockSet::OldBlocks => match sync.old_blocks { None => { trace!(target: "sync", "Ignored block headers while block download is inactive"); - sync.continue_sync(io); return Ok(()); }, Some(ref mut blocks) => blocks, } }; - downloader.import_bodies(io, r) - }; - - match result { - Err(DownloaderImportError::Invalid) => { - io.disable_peer(peer_id); - sync.deactivate_peer(io, peer_id); - sync.continue_sync(io); - return Ok(()); - }, - Err(DownloaderImportError::Useless) => { - sync.deactivate_peer(io, peer_id); - }, - Ok(()) => (), + downloader.import_bodies(r)?; } - sync.collect_blocks(io, block_set); - sync.sync_peer(io, peer_id, false); + Ok(()) } - sync.continue_sync(io); - Ok(()) } - fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { { let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers"); peer.asking = PeerAsking::Nothing; @@ -347,8 +337,7 @@ impl SyncHandler { let header = r.at(0)?.as_raw(); if keccak(&header) != fork_hash { trace!(target: "sync", "{}: Fork mismatch", peer_id); - io.disable_peer(peer_id); - return Ok(()); + return Err(DownloaderImportError::Invalid); } trace!(target: "sync", "{}: Confirmed peer", peer_id); @@ -361,12 +350,11 @@ impl SyncHandler { } } - sync.sync_peer(io, peer_id, false); return Ok(()); } /// Called by peer once it has new block headers during sync - fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { let is_fork_header_request = match sync.peers.get(&peer_id) { Some(peer) if peer.asking == PeerAsking::ForkHeader => true, _ => false, @@ -382,124 +370,83 @@ impl SyncHandler { let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed { trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash); - sync.continue_sync(io); return Ok(()); } let item_count = r.item_count()?; trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, sync.state, block_set); if (sync.state == SyncState::Idle || sync.state == SyncState::WaitingPeers) && sync.old_blocks.is_none() { trace!(target: "sync", "Ignored unexpected block headers"); - sync.continue_sync(io); return Ok(()); } if sync.state == SyncState::Waiting { trace!(target: "sync", "Ignored block headers while waiting"); - sync.continue_sync(io); return Ok(()); } - let result = { + let result = { let downloader = match block_set { BlockSet::NewBlocks => &mut sync.new_blocks, BlockSet::OldBlocks => { match sync.old_blocks { None => { trace!(target: "sync", "Ignored block headers while block download is inactive"); - sync.continue_sync(io); return Ok(()); }, Some(ref mut blocks) => blocks, } } }; - downloader.import_headers(io, r, expected_hash) + downloader.import_headers(io, r, expected_hash)? }; - match result { - Err(DownloaderImportError::Useless) => { - sync.deactivate_peer(io, peer_id); - }, - Err(DownloaderImportError::Invalid) => { - io.disable_peer(peer_id); - sync.deactivate_peer(io, peer_id); - sync.continue_sync(io); - return Ok(()); - }, - Ok(DownloadAction::Reset) => { - // mark all outstanding requests as expired - trace!("Resetting downloads for {:?}", block_set); - for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { - p.reset_asking(); - } - + if let DownloadAction::Reset = result { + // mark all outstanding requests as expired + trace!("Resetting downloads for {:?}", block_set); + for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) { + p.reset_asking(); } - Ok(DownloadAction::None) => {}, } sync.collect_blocks(io, block_set); - // give a task to the same peer first if received valuable headers. - sync.sync_peer(io, peer_id, false); - // give tasks to other peers - sync.continue_sync(io); Ok(()) } /// Called by peer once it has new block receipts - fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { sync.clear_peer_download(peer_id); let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) { trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id); - sync.continue_sync(io); return Ok(()); } let item_count = r.item_count()?; trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count); if item_count == 0 { - sync.deactivate_peer(io, peer_id); - } - else if sync.state == SyncState::Waiting { + Err(DownloaderImportError::Useless) + } else if sync.state == SyncState::Waiting { trace!(target: "sync", "Ignored block receipts while waiting"); - } - else - { - let result = { + Ok(()) + } else { + { let downloader = match block_set { BlockSet::NewBlocks => &mut sync.new_blocks, BlockSet::OldBlocks => match sync.old_blocks { None => { trace!(target: "sync", "Ignored block headers while block download is inactive"); - sync.continue_sync(io); return Ok(()); }, Some(ref mut blocks) => blocks, } }; - downloader.import_receipts(io, r) - }; - - match result { - Err(DownloaderImportError::Invalid) => { - io.disable_peer(peer_id); - sync.deactivate_peer(io, peer_id); - sync.continue_sync(io); - return Ok(()); - }, - Err(DownloaderImportError::Useless) => { - sync.deactivate_peer(io, peer_id); - }, - Ok(()) => (), + downloader.import_receipts(io, r)?; } - sync.collect_blocks(io, block_set); - sync.sync_peer(io, peer_id, false); + Ok(()) } - sync.continue_sync(io); - Ok(()) } /// Called when snapshot manifest is downloaded from a peer. - fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id); return Ok(()); @@ -507,43 +454,28 @@ impl SyncHandler { sync.clear_peer_download(peer_id); if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest { trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id); - sync.continue_sync(io); return Ok(()); } let manifest_rlp = r.at(0)?; - let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) { - Err(e) => { - trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e); - io.disable_peer(peer_id); - sync.continue_sync(io); - return Ok(()); - } - Ok(manifest) => manifest, - }; + let manifest = ManifestData::from_rlp(manifest_rlp.as_raw())?; let is_supported_version = io.snapshot_service().supported_versions() .map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h); if !is_supported_version { trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version); - io.disable_peer(peer_id); - sync.continue_sync(io); - return Ok(()); + return Err(DownloaderImportError::Invalid); } sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw())); io.snapshot_service().begin_restore(manifest); sync.state = SyncState::SnapshotData; - // give a task to the same peer first. - sync.sync_peer(io, peer_id, false); - // give tasks to other peers - sync.continue_sync(io); Ok(()) } /// Called when snapshot data is downloaded from a peer. - fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id); return Ok(()); @@ -551,7 +483,6 @@ impl SyncHandler { sync.clear_peer_download(peer_id); if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) { trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id); - sync.continue_sync(io); return Ok(()); } @@ -569,7 +500,6 @@ impl SyncHandler { } sync.snapshot.clear(); - sync.continue_sync(io); return Ok(()); }, RestorationStatus::Initializing { .. } => { @@ -594,7 +524,6 @@ impl SyncHandler { Err(()) => { trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id); io.disconnect_peer(peer_id); - sync.continue_sync(io); return Ok(()); } } @@ -603,15 +532,12 @@ impl SyncHandler { // wait for snapshot restoration process to complete sync.state = SyncState::SnapshotWaiting; } - // give a task to the same peer first. - sync.sync_peer(io, peer_id, false); - // give tasks to other peers - sync.continue_sync(io); + Ok(()) } /// Called by peer to report status - fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { sync.handshaking_peers.remove(&peer_id); let protocol_version: u8 = r.val_at(0)?; let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0; @@ -647,23 +573,20 @@ impl SyncHandler { } let chain_info = io.chain().chain_info(); if peer.genesis != chain_info.genesis_hash { - io.disable_peer(peer_id); trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis); - return Ok(()); + return Err(DownloaderImportError::Invalid); } if peer.network_id != sync.network_id { - io.disable_peer(peer_id); trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id); - return Ok(()); + return Err(DownloaderImportError::Invalid); } if false || (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0)) || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0)) { - io.disable_peer(peer_id); trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); - return Ok(()); + return Err(DownloaderImportError::Invalid); } if sync.sync_start_time.is_none() { @@ -676,22 +599,15 @@ impl SyncHandler { sync.active_peers.insert(peer_id.clone()); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); - match sync.fork_block { - Some((fork_block, _)) => { - SyncRequester::request_fork_header(sync, io, peer_id, fork_block); - }, - _ => { - // when there's no `fork_block` defined we initialize the peer with - // `confirmation: ForkConfirmation::Confirmed`. - sync.sync_peer(io, peer_id, false); - } + if let Some((fork_block, _)) = sync.fork_block { + SyncRequester::request_fork_header(sync, io, peer_id, fork_block); } Ok(()) } /// Called when peer sends us new transactions - fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { // Accept transactions only when fully synced if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) { trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); @@ -715,7 +631,7 @@ impl SyncHandler { } /// Called when peer sends us signed private transaction packet - fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); @@ -729,7 +645,7 @@ impl SyncHandler { } /// Called when peer sends us new private transaction packet - fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(());