Implement eth/65 (#352)

* EIP-2464: eth/65: transaction announcements and retrievals

* Updates to eth-65 implementatiom

- Replace deprecated syntax and method name
- Replace call to removed method
- Reimplement rlp::Encodable for SignedTransaction

* fmt

* More updates

- Replace calls to removed methods
- More dyns

* Apply requested changes

- Avoid unused variable warnings
- Avoid implementing Encodable trait for Transaction
- Update message encoding to EIP-2718 transaction format

* Update sync stats of eth-65 peers properly

* fmt & fix error propagation

* Fix: Add correct subset of transactions to stats.

* Update list of last sent transactions correctly.

Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me>
Co-authored-by: Karim Agha <karim.dev@gmail.com>
Co-authored-by: rakita <rakita@users.noreply.github.com>
This commit is contained in:
Jochen Müller 2021-04-27 15:33:40 +02:00 committed by GitHub
parent 3b3ecf6676
commit c03cc15468
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 349 additions and 81 deletions

View File

@ -2344,11 +2344,15 @@ impl BlockChainClient for Client {
Some(keys) Some(keys)
} }
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> { fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
self.transaction_address(id) self.transaction_address(id)
.and_then(|address| self.chain.read().transaction(&address)) .and_then(|address| self.chain.read().transaction(&address))
} }
fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>> {
self.importer.miner.transaction(&hash)
}
fn uncle(&self, id: UncleId) -> Option<encoded::Header> { fn uncle(&self, id: UncleId) -> Option<encoded::Header> {
let index = id.position; let index = id.position;
self.block_body(id.block) self.block_body(id.block)
@ -3031,11 +3035,11 @@ impl super::traits::EngineClient for Client {
} }
fn block_number(&self, id: BlockId) -> Option<BlockNumber> { fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
BlockChainClient::block_number(self, id) <dyn BlockChainClient>::block_number(self, id)
} }
fn block_header(&self, id: BlockId) -> Option<encoded::Header> { fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
BlockChainClient::block_header(self, id) <dyn BlockChainClient>::block_header(self, id)
} }
} }

View File

@ -809,9 +809,12 @@ impl BlockChainClient for TestBlockChainClient {
) -> Option<Vec<H256>> { ) -> Option<Vec<H256>> {
None None
} }
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> { fn block_transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
None // Simple default. None // Simple default.
} }
fn queued_transaction(&self, _hash: H256) -> Option<Arc<VerifiedTransaction>> {
None
}
fn uncle(&self, _id: UncleId) -> Option<encoded::Header> { fn uncle(&self, _id: UncleId) -> Option<encoded::Header> {
None // Simple default. None // Simple default.
@ -1144,11 +1147,11 @@ impl super::traits::EngineClient for TestBlockChainClient {
} }
fn block_number(&self, id: BlockId) -> Option<BlockNumber> { fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
BlockChainClient::block_number(self, id) <dyn BlockChainClient>::block_number(self, id)
} }
fn block_header(&self, id: BlockId) -> Option<encoded::Header> { fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
BlockChainClient::block_header(self, id) <dyn BlockChainClient>::block_header(self, id)
} }
} }

View File

@ -312,7 +312,10 @@ pub trait BlockChainClient:
) -> Option<Vec<H256>>; ) -> Option<Vec<H256>>;
/// Get transaction with given hash. /// Get transaction with given hash.
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>; fn block_transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;
/// Get pool transaction with a given hash.
fn queued_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>>;
/// Get uncle with given id. /// Get uncle with given id.
fn uncle(&self, id: UncleId) -> Option<encoded::Header>; fn uncle(&self, id: UncleId) -> Option<encoded::Header>;

View File

