Prevent broadcasting transactions to peer that send them.
This commit is contained in:
		
							parent
							
								
									aaf6da4c00
								
							
						
					
					
						commit
						19ca9ad460
					
				@ -41,11 +41,10 @@ pub trait ChainNotify : Send + Sync {
 | 
			
		||||
		// does nothing by default
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// fires when new transactions are imported
 | 
			
		||||
	fn transactions_imported(&self,
 | 
			
		||||
	/// fires when new transactions are received from a peer
 | 
			
		||||
	fn transactions_received(&self,
 | 
			
		||||
		_hashes: Vec<H256>,
 | 
			
		||||
		_peer_id: Option<H512>,
 | 
			
		||||
		_block_num: u64,
 | 
			
		||||
		_peer_id: usize,
 | 
			
		||||
	) {
 | 
			
		||||
		// does nothing by default
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -559,15 +559,14 @@ impl Client {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Import transactions from the IO queue
 | 
			
		||||
	pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: Option<H512>) -> usize {
 | 
			
		||||
	pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize {
 | 
			
		||||
		trace!(target: "external_tx", "Importing queued");
 | 
			
		||||
		let _timer = PerfTimer::new("import_queued_transactions");
 | 
			
		||||
		self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
 | 
			
		||||
		let txs: Vec<SignedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
 | 
			
		||||
		let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
 | 
			
		||||
		let block_number = self.chain_info().best_block_number;
 | 
			
		||||
		self.notify(|notify| {
 | 
			
		||||
			notify.transactions_imported(hashes.clone(), peer_id.clone(), block_number);
 | 
			
		||||
			notify.transactions_received(hashes.clone(), peer_id);
 | 
			
		||||
		});
 | 
			
		||||
		let results = self.miner.import_external_transactions(self, txs);
 | 
			
		||||
		results.len()
 | 
			
		||||
@ -1269,14 +1268,14 @@ impl BlockChainClient for Client {
 | 
			
		||||
		(*self.build_last_hashes(self.chain.read().best_block_hash())).clone()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, node_id: Option<H512>) {
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
 | 
			
		||||
		let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
 | 
			
		||||
		trace!(target: "external_tx", "Queue size: {}", queue_size);
 | 
			
		||||
		if queue_size > MAX_TX_QUEUE_SIZE {
 | 
			
		||||
			debug!("Ignoring {} transactions: queue is full", transactions.len());
 | 
			
		||||
		} else {
 | 
			
		||||
			let len = transactions.len();
 | 
			
		||||
			match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, node_id)) {
 | 
			
		||||
			match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) {
 | 
			
		||||
				Ok(_) => {
 | 
			
		||||
					self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
@ -657,7 +657,7 @@ impl BlockChainClient for TestBlockChainClient {
 | 
			
		||||
		unimplemented!();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: Option<H512>) {
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
 | 
			
		||||
		// import right here
 | 
			
		||||
		let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
 | 
			
		||||
		self.miner.import_external_transactions(self, txs);
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,7 @@
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use std::collections::BTreeMap;
 | 
			
		||||
use util::{U256, Address, H256, H512, H2048, Bytes, Itertools};
 | 
			
		||||
use util::{U256, Address, H256, H2048, Bytes, Itertools};
 | 
			
		||||
use util::stats::Histogram;
 | 
			
		||||
use blockchain::TreeRoute;
 | 
			
		||||
use verification::queue::QueueInfo as BlockQueueInfo;
 | 
			
		||||
@ -200,7 +200,7 @@ pub trait BlockChainClient : Sync + Send {
 | 
			
		||||
	fn last_hashes(&self) -> LastHashes;
 | 
			
		||||
 | 
			
		||||
	/// Queue transactions for importing.
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: Option<H512>);
 | 
			
		||||
	fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
 | 
			
		||||
 | 
			
		||||
	/// list all transactions
 | 
			
		||||
	fn pending_transactions(&self) -> Vec<SignedTransaction>;
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,7 @@ pub enum ClientIoMessage {
 | 
			
		||||
	/// A block is ready
 | 
			
		||||
	BlockVerified,
 | 
			
		||||
	/// New transaction RLPs are ready to be imported
 | 
			
		||||
	NewTransactions(Vec<Bytes>, Option<H512>),
 | 
			
		||||
	NewTransactions(Vec<Bytes>, usize),
 | 
			
		||||
	/// Begin snapshot restoration
 | 
			
		||||
	BeginRestoration(ManifestData),
 | 
			
		||||
	/// Feed a state chunk to the snapshot service
 | 
			
		||||
@ -196,8 +196,8 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
 | 
			
		||||
 | 
			
		||||
		match *net_message {
 | 
			
		||||
			ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
 | 
			
		||||
			ClientIoMessage::NewTransactions(ref transactions, ref peer_id) => {
 | 
			
		||||
				self.client.import_queued_transactions(transactions, peer_id.clone());
 | 
			
		||||
			ClientIoMessage::NewTransactions(ref transactions, peer_id) => {
 | 
			
		||||
				self.client.import_queued_transactions(transactions, peer_id);
 | 
			
		||||
			}
 | 
			
		||||
			ClientIoMessage::BeginRestoration(ref manifest) => {
 | 
			
		||||
				if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
 | 
			
		||||
 | 
			
		||||
@ -86,17 +86,6 @@ class BaseTransaction extends Component {
 | 
			
		||||
      </span>
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  renderReceived (stats) {
 | 
			
		||||
    const noOfPeers = Object.keys(stats.receivedFrom).length;
 | 
			
		||||
    const noOfPropagations = Object.values(stats.receivedFrom).reduce((sum, val) => sum + val, 0);
 | 
			
		||||
 | 
			
		||||
    return (
 | 
			
		||||
      <span className={ styles.nowrap }>
 | 
			
		||||
        { noOfPropagations } ({ noOfPeers } peers)
 | 
			
		||||
      </span>
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export class Transaction extends BaseTransaction {
 | 
			
		||||
@ -113,8 +102,7 @@ export class Transaction extends BaseTransaction {
 | 
			
		||||
    isLocal: false,
 | 
			
		||||
    stats: {
 | 
			
		||||
      firstSeen: 0,
 | 
			
		||||
      propagatedTo: {},
 | 
			
		||||
      receivedFrom: {}
 | 
			
		||||
      propagatedTo: {}
 | 
			
		||||
    }
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
@ -140,9 +128,6 @@ export class Transaction extends BaseTransaction {
 | 
			
		||||
        <th>
 | 
			
		||||
          # Propagated
 | 
			
		||||
        </th>
 | 
			
		||||
        <th>
 | 
			
		||||
          # Received
 | 
			
		||||
        </th>
 | 
			
		||||
        <th />
 | 
			
		||||
      </tr>
 | 
			
		||||
    );
 | 
			
		||||
@ -179,9 +164,6 @@ export class Transaction extends BaseTransaction {
 | 
			
		||||
        <td>
 | 
			
		||||
          { this.renderPropagation(stats) }
 | 
			
		||||
        </td>
 | 
			
		||||
        <td>
 | 
			
		||||
          { this.renderReceived(stats) }
 | 
			
		||||
        </td>
 | 
			
		||||
      </tr>
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
@ -210,8 +192,7 @@ export class LocalTransaction extends BaseTransaction {
 | 
			
		||||
 | 
			
		||||
  static defaultProps = {
 | 
			
		||||
    stats: {
 | 
			
		||||
      propagatedTo: {},
 | 
			
		||||
      receivedFrom: {}
 | 
			
		||||
      propagatedTo: {}
 | 
			
		||||
    }
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
@ -335,8 +316,6 @@ export class LocalTransaction extends BaseTransaction {
 | 
			
		||||
          { this.renderStatus() }
 | 
			
		||||
          <br />
 | 
			
		||||
          { status === 'pending' ? this.renderPropagation(stats) : null }
 | 
			
		||||
          <br />
 | 
			
		||||
          { status === 'pending' ? this.renderReceived(stats) : null }
 | 
			
		||||
        </td>
 | 
			
		||||
      </tr>
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
@ -106,18 +106,12 @@ impl SyncProvider for TestSyncProvider {
 | 
			
		||||
				propagated_to: map![
 | 
			
		||||
					128.into() => 16
 | 
			
		||||
				],
 | 
			
		||||
				received_from: map![
 | 
			
		||||
					1.into() => 10
 | 
			
		||||
				],
 | 
			
		||||
			},
 | 
			
		||||
			5.into() => TransactionStats {
 | 
			
		||||
				first_seen: 16,
 | 
			
		||||
				propagated_to: map![
 | 
			
		||||
					16.into() => 1
 | 
			
		||||
				],
 | 
			
		||||
				received_from: map![
 | 
			
		||||
					256.into() => 2
 | 
			
		||||
				],
 | 
			
		||||
			}
 | 
			
		||||
		]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -363,7 +363,7 @@ fn rpc_parity_transactions_stats() {
 | 
			
		||||
	let io = deps.default_client();
 | 
			
		||||
 | 
			
		||||
	let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#;
 | 
			
		||||
	let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":10}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100":2}}},"id":1}"#;
 | 
			
		||||
	let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#;
 | 
			
		||||
 | 
			
		||||
	assert_eq!(io.handle_request_sync(request), Some(response.to_owned()));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -127,9 +127,6 @@ pub struct TransactionStats {
 | 
			
		||||
	/// Peers this transaction was propagated to with count.
 | 
			
		||||
	#[serde(rename="propagatedTo")]
 | 
			
		||||
	pub propagated_to: BTreeMap<H512, usize>,
 | 
			
		||||
	/// Peers that propagated this transaction back.
 | 
			
		||||
	#[serde(rename="receivedFrom")]
 | 
			
		||||
	pub received_from: BTreeMap<H512, usize>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<SyncPeerInfo> for PeerInfo {
 | 
			
		||||
@ -161,10 +158,6 @@ impl From<SyncTransactionStats> for TransactionStats {
 | 
			
		||||
				.into_iter()
 | 
			
		||||
				.map(|(id, count)| (id.into(), count))
 | 
			
		||||
				.collect(),
 | 
			
		||||
			received_from: s.received_from
 | 
			
		||||
				.into_iter()
 | 
			
		||||
				.map(|(id, count)| (id.into(), count))
 | 
			
		||||
				.collect(),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -216,12 +209,9 @@ mod tests {
 | 
			
		||||
			propagated_to: map![
 | 
			
		||||
				10.into() => 50
 | 
			
		||||
			],
 | 
			
		||||
			received_from: map![
 | 
			
		||||
				1.into() => 1000
 | 
			
		||||
			],
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		let serialized = serde_json::to_string(&stats).unwrap();
 | 
			
		||||
		assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50},"receivedFrom":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001":1000}}"#)
 | 
			
		||||
		assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -99,8 +99,6 @@ pub struct TransactionStats {
 | 
			
		||||
	pub first_seen: u64,
 | 
			
		||||
	/// Peers it was propagated to.
 | 
			
		||||
	pub propagated_to: BTreeMap<H512, usize>,
 | 
			
		||||
	/// Peers that propagated the transaction back.
 | 
			
		||||
	pub received_from: BTreeMap<H512, usize>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Peer connection information
 | 
			
		||||
@ -338,9 +336,9 @@ impl ChainNotify for EthSync {
 | 
			
		||||
		self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn transactions_imported(&self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
 | 
			
		||||
	fn transactions_received(&self, hashes: Vec<H256>, peer_id: PeerId) {
 | 
			
		||||
		let mut sync = self.sync_handler.sync.write();
 | 
			
		||||
		sync.transactions_imported(hashes, peer_id, block_number);
 | 
			
		||||
		sync.transactions_received(hashes, peer_id);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -351,7 +349,7 @@ struct TxRelay(Arc<BlockChainClient>);
 | 
			
		||||
impl LightHandler for TxRelay {
 | 
			
		||||
	fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
 | 
			
		||||
		trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
 | 
			
		||||
		self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id())
 | 
			
		||||
		self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -432,10 +432,10 @@ impl ChainSync {
 | 
			
		||||
		self.transactions_stats.stats()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Updates statistics for imported transactions.
 | 
			
		||||
	pub fn transactions_imported(&mut self, hashes: Vec<H256>, peer_id: Option<H512>, block_number: u64) {
 | 
			
		||||
		for hash in hashes {
 | 
			
		||||
			self.transactions_stats.received(hash, peer_id, block_number);
 | 
			
		||||
	/// Updates transactions were received by a peer
 | 
			
		||||
	pub fn transactions_received(&mut self, hashes: Vec<H256>, peer_id: PeerId) {
 | 
			
		||||
		if let Some(mut peer_info) = self.peers.get_mut(&peer_id) {
 | 
			
		||||
			peer_info.last_sent_transactions.extend(&hashes);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -1416,8 +1416,7 @@ impl ChainSync {
 | 
			
		||||
			let tx = rlp.as_raw().to_vec();
 | 
			
		||||
			transactions.push(tx);
 | 
			
		||||
		}
 | 
			
		||||
		let id = io.peer_session_info(peer_id).and_then(|info| info.id);
 | 
			
		||||
		io.chain().queue_transactions(transactions, id);
 | 
			
		||||
		io.chain().queue_transactions(transactions, peer_id);
 | 
			
		||||
		Ok(())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -26,7 +26,6 @@ type BlockNumber = u64;
 | 
			
		||||
pub struct Stats {
 | 
			
		||||
	first_seen: BlockNumber,
 | 
			
		||||
	propagated_to: HashMap<NodeId, usize>,
 | 
			
		||||
	received_from: HashMap<NodeId, usize>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Stats {
 | 
			
		||||
@ -34,7 +33,6 @@ impl Stats {
 | 
			
		||||
		Stats {
 | 
			
		||||
			first_seen: number,
 | 
			
		||||
			propagated_to: Default::default(),
 | 
			
		||||
			received_from: Default::default(),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -47,10 +45,6 @@ impl<'a> From<&'a Stats> for TransactionStats {
 | 
			
		||||
				.iter()
 | 
			
		||||
				.map(|(hash, size)| (*hash, *size))
 | 
			
		||||
				.collect(),
 | 
			
		||||
			received_from: other.received_from
 | 
			
		||||
				.iter()
 | 
			
		||||
				.map(|(hash, size)| (*hash, *size))
 | 
			
		||||
				.collect(),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -69,14 +63,6 @@ impl TransactionsStats {
 | 
			
		||||
		*count = count.saturating_add(1);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Increase number of back-propagations from given `enodeid`.
 | 
			
		||||
	pub fn received(&mut self, hash: H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) {
 | 
			
		||||
		let enode_id = enode_id.unwrap_or_default();
 | 
			
		||||
		let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num));
 | 
			
		||||
		let mut count = stats.received_from.entry(enode_id).or_insert(0);
 | 
			
		||||
		*count = count.saturating_add(1);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Returns propagation stats for given hash or `None` if hash is not known.
 | 
			
		||||
	#[cfg(test)]
 | 
			
		||||
	pub fn get(&self, hash: &H256) -> Option<&Stats> {
 | 
			
		||||
@ -127,32 +113,6 @@ mod tests {
 | 
			
		||||
				enodeid1 => 2,
 | 
			
		||||
				enodeid2 => 1
 | 
			
		||||
			],
 | 
			
		||||
			received_from: Default::default(),
 | 
			
		||||
		}));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn should_keep_track_of_back_propagations() {
 | 
			
		||||
		// given
 | 
			
		||||
		let mut stats = TransactionsStats::default();
 | 
			
		||||
		let hash = 5.into();
 | 
			
		||||
		let enodeid1 = 2.into();
 | 
			
		||||
		let enodeid2 = 5.into();
 | 
			
		||||
 | 
			
		||||
		// when
 | 
			
		||||
		stats.received(hash, Some(enodeid1), 5);
 | 
			
		||||
		stats.received(hash, Some(enodeid1), 10);
 | 
			
		||||
		stats.received(hash, Some(enodeid2), 15);
 | 
			
		||||
 | 
			
		||||
		// then
 | 
			
		||||
		let stats = stats.get(&hash);
 | 
			
		||||
		assert_eq!(stats, Some(&Stats {
 | 
			
		||||
			first_seen: 5,
 | 
			
		||||
			propagated_to: Default::default(),
 | 
			
		||||
			received_from: hash_map![
 | 
			
		||||
				enodeid1 => 2,
 | 
			
		||||
				enodeid2 => 1
 | 
			
		||||
			]
 | 
			
		||||
		}));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user