From 43ee52090498d947f95ff0055ef21101855db4c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jochen=20M=C3=BCller?= Date: Mon, 5 Jul 2021 15:59:22 +0200 Subject: [PATCH] Implement eth/66 (#467) * Allow eth/66 * Add eth/66 request ids * fmt * Remove some leftovers * fmt * Change behaviour in case of missing peer info - Assume eth/66 protocol, not earlier one - Log just a trace, not an error --- crates/ethcore/sync/src/api.rs | 3 +- crates/ethcore/sync/src/chain/handler.rs | 59 +++--- crates/ethcore/sync/src/chain/mod.rs | 3 + crates/ethcore/sync/src/chain/request_id.rs | 148 ++++++++++++++ crates/ethcore/sync/src/chain/requester.rs | 7 +- crates/ethcore/sync/src/chain/supplier.rs | 193 +++++++++++-------- crates/ethcore/sync/src/chain/sync_packet.rs | 16 ++ crates/ethcore/sync/src/tests/helpers.rs | 4 +- 8 files changed, 319 insertions(+), 114 deletions(-) create mode 100644 crates/ethcore/sync/src/chain/request_id.rs diff --git a/crates/ethcore/sync/src/api.rs b/crates/ethcore/sync/src/api.rs index 14ad8773d..4a8de0323 100644 --- a/crates/ethcore/sync/src/api.rs +++ b/crates/ethcore/sync/src/api.rs @@ -33,7 +33,7 @@ use std::{ use chain::{ fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, + ETH_PROTOCOL_VERSION_66, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, }; use ethcore::{ client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks}, @@ -571,6 +571,7 @@ impl ChainNotify for EthSync { ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, + ETH_PROTOCOL_VERSION_66, ], ) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); diff --git a/crates/ethcore/sync/src/chain/handler.rs b/crates/ethcore/sync/src/chain/handler.rs index fdc4b9d90..265e3e4c8 100644 --- a/crates/ethcore/sync/src/chain/handler.rs +++ b/crates/ethcore/sync/src/chain/handler.rs @@ -32,14 +32,17 @@ use std::{cmp, mem, time::Instant}; use sync_io::SyncIo; use types::{block_status::BlockStatus, ids::BlockId, BlockNumber}; -use super::sync_packet::{ - PacketInfo, - SyncPacket::{self, *}, +use super::{ + request_id::strip_request_id, + sync_packet::{ + PacketInfo, + SyncPacket::{self, *}, + }, }; use super::{ BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, - SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, + SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_66, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, }; @@ -55,27 +58,33 @@ impl SyncHandler { packet_id: u8, data: &[u8], ) { - let rlp = Rlp::new(data); if let Some(packet_id) = SyncPacket::from_u8(packet_id) { - let result = match packet_id { - StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp), - BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), - BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), - ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), - NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), - NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), - NewPooledTransactionHashesPacket => { - SyncHandler::on_peer_new_pooled_transaction_hashes(sync, io, peer, &rlp) - } - PooledTransactionsPacket => { - SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp) - } - SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), - SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), - _ => { - debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); - Ok(()) - } + let rlp_result = strip_request_id(data, sync, &peer, &packet_id); + + let result = match rlp_result { + Ok((rlp, _)) => match packet_id { + StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp), + BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), + BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), + ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), + NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), + NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), + NewPooledTransactionHashesPacket => { + SyncHandler::on_peer_new_pooled_transaction_hashes(sync, io, peer, &rlp) + } + PooledTransactionsPacket => { + SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp) + } + SnapshotManifestPacket => { + SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp) + } + SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), + _ => { + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id()); + Ok(()) + } + }, + Err(e) => Err(e.into()), }; match result { @@ -797,7 +806,7 @@ impl SyncHandler { || peer.protocol_version > PAR_PROTOCOL_VERSION_2.0)) || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 - || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0)) + || peer.protocol_version > ETH_PROTOCOL_VERSION_66.0)) { trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); return Err(DownloaderImportError::Invalid); diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index dc72c88ea..8e0cecb7a 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -90,6 +90,7 @@ pub mod fork_filter; mod handler; mod propagator; +pub mod request_id; mod requester; mod supplier; pub mod sync_packet; @@ -153,6 +154,8 @@ impl From for PacketProcessError { } } +/// Version 66 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). +pub const ETH_PROTOCOL_VERSION_66: (u8, u8) = (66, 0x11); /// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11); /// 64 version of Ethereum protocol. diff --git a/crates/ethcore/sync/src/chain/request_id.rs b/crates/ethcore/sync/src/chain/request_id.rs new file mode 100644 index 000000000..29f24db8d --- /dev/null +++ b/crates/ethcore/sync/src/chain/request_id.rs @@ -0,0 +1,148 @@ +use bytes::Bytes; +use chain::{ + sync_packet::{PacketInfo, SyncPacket}, + ChainSync, PeerInfo, +}; +use network::PeerId; +use rlp::{DecoderError, Rlp, RlpStream}; + +pub type RequestId = u64; + +// Separate the eth/66 request id from a packet, if it exists. +pub fn strip_request_id<'a>( + data: &'a [u8], + sync: &ChainSync, + peer: &PeerId, + packet_id: &SyncPacket, +) -> Result<(Rlp<'a>, Option), DecoderError> { + let protocol_version = if let Some(peer_info) = sync.peers.get(peer) { + peer_info.protocol_version + } else { + trace!( + "Peer info missing for peer {}, assuming protocol version 66", + peer + ); + 66 + }; + + let has_request_id = protocol_version >= 66 && packet_id.has_request_id_in_eth_66(); + + do_strip_request_id(data, has_request_id) +} + +fn do_strip_request_id<'a>( + data: &'a [u8], + has_request_id: bool, +) -> Result<(Rlp<'a>, Option), DecoderError> { + let rlp = Rlp::new(data); + + if has_request_id { + let request_id: RequestId = rlp.val_at(0)?; + let stripped_rlp = rlp.at(1)?; + Ok((stripped_rlp, Some(request_id))) + } else { + Ok((rlp, None)) + } +} + +// Add a given eth/66 request id to a packet being built. +pub fn prepend_request_id(rlp: RlpStream, request_id: Option) -> RlpStream { + match request_id { + Some(ref id) => { + let mut stream = RlpStream::new_list(2); + stream.append(id); + stream.append_raw(&rlp.out(), 1); + stream + } + None => rlp, + } +} + +/// Prepend a new eth/66 request id to the packet if appropriate. +pub fn generate_request_id( + packet: Bytes, + peer: &PeerInfo, + packet_id: SyncPacket, +) -> (Bytes, Option) { + if peer.protocol_version >= 66 && packet_id.has_request_id_in_eth_66() { + do_generate_request_id(&packet) + } else { + (packet, None) + } +} + +fn do_generate_request_id(packet: &Bytes) -> (Bytes, Option) { + let request_id: RequestId = rand::random(); + + let mut rlp = RlpStream::new_list(2); + rlp.append(&request_id); + rlp.append_raw(packet, 1); + + (rlp.out(), Some(request_id)) +} + +#[cfg(test)] +mod tests { + use super::*; + use ethereum_types::H256; + + #[test] + fn test_prepend_request_id() { + let mut request = RlpStream::new_list(2); + request.append(&H256::from_low_u64_be(1)); + request.append(&H256::from_low_u64_be(2)); + + let with_id = prepend_request_id(request, Some(10)); + let rlp = Rlp::new(with_id.as_raw()); + let recovered_id: RequestId = rlp.val_at(0).unwrap(); + let recovered_request: Vec = rlp.at(1).unwrap().as_list().unwrap(); + + assert_eq!(recovered_id, 10); + assert_eq!( + recovered_request, + [H256::from_low_u64_be(1), H256::from_low_u64_be(2)] + ); + } + + #[test] + fn test_strip_request_id() { + let request = vec![ + H256::from_low_u64_be(1), + H256::from_low_u64_be(2), + H256::from_low_u64_be(3), + ]; + + let mut request_with_id = RlpStream::new_list(2); + request_with_id.append(&20u64); + request_with_id.append_list(&request); + let data = request_with_id.out(); + + let (rlp, id) = do_strip_request_id(&data, true).unwrap(); + + assert_eq!(id, Some(20)); + assert_eq!(rlp.as_list::().unwrap(), request); + } + + #[test] + fn test_generate_request_id() { + let request = vec![ + H256::from_low_u64_be(1), + H256::from_low_u64_be(2), + H256::from_low_u64_be(3), + ]; + + let mut stream = RlpStream::new_list(3); + for hash in &request { + stream.append(hash); + } + let data = stream.out(); + + let (new_data, id) = do_generate_request_id(&data); + + let recovered = Rlp::new(&new_data); + let recovered_id: RequestId = recovered.val_at(0).unwrap(); + let recovered_request: Vec = recovered.at(1).unwrap().as_list().unwrap(); + assert_eq!(recovered_id, id.unwrap()); + assert_eq!(recovered_request, request); + } +} diff --git a/crates/ethcore/sync/src/chain/requester.rs b/crates/ethcore/sync/src/chain/requester.rs index 67529a9d6..aed3313e9 100644 --- a/crates/ethcore/sync/src/chain/requester.rs +++ b/crates/ethcore/sync/src/chain/requester.rs @@ -23,7 +23,10 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::sync_packet::{SyncPacket::*, *}; +use super::{ + request_id::generate_request_id, + sync_packet::{SyncPacket::*, *}, +}; use super::{BlockSet, ChainSync, PeerAsking}; @@ -243,6 +246,8 @@ impl SyncRequester { peer.asking = asking; peer.ask_time = Instant::now(); + let (packet, _) = generate_request_id(packet, peer, packet_id); + let result = io.send(peer_id, packet_id, packet); if let Err(e) = result { diff --git a/crates/ethcore/sync/src/chain/supplier.rs b/crates/ethcore/sync/src/chain/supplier.rs index 931e9ea6d..5203b8e68 100644 --- a/crates/ethcore/sync/src/chain/supplier.rs +++ b/crates/ethcore/sync/src/chain/supplier.rs @@ -31,12 +31,16 @@ use types::{ids::BlockId, BlockNumber}; use sync_io::SyncIo; -use super::sync_packet::{PacketInfo, SyncPacket, SyncPacket::*}; +use super::{ + request_id::{prepend_request_id, strip_request_id, RequestId}, + sync_packet::{PacketInfo, SyncPacket, SyncPacket::*}, +}; use super::{ ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, }; +use std::borrow::Borrow; /// The Chain Sync Supplier: answers requests from peers with available data pub struct SyncSupplier; @@ -52,87 +56,98 @@ impl SyncSupplier { packet_id: u8, data: &[u8], ) { - let rlp = Rlp::new(data); - if let Some(id) = SyncPacket::from_u8(packet_id) { - let result = match id { - GetPooledTransactionsPacket => SyncSupplier::return_rlp( - io, - &rlp, - peer, - SyncSupplier::return_pooled_transactions, - |e| format!("Error sending pooled transactions: {:?}", e), - ), + let rlp_result = strip_request_id(data, sync.read().borrow(), &peer, &id); - GetBlockBodiesPacket => SyncSupplier::return_rlp( - io, - &rlp, - peer, - SyncSupplier::return_block_bodies, - |e| format!("Error sending block bodies: {:?}", e), - ), + let result = match rlp_result { + Ok((rlp, request_id)) => match id { + GetPooledTransactionsPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_pooled_transactions, + |e| format!("Error sending pooled transactions: {:?}", e), + ), - GetBlockHeadersPacket => SyncSupplier::return_rlp( - io, - &rlp, - peer, - SyncSupplier::return_block_headers, - |e| format!("Error sending block headers: {:?}", e), - ), + GetBlockBodiesPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_block_bodies, + |e| format!("Error sending block bodies: {:?}", e), + ), - GetReceiptsPacket => { - SyncSupplier::return_rlp(io, &rlp, peer, SyncSupplier::return_receipts, |e| { - format!("Error sending receipts: {:?}", e) - }) - } - GetSnapshotManifestPacket => SyncSupplier::return_rlp( - io, - &rlp, - peer, - SyncSupplier::return_snapshot_manifest, - |e| format!("Error sending snapshot manifest: {:?}", e), - ), + GetBlockHeadersPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_block_headers, + |e| format!("Error sending block headers: {:?}", e), + ), - GetSnapshotDataPacket => SyncSupplier::return_rlp( - io, - &rlp, - peer, - SyncSupplier::return_snapshot_data, - |e| format!("Error sending snapshot data: {:?}", e), - ), + GetReceiptsPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_receipts, + |e| format!("Error sending receipts: {:?}", e), + ), - StatusPacket => { - sync.write().on_packet(io, peer, packet_id, data); - Ok(()) - } - // Packets that require the peer to be confirmed - _ => { - if !sync.read().peers.contains_key(&peer) { - debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); - return; + GetSnapshotManifestPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_snapshot_manifest, + |e| format!("Error sending snapshot manifest: {:?}", e), + ), + + GetSnapshotDataPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_snapshot_data, + |e| format!("Error sending snapshot data: {:?}", e), + ), + + StatusPacket => { + sync.write().on_packet(io, peer, packet_id, data); + Ok(()) } - debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + // Packets that require the peer to be confirmed + _ => { + if !sync.read().peers.contains_key(&peer) { + debug!(target: "sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); + return; + } + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); - match id { - ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp), - TransactionsPacket => { - let res = { - let sync_ro = sync.read(); - SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) - }; - if res.is_err() { - // peer sent invalid data, disconnect. - io.disable_peer(peer); - sync.write().deactivate_peer(io, peer); + match id { + ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp), + TransactionsPacket => { + let res = { + let sync_ro = sync.read(); + SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) + }; + if res.is_err() { + // peer sent invalid data, disconnect. + io.disable_peer(peer); + sync.write().deactivate_peer(io, peer); + } + } + _ => { + sync.write().on_packet(io, peer, packet_id, data); } } - _ => { - sync.write().on_packet(io, peer, packet_id, data); - } + Ok(()) } - - Ok(()) - } + }, + Err(e) => Err(e.into()), }; match result { @@ -156,22 +171,26 @@ impl SyncSupplier { packet_id: u8, data: &[u8], ) { - let rlp = Rlp::new(data); - if let Some(id) = SyncPacket::from_u8(packet_id) { - let result = match id { - GetBlockHeadersPacket => SyncSupplier::send_rlp( - io, - &rlp, - peer, - SyncSupplier::return_block_headers, - |e| format!("Error sending block headers: {:?}", e), - ), + let rlp_result = strip_request_id(data, sync.read().borrow(), &peer, &id); - _ => { - debug!(target:"sync", "Unexpected packet {} was dispatched for delayed processing", packet_id); - Ok(()) - } + let result = match rlp_result { + Ok((rlp, request_id)) => match id { + GetBlockHeadersPacket => SyncSupplier::send_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_block_headers, + |e| format!("Error sending block headers: {:?}", e), + ), + + _ => { + debug!(target: "sync", "Unexpected packet {} was dispatched for delayed processing", packet_id); + Ok(()) + } + }, + Err(e) => Err(e.into()), }; match result { @@ -394,6 +413,7 @@ impl SyncSupplier { io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, + request_id: Option, rlp_func: FRlp, error_func: FError, ) -> Result<(), PacketProcessError> @@ -403,6 +423,7 @@ impl SyncSupplier { { let response = rlp_func(io, rlp, peer); if let Some((packet_id, rlp_stream)) = response? { + let rlp_stream = prepend_request_id(rlp_stream, request_id); io.respond(packet_id.id(), rlp_stream.out()) .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); } @@ -413,6 +434,7 @@ impl SyncSupplier { io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, + request_id: Option, rlp_func: FRlp, error_func: FError, ) -> Result<(), PacketProcessError> @@ -424,6 +446,7 @@ impl SyncSupplier { match response { Err(e) => Err(e), Ok(Some((packet_id, rlp_stream))) => { + let rlp_stream = prepend_request_id(rlp_stream, request_id); io.send(peer, packet_id, rlp_stream.out()) .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); Ok(()) diff --git a/crates/ethcore/sync/src/chain/sync_packet.rs b/crates/ethcore/sync/src/chain/sync_packet.rs index dd2afc39a..e25dd7b50 100644 --- a/crates/ethcore/sync/src/chain/sync_packet.rs +++ b/crates/ethcore/sync/src/chain/sync_packet.rs @@ -68,6 +68,7 @@ use self::SyncPacket::*; pub trait PacketInfo { fn id(&self) -> PacketId; fn protocol(&self) -> ProtocolId; + fn has_request_id_in_eth_66(&self) -> bool; } // The mechanism to match packet ids and protocol may be improved @@ -102,6 +103,21 @@ impl PacketInfo for SyncPacket { fn id(&self) -> PacketId { (*self) as PacketId } + + fn has_request_id_in_eth_66(&self) -> bool { + // Note: NodeDataPacket and GetNodeDataPacket also get a request id in eth-66. + match self { + GetBlockHeadersPacket + | BlockHeadersPacket + | GetBlockBodiesPacket + | BlockBodiesPacket + | GetPooledTransactionsPacket + | PooledTransactionsPacket + | GetReceiptsPacket + | ReceiptsPacket => true, + _ => false, + } + } } #[cfg(test)] diff --git a/crates/ethcore/sync/src/tests/helpers.rs b/crates/ethcore/sync/src/tests/helpers.rs index bd49baf36..87e6c140c 100644 --- a/crates/ethcore/sync/src/tests/helpers.rs +++ b/crates/ethcore/sync/src/tests/helpers.rs @@ -18,7 +18,7 @@ use api::PAR_PROTOCOL; use bytes::Bytes; use chain::{ sync_packet::{PacketInfo, SyncPacket}, - ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_65, PAR_PROTOCOL_VERSION_2, + ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_66, PAR_PROTOCOL_VERSION_2, }; use ethcore::{ client::{ @@ -172,7 +172,7 @@ where if protocol == PAR_PROTOCOL { PAR_PROTOCOL_VERSION_2.0 } else { - ETH_PROTOCOL_VERSION_65.0 + ETH_PROTOCOL_VERSION_66.0 } }