From 19ca9ad460cbf1fa3fced7197420935473580b41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 10 Dec 2016 21:22:19 +0100 Subject: [PATCH] Prevent broadcasting transactions to peer that send them. --- ethcore/src/client/chain_notify.rs | 7 ++-- ethcore/src/client/client.rs | 9 ++--- ethcore/src/client/test_client.rs | 2 +- ethcore/src/client/traits.rs | 4 +- ethcore/src/service.rs | 6 +-- .../dapps/localtx/Transaction/transaction.js | 25 +----------- rpc/src/v1/tests/helpers/sync_provider.rs | 6 --- rpc/src/v1/tests/mocked/parity.rs | 2 +- rpc/src/v1/types/sync.rs | 12 +----- sync/src/api.rs | 8 ++-- sync/src/chain.rs | 11 +++-- sync/src/transactions_stats.rs | 40 ------------------- 12 files changed, 25 insertions(+), 107 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 50ff20e38..ddab542fb 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -41,11 +41,10 @@ pub trait ChainNotify : Send + Sync { // does nothing by default } - /// fires when new transactions are imported - fn transactions_imported(&self, + /// fires when new transactions are received from a peer + fn transactions_received(&self, _hashes: Vec, - _peer_id: Option, - _block_num: u64, + _peer_id: usize, ) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 1387dad9a..c258ed1eb 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -559,15 +559,14 @@ impl Client { } /// Import transactions from the IO queue - pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option) -> usize { + pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize { trace!(target: "external_tx", "Importing queued"); let _timer = PerfTimer::new("import_queued_transactions"); self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); let txs: Vec = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect(); let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect(); - let block_number = self.chain_info().best_block_number; self.notify(|notify| { - notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number); + notify.transactions_received(hashes.clone(), peer_id); }); let results = self.miner.import_external_transactions(self, txs); results.len() @@ -1269,14 +1268,14 @@ impl BlockChainClient for Client { (*self.build_last_hashes(self.chain.read().best_block_hash())).clone() } - fn queue_transactions(&self, transactions: Vec, node_id: Option) { + fn queue_transactions(&self, transactions: Vec, peer_id: usize) { let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); trace!(target: "external_tx", "Queue size: {}", queue_size); if queue_size > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) { + match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index a2af13794..7f27c9151 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn queue_transactions(&self, transactions: Vec, _peer_id: Option) { + fn queue_transactions(&self, transactions: Vec, _peer_id: usize) { // import right here let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); self.miner.import_external_transactions(self, txs); diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index ccf07ea3f..fed864607 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::BTreeMap; -use util::{U256, Address, H256, H512, H2048, Bytes, Itertools}; +use util::{U256, Address, H256, H2048, Bytes, Itertools}; use util::stats::Histogram; use blockchain::TreeRoute; use verification::queue::QueueInfo as BlockQueueInfo; @@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send { fn last_hashes(&self) -> LastHashes; /// Queue transactions for importing. - fn queue_transactions(&self, transactions: Vec, peer_id: Option); + fn queue_transactions(&self, transactions: Vec, peer_id: usize); /// list all transactions fn pending_transactions(&self) -> Vec; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index b595843a8..9b96911e4 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -39,7 +39,7 @@ pub enum ClientIoMessage { /// A block is ready BlockVerified, /// New transaction RLPs are ready to be imported - NewTransactions(Vec, Option), + NewTransactions(Vec, usize), /// Begin snapshot restoration BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service @@ -196,8 +196,8 @@ impl IoHandler for ClientIoHandler { match *net_message { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } - ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => { - self.client.import_queued_transactions(transactions, peer_id.clone()); + ClientIoMessage::NewTransactions(ref transactions, peer_id) => { + self.client.import_queued_transactions(transactions, peer_id); } ClientIoMessage::BeginRestoration(ref manifest) => { if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { diff --git a/js/src/dapps/localtx/Transaction/transaction.js b/js/src/dapps/localtx/Transaction/transaction.js index c9ca10ba5..d1c98f360 100644 --- a/js/src/dapps/localtx/Transaction/transaction.js +++ b/js/src/dapps/localtx/Transaction/transaction.js @@ -86,17 +86,6 @@ class BaseTransaction extends Component { ); } - - renderReceived (stats) { - const noOfPeers = Object.keys(stats.receivedFrom).length; - const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0); - - return ( - - { noOfPropagations } ({ noOfPeers } peers) - - ); - } } export class Transaction extends BaseTransaction { @@ -113,8 +102,7 @@ export class Transaction extends BaseTransaction { isLocal: false, stats: { firstSeen: 0, - propagatedTo: {}, - receivedFrom: {} + propagatedTo: {} } }; @@ -140,9 +128,6 @@ export class Transaction extends BaseTransaction { # Propagated - - # Received - ); @@ -179,9 +164,6 @@ export class Transaction extends BaseTransaction { { this.renderPropagation(stats) } - - { this.renderReceived(stats) } - ); } @@ -210,8 +192,7 @@ export class LocalTransaction extends BaseTransaction { static defaultProps = { stats: { - propagatedTo: {}, - receivedFrom: {} + propagatedTo: {} } }; @@ -335,8 +316,6 @@ export class LocalTransaction extends BaseTransaction { { this.renderStatus() }
{ status === 'pending' ? this.renderPropagation(stats) : null } -
- { status === 'pending' ? this.renderReceived(stats) : null } ); diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index aa7e8d849..2517abd46 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -106,18 +106,12 @@ impl SyncProvider for TestSyncProvider { propagated_to: map![ 128.into() => 16 ], - received_from: map![ - 1.into() => 10 - ], }, 5.into() => TransactionStats { first_seen: 16, propagated_to: map![ 16.into() => 1 ], - received_from: map![ - 256.into() => 2 - ], } ] } diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index 45ee4aa75..9b4daaccd 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -363,7 +363,7 @@ fn rpc_parity_transactions_stats() { let io = deps.default_client(); let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":10}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100":2}}},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index 65d989156..8d3726e7a 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -127,9 +127,6 @@ pub struct TransactionStats { /// Peers this transaction was propagated to with count. #[serde(rename="propagatedTo")] pub propagated_to: BTreeMap, - /// Peers that propagated this transaction back. - #[serde(rename="receivedFrom")] - pub received_from: BTreeMap, } impl From for PeerInfo { @@ -161,10 +158,6 @@ impl From for TransactionStats { .into_iter() .map(|(id, count)| (id.into(), count)) .collect(), - received_from: s.received_from - .into_iter() - .map(|(id, count)| (id.into(), count)) - .collect(), } } } @@ -216,12 +209,9 @@ mod tests { propagated_to: map![ 10.into() => 50 ], - received_from: map![ - 1.into() => 1000 - ], }; let serialized = serde_json::to_string(&stats).unwrap(); - assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#) + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 0f3695fe9..c675ee6d3 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -99,8 +99,6 @@ pub struct TransactionStats { pub first_seen: u64, /// Peers it was propagated to. pub propagated_to: BTreeMap, - /// Peers that propagated the transaction back. - pub received_from: BTreeMap, } /// Peer connection information @@ -338,9 +336,9 @@ impl ChainNotify for EthSync { self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } - fn transactions_imported(&self, hashes: Vec, peer_id: Option, block_number: u64) { + fn transactions_received(&self, hashes: Vec, peer_id: PeerId) { let mut sync = self.sync_handler.sync.write(); - sync.transactions_imported(hashes, peer_id, block_number); + sync.transactions_received(hashes, peer_id); } } @@ -351,7 +349,7 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id()) + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer()) } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 243c6b431..ecd95c68a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -432,10 +432,10 @@ impl ChainSync { self.transactions_stats.stats() } - /// Updates statistics for imported transactions. - pub fn transactions_imported(&mut self, hashes: Vec, peer_id: Option, block_number: u64) { - for hash in hashes { - self.transactions_stats.received(hash, peer_id, block_number); + /// Updates transactions were received by a peer + pub fn transactions_received(&mut self, hashes: Vec, peer_id: PeerId) { + if let Some(mut peer_info) = self.peers.get_mut(&peer_id) { + peer_info.last_sent_transactions.extend(&hashes); } } @@ -1416,8 +1416,7 @@ impl ChainSync { let tx = rlp.as_raw().to_vec(); transactions.push(tx); } - let id = io.peer_session_info(peer_id).and_then(|info| info.id); - io.chain().queue_transactions(transactions, id); + io.chain().queue_transactions(transactions, peer_id); Ok(()) } diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs index a91a860e5..fa8eb6e82 100644 --- a/sync/src/transactions_stats.rs +++ b/sync/src/transactions_stats.rs @@ -26,7 +26,6 @@ type BlockNumber = u64; pub struct Stats { first_seen: BlockNumber, propagated_to: HashMap, - received_from: HashMap, } impl Stats { @@ -34,7 +33,6 @@ impl Stats { Stats { first_seen: number, propagated_to: Default::default(), - received_from: Default::default(), } } } @@ -47,10 +45,6 @@ impl<'a> From<&'a Stats> for TransactionStats { .iter() .map(|(hash, size)| (*hash, *size)) .collect(), - received_from: other.received_from - .iter() - .map(|(hash, size)| (*hash, *size)) - .collect(), } } } @@ -69,14 +63,6 @@ impl TransactionsStats { *count = count.saturating_add(1); } - /// Increase number of back-propagations from given `enodeid`. - pub fn received(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { - let enode_id = enode_id.unwrap_or_default(); - let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); - let mut count = stats.received_from.entry(enode_id).or_insert(0); - *count = count.saturating_add(1); - } - /// Returns propagation stats for given hash or `None` if hash is not known. #[cfg(test)] pub fn get(&self, hash: &H256) -> Option<&Stats> { @@ -127,32 +113,6 @@ mod tests { enodeid1 => 2, enodeid2 => 1 ], - received_from: Default::default(), - })); - } - - #[test] - fn should_keep_track_of_back_propagations() { - // given - let mut stats = TransactionsStats::default(); - let hash = 5.into(); - let enodeid1 = 2.into(); - let enodeid2 = 5.into(); - - // when - stats.received(hash, Some(enodeid1), 5); - stats.received(hash, Some(enodeid1), 10); - stats.received(hash, Some(enodeid2), 15); - - // then - let stats = stats.get(&hash); - assert_eq!(stats, Some(&Stats { - first_seen: 5, - propagated_to: Default::default(), - received_from: hash_map![ - enodeid1 => 2, - enodeid2 => 1 - ] })); }