// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of OpenEthereum. // OpenEthereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // OpenEthereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with OpenEthereum. If not, see . use bytes::Bytes; #[cfg(not(test))] use devp2p::PAYLOAD_SOFT_LIMIT; #[cfg(test)] pub const PAYLOAD_SOFT_LIMIT: usize = 100_000; use enum_primitive::FromPrimitive; use ethereum_types::H256; use network::{self, PeerId}; use parking_lot::RwLock; use rlp::{Rlp, RlpStream}; use std::cmp; use types::{ids::BlockId, BlockNumber}; use sync_io::SyncIo; use super::{ request_id::{prepend_request_id, strip_request_id, RequestId}, sync_packet::{PacketInfo, SyncPacket, SyncPacket::*}, }; use super::{ ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, }; use std::borrow::Borrow; /// The Chain Sync Supplier: answers requests from peers with available data pub struct SyncSupplier; impl SyncSupplier { /// Dispatch incoming requests and responses // Take a u8 and not a SyncPacketId because this is the entry point // to chain sync from the outside world. pub fn dispatch_packet( sync: &RwLock, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8], ) { if let Some(id) = SyncPacket::from_u8(packet_id) { let rlp_result = strip_request_id(data, sync.read().borrow(), &peer, &id); let result = match rlp_result { Ok((rlp, request_id)) => match id { GetPooledTransactionsPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_pooled_transactions, |e| format!("Error sending pooled transactions: {:?}", e), ), GetBlockBodiesPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_block_bodies, |e| format!("Error sending block bodies: {:?}", e), ), GetBlockHeadersPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_block_headers, |e| format!("Error sending block headers: {:?}", e), ), GetReceiptsPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_receipts, |e| format!("Error sending receipts: {:?}", e), ), GetSnapshotManifestPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_snapshot_manifest, |e| format!("Error sending snapshot manifest: {:?}", e), ), GetSnapshotDataPacket => SyncSupplier::return_rlp( io, &rlp, peer, request_id, SyncSupplier::return_snapshot_data, |e| format!("Error sending snapshot data: {:?}", e), ), StatusPacket => { sync.write().on_packet(io, peer, packet_id, data); Ok(()) } // Packets that require the peer to be confirmed _ => { if !sync.read().peers.contains_key(&peer) { debug!(target: "sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); return; } debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); match id { ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp), TransactionsPacket => { let res = { let sync_ro = sync.read(); SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) }; if res.is_err() { // peer sent invalid data, disconnect. io.disable_peer(peer); sync.write().deactivate_peer(io, peer); } } _ => { sync.write().on_packet(io, peer, packet_id, data); } } Ok(()) } }, Err(e) => Err(e.into()), }; match result { Err(PacketProcessError::Decoder(e)) => { debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) } Err(PacketProcessError::ClientBusy) => { sync.write().add_delayed_request(peer, packet_id, data) } Ok(()) => {} } } } /// Dispatch delayed request /// The main difference with dispatch packet is the direct send of the responses to the peer pub fn dispatch_delayed_request( sync: &RwLock, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8], ) { if let Some(id) = SyncPacket::from_u8(packet_id) { let rlp_result = strip_request_id(data, sync.read().borrow(), &peer, &id); let result = match rlp_result { Ok((rlp, request_id)) => match id { GetBlockHeadersPacket => SyncSupplier::send_rlp( io, &rlp, peer, request_id, SyncSupplier::return_block_headers, |e| format!("Error sending block headers: {:?}", e), ), _ => { debug!(target: "sync", "Unexpected packet {} was dispatched for delayed processing", packet_id); Ok(()) } }, Err(e) => Err(e.into()), }; match result { Err(PacketProcessError::Decoder(e)) => { debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) } Err(PacketProcessError::ClientBusy) => { sync.write().add_delayed_request(peer, packet_id, data) } Ok(()) => {} } } } /// Respond to GetBlockHeaders request fn return_block_headers(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { // Cannot return blocks, if forks processing is in progress, // The request should be postponed for later processing if io.chain().is_processing_fork() { return Err(PacketProcessError::ClientBusy); } // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] let max_headers: usize = r.val_at(1)?; let skip: usize = r.val_at(2)?; let reverse: bool = r.val_at(3)?; let last = io.chain().chain_info().best_block_number; let number = if r.at(0)?.size() == 32 { // id is a hash let hash: H256 = r.val_at(0)?; trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse); match io.chain().block_header(BlockId::Hash(hash)) { Some(hdr) => { let number = hdr.number().into(); debug_assert_eq!(hdr.hash(), hash); if max_headers == 1 || io.chain().block_hash(BlockId::Number(number)) != Some(hash) { // Non canonical header or single header requested // TODO: handle single-step reverse hashchains of non-canon hashes trace!(target:"sync", "Returning single header: {:?}", hash); let mut rlp = RlpStream::new_list(1); rlp.append_raw(&hdr.into_inner(), 1); return Ok(Some((BlockHeadersPacket, rlp))); } number } None => return Ok(Some((BlockHeadersPacket, RlpStream::new_list(0)))), //no such header, return nothing } } else { let number = r.val_at::(0)?; trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, number, max_headers, skip, reverse); number }; let mut number = if reverse { cmp::min(last, number) } else { cmp::max(0, number) }; let max_count = cmp::min(MAX_HEADERS_TO_SEND, max_headers); let mut count = 0; let mut data = Bytes::new(); let inc = skip.saturating_add(1) as BlockNumber; let overlay = io.chain_overlay().read(); // We are checking the `overlay` as well since it's where the ForkBlock // header is cached : so peers can confirm we are on the right fork, // even if we are not synced until the fork block while (number <= last || overlay.contains_key(&number)) && count < max_count { if let Some(hdr) = overlay.get(&number) { trace!(target: "sync", "{}: Returning cached fork header", peer_id); data.extend_from_slice(hdr); count += 1; } else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) { data.append(&mut hdr.into_inner()); count += 1; // Check that the packet won't be oversized if data.len() > PAYLOAD_SOFT_LIMIT { break; } } else { // No required block. break; } if reverse { if number <= inc || number == 0 { break; } number = number.saturating_sub(inc); } else { number = number.saturating_add(inc); } } let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count); Ok(Some((BlockHeadersPacket, rlp))) } /// Respond to GetPooledTransactions request fn return_pooled_transactions(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let mut added = 0; let mut rlp = RlpStream::new(); rlp.begin_unbounded_list(); for v in r { if let Ok(hash) = v.as_val::() { if let Some(tx) = io.chain().queued_transaction(hash) { tx.signed().rlp_append(&mut rlp); added += 1; if rlp.len() > PAYLOAD_SOFT_LIMIT { break; } } } } rlp.finalize_unbounded_list(); trace!(target: "sync", "{} -> GetPooledTransactions: returned {} entries", peer_id, added); Ok(Some((PooledTransactionsPacket, rlp))) } /// Respond to GetBlockBodies request fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let mut count = r.item_count().unwrap_or(0); if count == 0 { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); return Ok(None); } count = cmp::min(count, MAX_BODIES_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); for i in 0..count { if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::(i)?)) { data.append(&mut body.into_inner()); added += 1; // Check that the packet won't be oversized if data.len() > PAYLOAD_SOFT_LIMIT { break; } } } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added); Ok(Some((BlockBodiesPacket, rlp))) } fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { let mut count = rlp.item_count().unwrap_or(0); trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count); if count == 0 { debug!(target: "sync", "Empty GetReceipts request, ignoring."); return Ok(None); } count = cmp::min(count, MAX_RECEIPTS_HEADERS_TO_SEND); let mut added_headers = 0usize; let mut data = Bytes::new(); let mut total_bytes = 0; for i in 0..count { if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::(i)?) { let mut receipts_bytes = ::rlp::encode(&receipts); total_bytes += receipts_bytes.len(); if total_bytes > PAYLOAD_SOFT_LIMIT { break; } data.append(&mut receipts_bytes); added_headers += 1; } } let mut rlp_result = RlpStream::new_list(added_headers); rlp_result.append_raw(&data, added_headers); Ok(Some((ReceiptsPacket, rlp_result))) } /// Respond to GetSnapshotManifest request fn return_snapshot_manifest(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let count = r.item_count().unwrap_or(0); trace!(target: "warp", "{} -> GetSnapshotManifest", peer_id); if count != 0 { debug!(target: "warp", "Invalid GetSnapshotManifest request, ignoring."); return Ok(None); } let rlp = match io.snapshot_service().manifest() { Some(manifest) => { trace!(target: "warp", "{} <- SnapshotManifest", peer_id); let mut rlp = RlpStream::new_list(1); rlp.append_raw(&manifest.into_rlp(), 1); rlp } None => { trace!(target: "warp", "{}: No snapshot manifest to return", peer_id); RlpStream::new_list(0) } }; Ok(Some((SnapshotManifestPacket, rlp))) } /// Respond to GetSnapshotData request fn return_snapshot_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let hash: H256 = r.val_at(0)?; trace!(target: "warp", "{} -> GetSnapshotData {:?}", peer_id, hash); let rlp = match io.snapshot_service().chunk(hash) { Some(data) => { let mut rlp = RlpStream::new_list(1); trace!(target: "warp", "{} <- SnapshotData", peer_id); rlp.append(&data); rlp } None => { trace!(target: "warp", "{}: No snapshot data to return", peer_id); RlpStream::new_list(0) } }; Ok(Some((SnapshotDataPacket, rlp))) } fn return_rlp( io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, request_id: Option, rlp_func: FRlp, error_func: FError, ) -> Result<(), PacketProcessError> where FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, FError: FnOnce(network::Error) -> String, { let response = rlp_func(io, rlp, peer); if let Some((packet_id, rlp_stream)) = response? { let rlp_stream = prepend_request_id(rlp_stream, request_id); io.respond(packet_id.id(), rlp_stream.out()) .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); } Ok(()) } fn send_rlp( io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, request_id: Option, rlp_func: FRlp, error_func: FError, ) -> Result<(), PacketProcessError> where FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, FError: FnOnce(network::Error) -> String, { let response = rlp_func(io, rlp, peer); match response { Err(e) => Err(e), Ok(Some((packet_id, rlp_stream))) => { let rlp_stream = prepend_request_id(rlp_stream, request_id); io.send(peer, packet_id, rlp_stream.out()) .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); Ok(()) } _ => Ok(()), } } } #[cfg(test)] mod test { use super::{super::tests::*, *}; use blocks::SyncHeader; use bytes::Bytes; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient}; use ethereum_types::H256; use parking_lot::RwLock; use rlp::{Rlp, RlpStream}; use std::{collections::VecDeque, str::FromStr}; use tests::{helpers::TestIo, snapshot::TestSnapshotService}; #[test] fn return_block_headers() { fn make_hash_req(h: &H256, count: usize, skip: usize, reverse: bool) -> Bytes { let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse { 1u32 } else { 0u32 }); rlp.out() } fn make_num_req(n: usize, count: usize, skip: usize, reverse: bool) -> Bytes { let mut rlp = RlpStream::new_list(4); rlp.append(&n); rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse { 1u32 } else { 0u32 }); rlp.out() } fn to_header_vec( rlp: ::chain::RlpResponseResult, eip1559_transition: BlockNumber, ) -> Vec { Rlp::new(&rlp.unwrap().unwrap().1.out()) .iter() .map(|r| SyncHeader::from_rlp(r.as_raw().to_vec(), eip1559_transition).unwrap()) .collect() } let mut client = TestBlockChainClient::new(); let eip1559_transition = client.spec.params().eip1559_transition; client.add_blocks(100, EachBlockWith::Nothing); let blocks: Vec<_> = (0..100) .map(|i| { (&client as &dyn BlockChainClient) .block(BlockId::Number(i as BlockNumber)) .map(|b| b.into_inner()) .unwrap() }) .collect(); let headers: Vec<_> = blocks .iter() .map(|b| { SyncHeader::from_rlp( Rlp::new(b).at(0).unwrap().as_raw().to_vec(), eip1559_transition, ) .unwrap() }) .collect(); let hashes: Vec<_> = headers.iter().map(|h| h.header.hash()).collect(); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let io = TestIo::new(&mut client, &ss, &queue, None); let unknown: H256 = H256::default(); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&unknown, 1, 0, false)), 0, ); assert!(to_header_vec(result, eip1559_transition).is_empty(),); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&unknown, 1, 0, true)), 0, ); assert!(to_header_vec(result, eip1559_transition).is_empty()); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0, ); assert_eq!( to_header_vec(result, eip1559_transition), vec![headers[2].clone()] ); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0, ); assert_eq!( to_header_vec(result, eip1559_transition), vec![headers[2].clone()] ); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0, ); assert_eq!( to_header_vec(result, eip1559_transition), vec![ headers[50].clone(), headers[56].clone(), headers[62].clone() ] ); let result = SyncSupplier::return_block_headers( &io, &Rlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0, ); assert_eq!( to_header_vec(result, eip1559_transition), vec![ headers[50].clone(), headers[44].clone(), headers[38].clone() ] ); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(2, 1, 0, true)), 0); assert_eq!( to_header_vec(result, eip1559_transition), vec![headers[2].clone()] ); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(2, 1, 0, false)), 0); assert_eq!( to_header_vec(result, eip1559_transition), vec![headers[2].clone()] ); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(50, 3, 5, false)), 0); assert_eq!( to_header_vec(result, eip1559_transition), vec![ headers[50].clone(), headers[56].clone(), headers[62].clone() ] ); let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(50, 3, 5, true)), 0); assert_eq!( to_header_vec(result, eip1559_transition), vec![ headers[50].clone(), headers[44].clone(), headers[38].clone() ] ); } #[test] fn respect_packet_limit() { let small_num_blocks = 10; let large_num_blocks = 50; let tx_per_block = 100; let mut client = TestBlockChainClient::new(); client.add_blocks(large_num_blocks, EachBlockWith::Transactions(tx_per_block)); let mut small_rlp_request = RlpStream::new_list(small_num_blocks); let mut large_rlp_request = RlpStream::new_list(large_num_blocks); for i in 0..small_num_blocks { let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap(); small_rlp_request.append(&hash); large_rlp_request.append(&hash); } for i in small_num_blocks..large_num_blocks { let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap(); large_rlp_request.append(&hash); } let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let io = TestIo::new(&mut client, &ss, &queue, None); let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0); let small_result = small_result.unwrap().unwrap().1; assert_eq!( Rlp::new(&small_result.out()).item_count().unwrap(), small_num_blocks ); let large_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&large_rlp_request.out()), 0); let large_result = large_result.unwrap().unwrap().1; assert!(Rlp::new(&large_result.out()).item_count().unwrap() < large_num_blocks); } #[test] fn return_receipts_empty() { let mut client = TestBlockChainClient::new(); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let io = TestIo::new(&mut client, &ss, &queue, None); let result = SyncSupplier::return_receipts(&io, &Rlp::new(&[0xc0]), 0); assert!(result.is_ok()); } #[test] fn return_receipts() { let mut client = TestBlockChainClient::new(); let queue = RwLock::new(VecDeque::new()); let sync = dummy_sync_with_peer(H256::default(), &client); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut receipt_list = RlpStream::new_list(4); receipt_list.append( &H256::from_str("0000000000000000000000000000000000000000000000005555555555555555") .unwrap(), ); receipt_list.append( &H256::from_str("ff00000000000000000000000000000000000000000000000000000000000000") .unwrap(), ); receipt_list.append( &H256::from_str("fff0000000000000000000000000000000000000000000000000000000000000") .unwrap(), ); receipt_list.append( &H256::from_str("aff0000000000000000000000000000000000000000000000000000000000000") .unwrap(), ); let receipts_request = receipt_list.out(); // it returns rlp ONLY for hashes started with "f" let result = SyncSupplier::return_receipts(&io, &Rlp::new(&receipts_request.clone()), 0); assert!(result.is_ok()); let rlp_result = result.unwrap(); assert!(rlp_result.is_some()); // the length of two rlp-encoded receipts assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); SyncSupplier::dispatch_packet( &RwLock::new(sync), &mut io, 0usize, GetReceiptsPacket.id(), &receipts_request, ); assert_eq!(1, io.packets.len()); } }