From c03cc154685a655b383e9227165acc9dfff60eba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jochen=20M=C3=BCller?= Date: Tue, 27 Apr 2021 15:33:40 +0200 Subject: [PATCH] Implement eth/65 (#352) * EIP-2464: eth/65: transaction announcements and retrievals * Updates to eth-65 implementatiom - Replace deprecated syntax and method name - Replace call to removed method - Reimplement rlp::Encodable for SignedTransaction * fmt * More updates - Replace calls to removed methods - More dyns * Apply requested changes - Avoid unused variable warnings - Avoid implementing Encodable trait for Transaction - Update message encoding to EIP-2718 transaction format * Update sync stats of eth-65 peers properly * fmt & fix error propagation * Fix: Add correct subset of transactions to stats. * Update list of last sent transactions correctly. Co-authored-by: Artem Vorotnikov Co-authored-by: Karim Agha Co-authored-by: rakita --- crates/ethcore/src/client/client.rs | 10 +- crates/ethcore/src/client/test_client.rs | 9 +- crates/ethcore/src/client/traits.rs | 5 +- crates/ethcore/sync/src/api.rs | 10 +- crates/ethcore/sync/src/chain/handler.rs | 79 +++++++++- crates/ethcore/sync/src/chain/mod.rs | 154 +++++++++++++++++-- crates/ethcore/sync/src/chain/propagator.rs | 80 ++++++---- crates/ethcore/sync/src/chain/requester.rs | 31 +++- crates/ethcore/sync/src/chain/supplier.rs | 40 +++-- crates/ethcore/sync/src/chain/sync_packet.rs | 6 + crates/ethcore/sync/src/tests/helpers.rs | 4 +- crates/rpc/src/v1/impls/eth.rs | 2 +- 12 files changed, 349 insertions(+), 81 deletions(-) diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index 9021c45ca..3ac3544b1 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -2344,11 +2344,15 @@ impl BlockChainClient for Client { Some(keys) } - fn transaction(&self, id: TransactionId) -> Option { + fn block_transaction(&self, id: TransactionId) -> Option { self.transaction_address(id) .and_then(|address| self.chain.read().transaction(&address)) } + fn queued_transaction(&self, hash: H256) -> Option> { + self.importer.miner.transaction(&hash) + } + fn uncle(&self, id: UncleId) -> Option { let index = id.position; self.block_body(id.block) @@ -3031,11 +3035,11 @@ impl super::traits::EngineClient for Client { } fn block_number(&self, id: BlockId) -> Option { - BlockChainClient::block_number(self, id) + ::block_number(self, id) } fn block_header(&self, id: BlockId) -> Option { - BlockChainClient::block_header(self, id) + ::block_header(self, id) } } diff --git a/crates/ethcore/src/client/test_client.rs b/crates/ethcore/src/client/test_client.rs index 960e95915..5dbb70002 100644 --- a/crates/ethcore/src/client/test_client.rs +++ b/crates/ethcore/src/client/test_client.rs @@ -809,9 +809,12 @@ impl BlockChainClient for TestBlockChainClient { ) -> Option> { None } - fn transaction(&self, _id: TransactionId) -> Option { + fn block_transaction(&self, _id: TransactionId) -> Option { None // Simple default. } + fn queued_transaction(&self, _hash: H256) -> Option> { + None + } fn uncle(&self, _id: UncleId) -> Option { None // Simple default. @@ -1144,11 +1147,11 @@ impl super::traits::EngineClient for TestBlockChainClient { } fn block_number(&self, id: BlockId) -> Option { - BlockChainClient::block_number(self, id) + ::block_number(self, id) } fn block_header(&self, id: BlockId) -> Option { - BlockChainClient::block_header(self, id) + ::block_header(self, id) } } diff --git a/crates/ethcore/src/client/traits.rs b/crates/ethcore/src/client/traits.rs index 6546710b0..80eea0503 100644 --- a/crates/ethcore/src/client/traits.rs +++ b/crates/ethcore/src/client/traits.rs @@ -312,7 +312,10 @@ pub trait BlockChainClient: ) -> Option>; /// Get transaction with given hash. - fn transaction(&self, id: TransactionId) -> Option; + fn block_transaction(&self, id: TransactionId) -> Option; + + /// Get pool transaction with a given hash. + fn queued_transaction(&self, hash: H256) -> Option>; /// Get uncle with given id. fn uncle(&self, id: UncleId) -> Option; diff --git a/crates/ethcore/sync/src/api.rs b/crates/ethcore/sync/src/api.rs index cb75861de..bc1d7fd78 100644 --- a/crates/ethcore/sync/src/api.rs +++ b/crates/ethcore/sync/src/api.rs @@ -32,8 +32,8 @@ use std::{ use chain::{ fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus, - ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_1, - PAR_PROTOCOL_VERSION_2, + ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, }; use ethcore::{ client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks}, @@ -564,7 +564,11 @@ impl ChainNotify for EthSync { .register_protocol( self.eth_handler.clone(), self.subprotocol_name, - &[ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64], + &[ + ETH_PROTOCOL_VERSION_63, + ETH_PROTOCOL_VERSION_64, + ETH_PROTOCOL_VERSION_65, + ], ) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); // register the warp sync subprotocol diff --git a/crates/ethcore/sync/src/chain/handler.rs b/crates/ethcore/sync/src/chain/handler.rs index 556c677b5..0797251c3 100644 --- a/crates/ethcore/sync/src/chain/handler.rs +++ b/crates/ethcore/sync/src/chain/handler.rs @@ -33,17 +33,14 @@ use sync_io::SyncIo; use types::{block_status::BlockStatus, ids::BlockId, BlockNumber}; use super::sync_packet::{ - PacketInfo, SyncPacket, - SyncPacket::{ - BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket, - ReceiptsPacket, SnapshotDataPacket, SnapshotManifestPacket, StatusPacket, - }, + PacketInfo, + SyncPacket::{self, *}, }; use super::{ BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, - SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, + SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, + MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, }; /// The Chain Sync Handler: handles responses from peers @@ -67,6 +64,12 @@ impl SyncHandler { ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), + NewPooledTransactionHashesPacket => { + SyncHandler::on_peer_new_pooled_transaction_hashes(sync, io, peer, &rlp) + } + PooledTransactionsPacket => { + SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp) + } SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), _ => { @@ -732,6 +735,8 @@ impl SyncHandler { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + unfetched_pooled_transactions: Default::default(), + asking_pooled_transactions: Default::default(), ask_time: Instant::now(), last_sent_transactions: Default::default(), expired: false, @@ -792,7 +797,7 @@ impl SyncHandler { || peer.protocol_version > PAR_PROTOCOL_VERSION_2.0)) || (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 - || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0)) + || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0)) { trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); return Err(DownloaderImportError::Invalid); @@ -815,6 +820,64 @@ impl SyncHandler { Ok(()) } + /// Called when peer sends us a set of new pooled transactions + pub fn on_peer_new_pooled_transaction_hashes( + sync: &mut ChainSync, + io: &mut dyn SyncIo, + peer_id: PeerId, + tx_rlp: &Rlp, + ) -> Result<(), DownloaderImportError> { + for item in tx_rlp { + let hash = item + .as_val::() + .map_err(|_| DownloaderImportError::Invalid)?; + + if io.chain().queued_transaction(hash).is_none() { + sync.peers + .get_mut(&peer_id) + .map(|peer| peer.unfetched_pooled_transactions.insert(hash)); + } + } + + Ok(()) + } + + /// Called when peer sends us a list of pooled transactions + pub fn on_peer_pooled_transactions( + sync: &ChainSync, + io: &mut dyn SyncIo, + peer_id: PeerId, + tx_rlp: &Rlp, + ) -> Result<(), DownloaderImportError> { + let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) { + Some(peer) => peer, + None => { + trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id); + return Ok(()); + } + }; + + let item_count = tx_rlp.item_count()?; + if item_count > peer.asking_pooled_transactions.len() { + trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id); + return Err(DownloaderImportError::Invalid); + } + trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count); + let mut transactions = Vec::with_capacity(item_count); + for i in 0..item_count { + let rlp = tx_rlp.at(i)?; + let tx = if rlp.is_list() { + rlp.as_raw() + } else { + rlp.data()? + } + .to_vec(); + transactions.push(tx); + } + io.chain().queue_transactions(transactions, peer_id); + Ok(()) + } + /// Called when peer sends us new transactions pub fn on_peer_transactions( sync: &ChainSync, diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index 85b5079c0..a8b3bdd60 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -153,6 +153,8 @@ impl From for PacketProcessError { } } +/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count). +pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11); /// 64 version of Ethereum protocol. pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11); /// 63 version of Ethereum protocol. @@ -165,6 +167,7 @@ pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16); pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_HEADERS_TO_SEND: usize = 512; pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; +pub const MAX_TRANSACTIONS_TO_REQUEST: usize = 256; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; @@ -184,6 +187,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(5); const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); const BODIES_TIMEOUT: Duration = Duration::from_secs(20); const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); +const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); @@ -286,6 +290,7 @@ pub enum PeerAsking { BlockHeaders, BlockBodies, BlockReceipts, + PooledTransactions, SnapshotManifest, SnapshotData, } @@ -337,6 +342,10 @@ pub struct PeerInfo { asking_blocks: Vec, /// Holds requested header hash if currently requesting block header by hash asking_hash: Option, + /// Hashes of transactions to be requested. + unfetched_pooled_transactions: H256FastSet, + /// Hashes of the transactions we're requesting. + asking_pooled_transactions: Vec, /// Holds requested snapshot chunk hash if any. asking_snapshot_data: Option, /// Request timestamp @@ -686,6 +695,55 @@ pub struct ChainSync { warp_sync: WarpSync, } +#[derive(Debug, Default)] +struct GetPooledTransactionsReport { + found: H256FastSet, + missing: H256FastSet, + not_sent: H256FastSet, +} + +impl GetPooledTransactionsReport { + fn generate( + mut asked: Vec, + received: impl IntoIterator, + ) -> Result { + let mut out = GetPooledTransactionsReport::default(); + + let asked_set = asked.iter().copied().collect::(); + let mut asked_iter = asked.drain(std::ops::RangeFull); + let mut txs = received.into_iter(); + let mut next_received: Option = None; + loop { + if let Some(received) = next_received { + if !asked_set.contains(&received) { + return Err(received); + } + + if let Some(requested) = asked_iter.next() { + if requested == received { + next_received = None; + out.found.insert(requested); + } else { + out.missing.insert(requested); + } + } else { + break; + } + } else { + if let Some(tx) = txs.next() { + next_received = Some(tx); + } else { + break; + } + } + } + + out.not_sent = asked_iter.collect(); + + Ok(out) + } +} + impl ChainSync { pub fn new( config: SyncConfig, @@ -736,7 +794,7 @@ impl ChainSync { SyncStatus { state: self.state.clone(), - protocol_version: ETH_PROTOCOL_VERSION_64.0, + protocol_version: ETH_PROTOCOL_VERSION_65.0, network_id: self.network_id, start_block_number: self.starting_block, last_imported_block_number: Some(last_imported_number), @@ -784,10 +842,37 @@ impl ChainSync { /// Updates transactions were received by a peer pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) { - if let Some(peer_info) = self.peers.get_mut(&peer_id) { - peer_info - .last_sent_transactions - .extend(txs.iter().map(|tx| tx.hash())); + // Remove imported txs from all request queues + let imported = txs.iter().map(|tx| tx.hash()).collect::(); + for (pid, peer_info) in &mut self.peers { + peer_info.unfetched_pooled_transactions = peer_info + .unfetched_pooled_transactions + .difference(&imported) + .copied() + .collect(); + if *pid == peer_id { + match GetPooledTransactionsReport::generate( + std::mem::replace(&mut peer_info.asking_pooled_transactions, Vec::new()), + txs.iter().map(UnverifiedTransaction::hash), + ) { + Ok(report) => { + // Some transactions were not received in this batch because of size. + // Add them back to request feed. + peer_info.unfetched_pooled_transactions = peer_info + .unfetched_pooled_transactions + .union(&report.not_sent) + .copied() + .collect(); + } + Err(_unknown_tx) => { + // punish peer? + } + } + + peer_info + .last_sent_transactions + .extend(txs.iter().map(|tx| tx.hash())); + } } } @@ -1120,20 +1205,27 @@ impl ChainSync { // check queue fullness let ancient_block_fullness = io.chain().ancient_block_queue_fullness(); if force || equal_or_higher_difficulty { - let mut is_complete = false; - if let Some(old_blocks) = self.old_blocks.as_mut() { - // check if ancient queue can take more request or not. - if ancient_block_fullness < 0.8 { - if let Some(request) = old_blocks.request_blocks(peer_id, io, num_active_peers) { - SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks); - return; - } - is_complete = old_blocks.is_complete(); + if ancient_block_fullness < 0.8 { + if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(peer_id, io, num_active_peers)) { + SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks); + return; } } - if is_complete { // if old_blocks is in complete state, set it to None. - self.old_blocks = None; - } + + // and if we have nothing else to do, get the peer to give us at least some of announced but unfetched transactions + let mut to_send = Default::default(); + if let Some(peer) = self.peers.get_mut(&peer_id) { + if peer.asking_pooled_transactions.is_empty() { + to_send = peer.unfetched_pooled_transactions.drain().take(MAX_TRANSACTIONS_TO_REQUEST).collect::>(); + peer.asking_pooled_transactions = to_send.clone(); + } + } + + if !to_send.is_empty() { + SyncRequester::request_pooled_transactions(self, io, peer_id, &to_send); + + return; + } } else { trace!( target: "sync", @@ -1321,6 +1413,7 @@ impl ChainSync { PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT, PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT, PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT, + PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT, PeerAsking::Nothing => false, PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, @@ -1629,6 +1722,8 @@ pub mod tests { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + unfetched_pooled_transactions: Default::default(), + asking_pooled_transactions: Default::default(), ask_time: Instant::now(), last_sent_transactions: Default::default(), expired: false, @@ -1806,4 +1901,29 @@ pub mod tests { let status = io.chain.miner.queue_status(); assert_eq!(status.status.transaction_count, 0); } + + #[test] + fn generate_pooled_transactions_report() { + let asked = vec![1, 2, 3, 4, 5, 6, 7] + .into_iter() + .map(H256::from_low_u64_be); + let received = vec![2, 4, 5].into_iter().map(H256::from_low_u64_be); + + let report = GetPooledTransactionsReport::generate(asked.collect(), received).unwrap(); + assert_eq!( + report.found, + vec![2, 4, 5] + .into_iter() + .map(H256::from_low_u64_be) + .collect() + ); + assert_eq!( + report.missing, + vec![1, 3].into_iter().map(H256::from_low_u64_be).collect() + ); + assert_eq!( + report.not_sent, + vec![6, 7].into_iter().map(H256::from_low_u64_be).collect() + ); + } } diff --git a/crates/ethcore/sync/src/chain/propagator.rs b/crates/ethcore/sync/src/chain/propagator.rs index 02b72df1a..d8045d556 100644 --- a/crates/ethcore/sync/src/chain/propagator.rs +++ b/crates/ethcore/sync/src/chain/propagator.rs @@ -25,16 +25,15 @@ use rlp::RlpStream; use sync_io::SyncIo; use types::{blockchain_info::BlockChainInfo, transaction::SignedTransaction, BlockNumber}; -use super::sync_packet::{ - SyncPacket, - SyncPacket::{ConsensusDataPacket, NewBlockHashesPacket, NewBlockPacket, TransactionsPacket}, -}; +use super::sync_packet::SyncPacket::{self, *}; use super::{ - random, ChainSync, MAX_PEERS_PROPAGATION, MAX_PEER_LAG_PROPAGATION, + random, ChainSync, ETH_PROTOCOL_VERSION_65, MAX_PEERS_PROPAGATION, MAX_PEER_LAG_PROPAGATION, MAX_TRANSACTION_PACKET_SIZE, MIN_PEERS_PROPAGATION, }; +const NEW_POOLED_HASHES_LIMIT: usize = 4096; + /// The Chain Sync Propagator: propagates data to peers pub struct SyncPropagator; @@ -175,14 +174,29 @@ impl SyncPropagator { } packet.out() }; + let all_transactions_hashes_rlp = + rlp::encode_list(&all_transactions_hashes.iter().copied().collect::>()); // Clear old transactions from stats sync.transactions_stats.retain(&all_transactions_hashes); - let send_packet = |io: &mut dyn SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { + let send_packet = |io: &mut dyn SyncIo, + peer_id: PeerId, + is_hashes: bool, + sent: usize, + rlp: Bytes| { let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp); - trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); + SyncPropagator::send_packet( + io, + peer_id, + if is_hashes { + NewPooledTransactionHashesPacket + } else { + TransactionsPacket + }, + rlp, + ); + trace!(target: "sync", "{:02} <- {} ({} entries; {} bytes)", peer_id, if is_hashes { "NewPooledTransactionHashes" } else { "Transactions" }, sent, size); }; let block_number = io.chain().chain_info().best_block_number; @@ -200,6 +214,8 @@ impl SyncPropagator { let peer_info = sync.peers.get_mut(&peer_id) .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); + let is_hashes = peer_info.protocol_version >= ETH_PROTOCOL_VERSION_65.0; + // Send all transactions, if the peer doesn't know about anything if peer_info.last_sent_transactions.is_empty() { // update stats @@ -209,12 +225,14 @@ impl SyncPropagator { } peer_info.last_sent_transactions = all_transactions_hashes.clone(); - send_packet( - io, - peer_id, - all_transactions_hashes.len(), - all_transactions_rlp.clone(), - ); + let rlp = { + if is_hashes { + all_transactions_hashes_rlp.clone() + } else { + all_transactions_rlp.clone() + } + }; + send_packet(io, peer_id, is_hashes, all_transactions_hashes.len(), rlp); sent_to_peers.insert(peer_id); max_sent = cmp::max(max_sent, all_transactions_hashes.len()); continue; @@ -231,32 +249,38 @@ impl SyncPropagator { // Construct RLP let (packet, to_send) = { - let mut to_send = to_send; + let mut to_send_new = HashSet::new(); let mut packet = RlpStream::new(); packet.begin_unbounded_list(); - let mut pushed = 0; for tx in &transactions { let hash = tx.hash(); if to_send.contains(&hash) { - tx.rlp_append(&mut packet); - pushed += 1; - // this is not hard limit and we are okay with it. Max default tx size is 300k. - if packet.as_raw().len() >= MAX_TRANSACTION_PACKET_SIZE { - // Maximal packet size reached just proceed with sending - debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); - to_send = to_send.into_iter().take(pushed).collect(); - break; + if is_hashes { + if to_send_new.len() >= NEW_POOLED_HASHES_LIMIT { + debug!(target: "sync", "NewPooledTransactionHashes length limit reached. Sending incomplete list of {}/{} transactions.", to_send_new.len(), to_send.len()); + break; + } + packet.append(&hash); + to_send_new.insert(hash); + } else { + tx.rlp_append(&mut packet); + to_send_new.insert(hash); + // this is not hard limit and we are okay with it. Max default tx size is 300k. + if packet.as_raw().len() >= MAX_TRANSACTION_PACKET_SIZE { + // Maximal packet size reached just proceed with sending + debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", to_send_new.len(), to_send.len()); + break; + } } } } packet.finalize_unbounded_list(); - (packet, to_send) + (packet, to_send_new) }; // Update stats let id = io.peer_session_info(peer_id).and_then(|info| info.id); for hash in &to_send { - // update stats stats.propagated(hash, id, block_number); } @@ -265,7 +289,7 @@ impl SyncPropagator { .chain(&to_send) .cloned() .collect(); - send_packet(io, peer_id, to_send.len(), packet.out()); + send_packet(io, peer_id, is_hashes, to_send.len(), packet.out()); sent_to_peers.insert(peer_id); max_sent = cmp::max(max_sent, to_send.len()); } @@ -468,6 +492,8 @@ mod tests { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), asking_hash: None, + unfetched_pooled_transactions: Default::default(), + asking_pooled_transactions: Default::default(), ask_time: Instant::now(), last_sent_transactions: Default::default(), expired: false, diff --git a/crates/ethcore/sync/src/chain/requester.rs b/crates/ethcore/sync/src/chain/requester.rs index 1aba74f6b..67529a9d6 100644 --- a/crates/ethcore/sync/src/chain/requester.rs +++ b/crates/ethcore/sync/src/chain/requester.rs @@ -23,13 +23,7 @@ use std::time::Instant; use sync_io::SyncIo; use types::BlockNumber; -use super::sync_packet::{ - SyncPacket, - SyncPacket::{ - GetBlockBodiesPacket, GetBlockHeadersPacket, GetReceiptsPacket, GetSnapshotDataPacket, - GetSnapshotManifestPacket, - }, -}; +use super::sync_packet::{SyncPacket::*, *}; use super::{BlockSet, ChainSync, PeerAsking}; @@ -109,6 +103,29 @@ impl SyncRequester { ); } + /// Request pooled transactions from a peer + pub fn request_pooled_transactions( + sync: &mut ChainSync, + io: &mut dyn SyncIo, + peer_id: PeerId, + hashes: &[H256], + ) { + trace!(target: "sync", "{} <- GetPooledTransactions: {:?}", peer_id, hashes); + let mut rlp = RlpStream::new_list(hashes.len()); + for h in hashes { + rlp.append(h); + } + + SyncRequester::send_request( + sync, + io, + peer_id, + PeerAsking::PooledTransactions, + PooledTransactionsPacket, + rlp.out(), + ) + } + /// Find some headers or blocks to download for a peer. pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) { // find chunk data to download diff --git a/crates/ethcore/sync/src/chain/supplier.rs b/crates/ethcore/sync/src/chain/supplier.rs index 7713352c5..ee0686ffa 100644 --- a/crates/ethcore/sync/src/chain/supplier.rs +++ b/crates/ethcore/sync/src/chain/supplier.rs @@ -31,15 +31,7 @@ use types::{ids::BlockId, BlockNumber}; use sync_io::SyncIo; -use super::sync_packet::{ - PacketInfo, SyncPacket, - SyncPacket::{ - BlockBodiesPacket, BlockHeadersPacket, ConsensusDataPacket, GetBlockBodiesPacket, - GetBlockHeadersPacket, GetReceiptsPacket, GetSnapshotDataPacket, GetSnapshotManifestPacket, - ReceiptsPacket, SnapshotDataPacket, SnapshotManifestPacket, StatusPacket, - TransactionsPacket, - }, -}; +use super::sync_packet::{PacketInfo, SyncPacket, SyncPacket::*}; use super::{ ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, @@ -64,6 +56,14 @@ impl SyncSupplier { if let Some(id) = SyncPacket::from_u8(packet_id) { let result = match id { + GetPooledTransactionsPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + SyncSupplier::return_pooled_transactions, + |e| format!("Error sending pooled transactions: {:?}", e), + ), + GetBlockBodiesPacket => SyncSupplier::return_rlp( io, &rlp, @@ -273,6 +273,28 @@ impl SyncSupplier { Ok(Some((BlockHeadersPacket, rlp))) } + /// Respond to GetPooledTransactions request + fn return_pooled_transactions(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let mut added = 0; + let mut rlp = RlpStream::new(); + rlp.begin_unbounded_list(); + for v in r { + if let Ok(hash) = v.as_val::() { + if let Some(tx) = io.chain().queued_transaction(hash) { + tx.signed().rlp_append(&mut rlp); + added += 1; + if rlp.len() > PAYLOAD_SOFT_LIMIT { + break; + } + } + } + } + rlp.finalize_unbounded_list(); + + trace!(target: "sync", "{} -> GetPooledTransactions: returned {} entries", peer_id, added); + Ok(Some((PooledTransactionsPacket, rlp))) + } + /// Respond to GetBlockBodies request fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { let mut count = r.item_count().unwrap_or(0); diff --git a/crates/ethcore/sync/src/chain/sync_packet.rs b/crates/ethcore/sync/src/chain/sync_packet.rs index a868e1c35..dd2afc39a 100644 --- a/crates/ethcore/sync/src/chain/sync_packet.rs +++ b/crates/ethcore/sync/src/chain/sync_packet.rs @@ -44,6 +44,9 @@ pub enum SyncPacket { GetBlockBodiesPacket = 0x05, BlockBodiesPacket = 0x06, NewBlockPacket = 0x07, + NewPooledTransactionHashesPacket = 0x08, + GetPooledTransactionsPacket = 0x09, + PooledTransactionsPacket = 0x0a, //GetNodeDataPacket = 0x0d, //NodeDataPacket = 0x0e, @@ -80,6 +83,9 @@ impl PacketInfo for SyncPacket { | GetBlockBodiesPacket | BlockBodiesPacket | NewBlockPacket + | NewPooledTransactionHashesPacket + | GetPooledTransactionsPacket + | PooledTransactionsPacket //| GetNodeDataPacket //| NodeDataPacket | GetReceiptsPacket diff --git a/crates/ethcore/sync/src/tests/helpers.rs b/crates/ethcore/sync/src/tests/helpers.rs index 4beb7d1f1..bd49baf36 100644 --- a/crates/ethcore/sync/src/tests/helpers.rs +++ b/crates/ethcore/sync/src/tests/helpers.rs @@ -18,7 +18,7 @@ use api::PAR_PROTOCOL; use bytes::Bytes; use chain::{ sync_packet::{PacketInfo, SyncPacket}, - ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_2, + ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_65, PAR_PROTOCOL_VERSION_2, }; use ethcore::{ client::{ @@ -172,7 +172,7 @@ where if protocol == PAR_PROTOCOL { PAR_PROTOCOL_VERSION_2.0 } else { - ETH_PROTOCOL_VERSION_64.0 + ETH_PROTOCOL_VERSION_65.0 } } diff --git a/crates/rpc/src/v1/impls/eth.rs b/crates/rpc/src/v1/impls/eth.rs index 3c59a151a..2180bb323 100644 --- a/crates/rpc/src/v1/impls/eth.rs +++ b/crates/rpc/src/v1/impls/eth.rs @@ -309,7 +309,7 @@ where } fn transaction(&self, id: PendingTransactionId) -> Result> { - let client_transaction = |id| match self.client.transaction(id) { + let client_transaction = |id| match self.client.block_transaction(id) { Some(t) => Ok(Some(Transaction::from_localized(t))), None => Ok(None), };