@ -32,8 +32,8 @@ use std::{
use chain::{ use chain::{
fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus, fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus,
ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_1, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65,
PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
}; };
use ethcore::{ use ethcore::{
client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks}, client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks},
@ -564,7 +564,11 @@ impl ChainNotify for EthSync {
.register_protocol( .register_protocol(
self.eth_handler.clone(), self.eth_handler.clone(),
self.subprotocol_name, self.subprotocol_name,
&[ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64], &[
ETH_PROTOCOL_VERSION_63,
ETH_PROTOCOL_VERSION_64,
ETH_PROTOCOL_VERSION_65,
],
) )
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol // register the warp sync subprotocol

View File

@ -33,17 +33,14 @@ use sync_io::SyncIo;
use types::{block_status::BlockStatus, ids::BlockId, BlockNumber}; use types::{block_status::BlockStatus, ids::BlockId, BlockNumber};
use super::sync_packet::{ use super::sync_packet::{
PacketInfo, SyncPacket, PacketInfo,
SyncPacket::{ SyncPacket::{self, *},
BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket,
ReceiptsPacket, SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
},
}; };
use super::{ use super::{
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
}; };
/// The Chain Sync Handler: handles responses from peers /// The Chain Sync Handler: handles responses from peers
@ -67,6 +64,12 @@ impl SyncHandler {
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
NewPooledTransactionHashesPacket => {
SyncHandler::on_peer_new_pooled_transaction_hashes(sync, io, peer, &rlp)
}
PooledTransactionsPacket => {
SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp)
}
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
_ => { _ => {
@ -732,6 +735,8 @@ impl SyncHandler {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
unfetched_pooled_transactions: Default::default(),
asking_pooled_transactions: Default::default(),
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
expired: false, expired: false,
@ -792,7 +797,7 @@ impl SyncHandler {
|| peer.protocol_version > PAR_PROTOCOL_VERSION_2.0)) || peer.protocol_version > PAR_PROTOCOL_VERSION_2.0))
|| (!warp_protocol || (!warp_protocol
&& (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0
|| peer.protocol_version > ETH_PROTOCOL_VERSION_64.0)) || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0))
{ {
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Err(DownloaderImportError::Invalid); return Err(DownloaderImportError::Invalid);
@ -815,6 +820,64 @@ impl SyncHandler {
Ok(()) Ok(())
} }
/// Called when peer sends us a set of new pooled transactions
pub fn on_peer_new_pooled_transaction_hashes(
sync: &mut ChainSync,
io: &mut dyn SyncIo,
peer_id: PeerId,
tx_rlp: &Rlp,
) -> Result<(), DownloaderImportError> {
for item in tx_rlp {
let hash = item
.as_val::<H256>()
.map_err(|_| DownloaderImportError::Invalid)?;
if io.chain().queued_transaction(hash).is_none() {
sync.peers
.get_mut(&peer_id)
.map(|peer| peer.unfetched_pooled_transactions.insert(hash));
}
}
Ok(())
}
/// Called when peer sends us a list of pooled transactions
pub fn on_peer_pooled_transactions(
sync: &ChainSync,
io: &mut dyn SyncIo,
peer_id: PeerId,
tx_rlp: &Rlp,
) -> Result<(), DownloaderImportError> {
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) {
Some(peer) => peer,
None => {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
return Ok(());
}
};
let item_count = tx_rlp.item_count()?;
if item_count > peer.asking_pooled_transactions.len() {
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id);
return Err(DownloaderImportError::Invalid);
}
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count);
for i in 0..item_count {
let rlp = tx_rlp.at(i)?;
let tx = if rlp.is_list() {
rlp.as_raw()
} else {
rlp.data()?
}
.to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions, peer_id);
Ok(())
}
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
pub fn on_peer_transactions( pub fn on_peer_transactions(
sync: &ChainSync, sync: &ChainSync,

View File

@ -153,6 +153,8 @@ impl From<DecoderError> for PacketProcessError {
} }
} }
/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11);
/// 64 version of Ethereum protocol. /// 64 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11); pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
/// 63 version of Ethereum protocol. /// 63 version of Ethereum protocol.
@ -165,6 +167,7 @@ pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_BODIES_TO_SEND: usize = 256;
pub const MAX_HEADERS_TO_SEND: usize = 512; pub const MAX_HEADERS_TO_SEND: usize = 512;
pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
pub const MAX_TRANSACTIONS_TO_REQUEST: usize = 256;
const MIN_PEERS_PROPAGATION: usize = 4; const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
@ -184,6 +187,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(5);
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15); const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const BODIES_TIMEOUT: Duration = Duration::from_secs(20); const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10); const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
@ -286,6 +290,7 @@ pub enum PeerAsking {
BlockHeaders, BlockHeaders,
BlockBodies, BlockBodies,
BlockReceipts, BlockReceipts,
PooledTransactions,
SnapshotManifest, SnapshotManifest,
SnapshotData, SnapshotData,
} }
@ -337,6 +342,10 @@ pub struct PeerInfo {
asking_blocks: Vec<H256>, asking_blocks: Vec<H256>,
/// Holds requested header hash if currently requesting block header by hash /// Holds requested header hash if currently requesting block header by hash
asking_hash: Option<H256>, asking_hash: Option<H256>,
/// Hashes of transactions to be requested.
unfetched_pooled_transactions: H256FastSet,
/// Hashes of the transactions we're requesting.
asking_pooled_transactions: Vec<H256>,
/// Holds requested snapshot chunk hash if any. /// Holds requested snapshot chunk hash if any.
asking_snapshot_data: Option<H256>, asking_snapshot_data: Option<H256>,
/// Request timestamp /// Request timestamp
@ -686,6 +695,55 @@ pub struct ChainSync {
warp_sync: WarpSync, warp_sync: WarpSync,
} }
#[derive(Debug, Default)]
struct GetPooledTransactionsReport {
found: H256FastSet,
missing: H256FastSet,
not_sent: H256FastSet,
}
impl GetPooledTransactionsReport {
fn generate(
mut asked: Vec<H256>,
received: impl IntoIterator<Item = H256>,
) -> Result<Self, H256> {
let mut out = GetPooledTransactionsReport::default();
let asked_set = asked.iter().copied().collect::<H256FastSet>();
let mut asked_iter = asked.drain(std::ops::RangeFull);
let mut txs = received.into_iter();
let mut next_received: Option<H256> = None;
loop {
if let Some(received) = next_received {
if !asked_set.contains(&received) {
return Err(received);
}
if let Some(requested) = asked_iter.next() {
if requested == received {
next_received = None;
out.found.insert(requested);
} else {
out.missing.insert(requested);
}
} else {
break;
}
} else {
if let Some(tx) = txs.next() {
next_received = Some(tx);
} else {
break;
}
}
}
out.not_sent = asked_iter.collect();
Ok(out)
}
}
impl ChainSync { impl ChainSync {
pub fn new( pub fn new(
config: SyncConfig, config: SyncConfig,
@ -736,7 +794,7 @@ impl ChainSync {
SyncStatus { SyncStatus {
state: self.state.clone(), state: self.state.clone(),
protocol_version: ETH_PROTOCOL_VERSION_64.0, protocol_version: ETH_PROTOCOL_VERSION_65.0,
network_id: self.network_id, network_id: self.network_id,
start_block_number: self.starting_block, start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number), last_imported_block_number: Some(last_imported_number),
@ -784,10 +842,37 @@ impl ChainSync {
/// Updates transactions were received by a peer /// Updates transactions were received by a peer
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) { pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
if let Some(peer_info) = self.peers.get_mut(&peer_id) { // Remove imported txs from all request queues
peer_info let imported = txs.iter().map(|tx| tx.hash()).collect::<H256FastSet>();
.last_sent_transactions for (pid, peer_info) in &mut self.peers {
.extend(txs.iter().map(|tx| tx.hash())); peer_info.unfetched_pooled_transactions = peer_info
.unfetched_pooled_transactions
.difference(&imported)
.copied()
.collect();
if *pid == peer_id {
match GetPooledTransactionsReport::generate(
std::mem::replace(&mut peer_info.asking_pooled_transactions, Vec::new()),
txs.iter().map(UnverifiedTransaction::hash),
) {
Ok(report) => {
// Some transactions were not received in this batch because of size.
// Add them back to request feed.
peer_info.unfetched_pooled_transactions = peer_info
.unfetched_pooled_transactions
.union(&report.not_sent)
.copied()
.collect();
}
Err(_unknown_tx) => {
// punish peer?
}
}
peer_info
.last_sent_transactions
.extend(txs.iter().map(|tx| tx.hash()));
}
} }
} }
@ -1120,20 +1205,27 @@ impl ChainSync {
// check queue fullness // check queue fullness
let ancient_block_fullness = io.chain().ancient_block_queue_fullness(); let ancient_block_fullness = io.chain().ancient_block_queue_fullness();
if force || equal_or_higher_difficulty { if force || equal_or_higher_difficulty {
let mut is_complete = false; if ancient_block_fullness < 0.8 {
if let Some(old_blocks) = self.old_blocks.as_mut() { if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(peer_id, io, num_active_peers)) {
// check if ancient queue can take more request or not. SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
if ancient_block_fullness < 0.8 { return;
if let Some(request) = old_blocks.request_blocks(peer_id, io, num_active_peers) {
SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
return;
}
is_complete = old_blocks.is_complete();
} }
} }
if is_complete { // if old_blocks is in complete state, set it to None.
self.old_blocks = None; // and if we have nothing else to do, get the peer to give us at least some of announced but unfetched transactions
} let mut to_send = Default::default();
if let Some(peer) = self.peers.get_mut(&peer_id) {
if peer.asking_pooled_transactions.is_empty() {
to_send = peer.unfetched_pooled_transactions.drain().take(MAX_TRANSACTIONS_TO_REQUEST).collect::<Vec<_>>();
peer.asking_pooled_transactions = to_send.clone();
}
}
if !to_send.is_empty() {
SyncRequester::request_pooled_transactions(self, io, peer_id, &to_send);
return;
}
} else { } else {
trace!( trace!(
target: "sync", target: "sync",
@ -1321,6 +1413,7 @@ impl ChainSync {
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT, PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT, PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT, PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT,
PeerAsking::Nothing => false, PeerAsking::Nothing => false,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT, PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT, PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
@ -1629,6 +1722,8 @@ pub mod tests {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
unfetched_pooled_transactions: Default::default(),
asking_pooled_transactions: Default::default(),
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
expired: false, expired: false,
@ -1806,4 +1901,29 @@ pub mod tests {
let status = io.chain.miner.queue_status(); let status = io.chain.miner.queue_status();
assert_eq!(status.status.transaction_count, 0); assert_eq!(status.status.transaction_count, 0);
} }
#[test]
fn generate_pooled_transactions_report() {
let asked = vec![1, 2, 3, 4, 5, 6, 7]
.into_iter()
.map(H256::from_low_u64_be);
let received = vec![2, 4, 5].into_iter().map(H256::from_low_u64_be);
let report = GetPooledTransactionsReport::generate(asked.collect(), received).unwrap();
assert_eq!(
report.found,
vec![2, 4, 5]
.into_iter()
.map(H256::from_low_u64_be)
.collect()
);
assert_eq!(
report.missing,
vec![1, 3].into_iter().map(H256::from_low_u64_be).collect()
);
assert_eq!(
report.not_sent,
vec![6, 7].into_iter().map(H256::from_low_u64_be).collect()
);
}
} }

View File

@ -25,16 +25,15 @@ use rlp::RlpStream;
use sync_io::SyncIo; use sync_io::SyncIo;
use types::{blockchain_info::BlockChainInfo, transaction::SignedTransaction, BlockNumber}; use types::{blockchain_info::BlockChainInfo, transaction::SignedTransaction, BlockNumber};
use super::sync_packet::{ use super::sync_packet::SyncPacket::{self, *};
SyncPacket,
SyncPacket::{ConsensusDataPacket, NewBlockHashesPacket, NewBlockPacket, TransactionsPacket},
};
use super::{ use super::{
random, ChainSync, MAX_PEERS_PROPAGATION, MAX_PEER_LAG_PROPAGATION, random, ChainSync, ETH_PROTOCOL_VERSION_65, MAX_PEERS_PROPAGATION, MAX_PEER_LAG_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE, MIN_PEERS_PROPAGATION, MAX_TRANSACTION_PACKET_SIZE, MIN_PEERS_PROPAGATION,
}; };
const NEW_POOLED_HASHES_LIMIT: usize = 4096;
/// The Chain Sync Propagator: propagates data to peers /// The Chain Sync Propagator: propagates data to peers
pub struct SyncPropagator; pub struct SyncPropagator;
@ -175,14 +174,29 @@ impl SyncPropagator {
} }
packet.out() packet.out()
}; };
let all_transactions_hashes_rlp =
rlp::encode_list(&all_transactions_hashes.iter().copied().collect::<Vec<_>>());
// Clear old transactions from stats // Clear old transactions from stats
sync.transactions_stats.retain(&all_transactions_hashes); sync.transactions_stats.retain(&all_transactions_hashes);
let send_packet = |io: &mut dyn SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { let send_packet = |io: &mut dyn SyncIo,
peer_id: PeerId,
is_hashes: bool,
sent: usize,
rlp: Bytes| {
let size = rlp.len(); let size = rlp.len();
SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp); SyncPropagator::send_packet(
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); io,
peer_id,
if is_hashes {
NewPooledTransactionHashesPacket
} else {
TransactionsPacket
},
rlp,
);
trace!(target: "sync", "{:02} <- {} ({} entries; {} bytes)", peer_id, if is_hashes { "NewPooledTransactionHashes" } else { "Transactions" }, sent, size);
}; };
let block_number = io.chain().chain_info().best_block_number; let block_number = io.chain().chain_info().best_block_number;
@ -200,6 +214,8 @@ impl SyncPropagator {
let peer_info = sync.peers.get_mut(&peer_id) let peer_info = sync.peers.get_mut(&peer_id)
.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
let is_hashes = peer_info.protocol_version >= ETH_PROTOCOL_VERSION_65.0;
// Send all transactions, if the peer doesn't know about anything // Send all transactions, if the peer doesn't know about anything
if peer_info.last_sent_transactions.is_empty() { if peer_info.last_sent_transactions.is_empty() {
// update stats // update stats
@ -209,12 +225,14 @@ impl SyncPropagator {
} }
peer_info.last_sent_transactions = all_transactions_hashes.clone(); peer_info.last_sent_transactions = all_transactions_hashes.clone();
send_packet( let rlp = {
io, if is_hashes {
peer_id, all_transactions_hashes_rlp.clone()
all_transactions_hashes.len(), } else {
all_transactions_rlp.clone(), all_transactions_rlp.clone()
); }
};
send_packet(io, peer_id, is_hashes, all_transactions_hashes.len(), rlp);
sent_to_peers.insert(peer_id); sent_to_peers.insert(peer_id);
max_sent = cmp::max(max_sent, all_transactions_hashes.len()); max_sent = cmp::max(max_sent, all_transactions_hashes.len());
continue; continue;
@ -231,32 +249,38 @@ impl SyncPropagator {
// Construct RLP // Construct RLP
let (packet, to_send) = { let (packet, to_send) = {
let mut to_send = to_send; let mut to_send_new = HashSet::new();
let mut packet = RlpStream::new(); let mut packet = RlpStream::new();
packet.begin_unbounded_list(); packet.begin_unbounded_list();
let mut pushed = 0;
for tx in &transactions { for tx in &transactions {
let hash = tx.hash(); let hash = tx.hash();
if to_send.contains(&hash) { if to_send.contains(&hash) {
tx.rlp_append(&mut packet); if is_hashes {
pushed += 1; if to_send_new.len() >= NEW_POOLED_HASHES_LIMIT {
// this is not hard limit and we are okay with it. Max default tx size is 300k. debug!(target: "sync", "NewPooledTransactionHashes length limit reached. Sending incomplete list of {}/{} transactions.", to_send_new.len(), to_send.len());
if packet.as_raw().len() >= MAX_TRANSACTION_PACKET_SIZE { break;
// Maximal packet size reached just proceed with sending }
debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); packet.append(&hash);
to_send = to_send.into_iter().take(pushed).collect(); to_send_new.insert(hash);
break; } else {
tx.rlp_append(&mut packet);
to_send_new.insert(hash);
// this is not hard limit and we are okay with it. Max default tx size is 300k.
if packet.as_raw().len() >= MAX_TRANSACTION_PACKET_SIZE {
// Maximal packet size reached just proceed with sending
debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", to_send_new.len(), to_send.len());
break;
}
} }
} }
} }
packet.finalize_unbounded_list(); packet.finalize_unbounded_list();
(packet, to_send) (packet, to_send_new)
}; };
// Update stats // Update stats
let id = io.peer_session_info(peer_id).and_then(|info| info.id); let id = io.peer_session_info(peer_id).and_then(|info| info.id);
for hash in &to_send { for hash in &to_send {
// update stats
stats.propagated(hash, id, block_number); stats.propagated(hash, id, block_number);
} }
@ -265,7 +289,7 @@ impl SyncPropagator {
.chain(&to_send) .chain(&to_send)
.cloned() .cloned()
.collect(); .collect();
send_packet(io, peer_id, to_send.len(), packet.out()); send_packet(io, peer_id, is_hashes, to_send.len(), packet.out());
sent_to_peers.insert(peer_id); sent_to_peers.insert(peer_id);
max_sent = cmp::max(max_sent, to_send.len()); max_sent = cmp::max(max_sent, to_send.len());
} }
@ -468,6 +492,8 @@ mod tests {
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
asking_hash: None, asking_hash: None,
unfetched_pooled_transactions: Default::default(),
asking_pooled_transactions: Default::default(),
ask_time: Instant::now(), ask_time: Instant::now(),
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
expired: false, expired: false,

View File

@ -23,13 +23,7 @@ use std::time::Instant;
use sync_io::SyncIo; use sync_io::SyncIo;
use types::BlockNumber; use types::BlockNumber;
use super::sync_packet::{ use super::sync_packet::{SyncPacket::*, *};
SyncPacket,
SyncPacket::{
GetBlockBodiesPacket, GetBlockHeadersPacket, GetReceiptsPacket, GetSnapshotDataPacket,
GetSnapshotManifestPacket,
},
};
use super::{BlockSet, ChainSync, PeerAsking}; use super::{BlockSet, ChainSync, PeerAsking};
@ -109,6 +103,29 @@ impl SyncRequester {
); );
} }
/// Request pooled transactions from a peer
pub fn request_pooled_transactions(
sync: &mut ChainSync,
io: &mut dyn SyncIo,
peer_id: PeerId,
hashes: &[H256],
) {
trace!(target: "sync", "{} <- GetPooledTransactions: {:?}", peer_id, hashes);
let mut rlp = RlpStream::new_list(hashes.len());
for h in hashes {
rlp.append(h);
}
SyncRequester::send_request(
sync,
io,
peer_id,
PeerAsking::PooledTransactions,
PooledTransactionsPacket,
rlp.out(),
)
}
/// Find some headers or blocks to download for a peer. /// Find some headers or blocks to download for a peer.
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) { pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
// find chunk data to download // find chunk data to download

View File

@ -31,15 +31,7 @@ use types::{ids::BlockId, BlockNumber};
use sync_io::SyncIo; use sync_io::SyncIo;
use super::sync_packet::{ use super::sync_packet::{PacketInfo, SyncPacket, SyncPacket::*};
PacketInfo, SyncPacket,
SyncPacket::{
BlockBodiesPacket, BlockHeadersPacket, ConsensusDataPacket, GetBlockBodiesPacket,
GetBlockHeadersPacket, GetReceiptsPacket, GetSnapshotDataPacket, GetSnapshotManifestPacket,
ReceiptsPacket, SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
TransactionsPacket,
},
};
use super::{ use super::{
ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND,
@ -64,6 +56,14 @@ impl SyncSupplier {
if let Some(id) = SyncPacket::from_u8(packet_id) { if let Some(id) = SyncPacket::from_u8(packet_id) {
let result = match id { let result = match id {
GetPooledTransactionsPacket => SyncSupplier::return_rlp(
io,
&rlp,
peer,
SyncSupplier::return_pooled_transactions,
|e| format!("Error sending pooled transactions: {:?}", e),
),
GetBlockBodiesPacket => SyncSupplier::return_rlp( GetBlockBodiesPacket => SyncSupplier::return_rlp(
io, io,
&rlp, &rlp,
@ -273,6 +273,28 @@ impl SyncSupplier {
Ok(Some((BlockHeadersPacket, rlp))) Ok(Some((BlockHeadersPacket, rlp)))
} }
/// Respond to GetPooledTransactions request
fn return_pooled_transactions(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let mut added = 0;
let mut rlp = RlpStream::new();
rlp.begin_unbounded_list();
for v in r {
if let Ok(hash) = v.as_val::<H256>() {
if let Some(tx) = io.chain().queued_transaction(hash) {
tx.signed().rlp_append(&mut rlp);
added += 1;
if rlp.len() > PAYLOAD_SOFT_LIMIT {
break;
}
}
}
}
rlp.finalize_unbounded_list();
trace!(target: "sync", "{} -> GetPooledTransactions: returned {} entries", peer_id, added);
Ok(Some((PooledTransactionsPacket, rlp)))
}
/// Respond to GetBlockBodies request /// Respond to GetBlockBodies request
fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let mut count = r.item_count().unwrap_or(0); let mut count = r.item_count().unwrap_or(0);

View File

@ -44,6 +44,9 @@ pub enum SyncPacket {
GetBlockBodiesPacket = 0x05, GetBlockBodiesPacket = 0x05,
BlockBodiesPacket = 0x06, BlockBodiesPacket = 0x06,
NewBlockPacket = 0x07, NewBlockPacket = 0x07,
NewPooledTransactionHashesPacket = 0x08,
GetPooledTransactionsPacket = 0x09,
PooledTransactionsPacket = 0x0a,
//GetNodeDataPacket = 0x0d, //GetNodeDataPacket = 0x0d,
//NodeDataPacket = 0x0e, //NodeDataPacket = 0x0e,
@ -80,6 +83,9 @@ impl PacketInfo for SyncPacket {
| GetBlockBodiesPacket | GetBlockBodiesPacket
| BlockBodiesPacket | BlockBodiesPacket
| NewBlockPacket | NewBlockPacket
| NewPooledTransactionHashesPacket
| GetPooledTransactionsPacket
| PooledTransactionsPacket
//| GetNodeDataPacket //| GetNodeDataPacket
//| NodeDataPacket //| NodeDataPacket
| GetReceiptsPacket | GetReceiptsPacket

View File

@ -18,7 +18,7 @@ use api::PAR_PROTOCOL;
use bytes::Bytes; use bytes::Bytes;
use chain::{ use chain::{
sync_packet::{PacketInfo, SyncPacket}, sync_packet::{PacketInfo, SyncPacket},
ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_2, ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_65, PAR_PROTOCOL_VERSION_2,
}; };
use ethcore::{ use ethcore::{
client::{ client::{
@ -172,7 +172,7 @@ where
if protocol == PAR_PROTOCOL { if protocol == PAR_PROTOCOL {
PAR_PROTOCOL_VERSION_2.0 PAR_PROTOCOL_VERSION_2.0
} else { } else {
ETH_PROTOCOL_VERSION_64.0 ETH_PROTOCOL_VERSION_65.0
} }
} }

View File

@ -309,7 +309,7 @@ where
} }
fn transaction(&self, id: PendingTransactionId) -> Result<Option<Transaction>> { fn transaction(&self, id: PendingTransactionId) -> Result<Option<Transaction>> {
let client_transaction = |id| match self.client.transaction(id) { let client_transaction = |id| match self.client.block_transaction(id) {
Some(t) => Ok(Some(Transaction::from_localized(t))), Some(t) => Ok(Some(Transaction::from_localized(t))),
None => Ok(None), None => Ok(None),
}; };