// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of Parity Ethereum. // Parity Ethereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Ethereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . use std::time::Instant; use crate::{ block_sync::BlockRequest, sync_io::SyncIo }; use bytes::Bytes; use ethereum_types::H256; use log::{debug, trace, warn}; use network::{PeerId}; use rlp::RlpStream; use common_types::BlockNumber; use super::sync_packet::SyncPacket; use super::sync_packet::SyncPacket::{ GetBlockHeadersPacket, GetBlockBodiesPacket, GetReceiptsPacket, GetSnapshotManifestPacket, GetSnapshotDataPacket, GetPrivateStatePacket, }; use super::{ BlockSet, ChainSync, PeerAsking, }; /// The Chain Sync Requester: requesting data to other peers pub struct SyncRequester; impl SyncRequester { /// Perform block download request` pub fn request_blocks(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, request: BlockRequest, block_set: BlockSet) { match request { BlockRequest::Headers { start, count, skip } => { SyncRequester::request_headers_by_hash(sync, io, peer_id, &start, count, skip, false, block_set); }, BlockRequest::Bodies { hashes } => { SyncRequester::request_bodies(sync, io, peer_id, hashes, block_set); }, BlockRequest::Receipts { hashes } => { SyncRequester::request_receipts(sync, io, peer_id, hashes, block_set); }, } } /// Request block bodies from a peer fn request_bodies(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hashes: Vec, set: BlockSet) { let mut rlp = RlpStream::new_list(hashes.len()); trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set); for h in &hashes { rlp.append(&h.clone()); } SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GetBlockBodiesPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); } /// Request headers from a peer by block number pub fn request_fork_header(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, n: BlockNumber) { trace!(target: "sync", "{} <- GetForkHeader: at {}", peer_id, n); let mut rlp = RlpStream::new_list(4); rlp.append(&n); rlp.append(&1u32); rlp.append(&0u32); rlp.append(&0u32); SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out()); } /// Find some headers or blocks to download from a peer. pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) { // find chunk data to download if let Some(hash) = sync.snapshot.needed_chunk() { if let Some(mut peer) = sync.peers.get_mut(&peer_id) { peer.asking_snapshot_data = Some(hash.clone()); } SyncRequester::request_snapshot_chunk(sync, io, peer_id, &hash); } } /// Request snapshot manifest from a peer. pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) { trace!(target: "sync", "{}: requesting a snapshot manifest", peer_id); SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp::EMPTY_LIST_RLP.to_vec()); } pub fn request_private_state(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hash: &H256) { trace!(target: "privatetx", "{} <- GetPrivateStatePacket", peer_id); let mut rlp = RlpStream::new_list(1); rlp.append(hash); SyncRequester::send_request(sync, io, peer_id, PeerAsking::PrivateState, GetPrivateStatePacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_private_state = Some(hash.clone()); } /// Request headers from a peer by block hash fn request_headers_by_hash(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) { trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set); let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GetBlockHeadersPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_hash = Some(h.clone()); peer.block_set = Some(set); } /// Request block receipts from a peer fn request_receipts(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hashes: Vec, set: BlockSet) { let mut rlp = RlpStream::new_list(hashes.len()); trace!(target: "sync", "{} <- GetBlockReceipts: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set); for h in &hashes { rlp.append(&h.clone()); } SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GetReceiptsPacket, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); } /// Request snapshot chunk from a peer. fn request_snapshot_chunk(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, chunk: &H256) { trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk); let mut rlp = RlpStream::new_list(1); rlp.append(chunk); SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GetSnapshotDataPacket, rlp.out()); } /// Generic request sender fn send_request(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } peer.asking = asking; peer.ask_time = Instant::now(); let result = io.send(peer_id, packet_id, packet); if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); io.disconnect_peer(peer_id); } } } }