From 4febd0eb934f09267d5d89f040a39d160d997804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 16 Nov 2016 10:45:55 +0100 Subject: [PATCH] Maintaining the statistics for propagation of pending transactions --- ethcore/src/client/client.rs | 4 +- ethcore/src/miner/mod.rs | 12 ++-- sync/src/chain.rs | 86 ++++++++++++++++---------- sync/src/lib.rs | 1 + sync/src/transactions_stats.rs | 109 +++++++++++++++++++++++++++++++++ 5 files changed, 172 insertions(+), 40 deletions(-) create mode 100644 sync/src/transactions_stats.rs diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ec59e01cf..151d7e10e 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -262,7 +262,7 @@ impl Client { } } - /// Register an action to be done if a mode change happens. + /// Register an action to be done if a mode change happens. pub fn on_mode_change(&self, f: F) where F: 'static + FnMut(&Mode) + Send { *self.on_mode_change.lock() = Some(Box::new(f)); } @@ -890,7 +890,7 @@ impl BlockChainClient for Client { trace!(target: "mode", "Making callback..."); f(&*mode) }, - _ => {} + _ => {} } } match new_mode { diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index da93dc0b7..2eb09cdf1 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -41,16 +41,16 @@ //! } //! ``` -mod miner; -mod external; -mod transaction_queue; mod banning_queue; -mod work_notify; +mod external; +mod miner; mod price_info; +mod transaction_queue; +mod work_notify; -pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; -pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit}; pub use self::external::{ExternalMiner, ExternalMinerService}; +pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit}; +pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin}; pub use client::TransactionImportResult; use std::collections::BTreeMap; diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 65fca4069..6f2605b27 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -102,6 +102,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; +use transactions_stats::TransactionsStats; known_heap_size!(0, PeerInfo); @@ -257,7 +258,7 @@ enum ForkConfirmation { Unconfirmed, /// Peers chain is too short to confirm the fork. TooShort, - /// Fork is confurmed. + /// Fork is confirmed. Confirmed, } @@ -338,6 +339,8 @@ pub struct ChainSync { handshaking_peers: HashMap, /// Sync start timestamp. Measured when first peer is connected sync_start_time: Option, + /// Transactions propagation statistics + transactions_stats: TransactionsStats, } type RlpResponseResult = Result, PacketDecodeError>; @@ -360,6 +363,7 @@ impl ChainSync { fork_block: config.fork_block, snapshot: Snapshot::new(), sync_start_time: None, + transactions_stats: TransactionsStats::default(), }; sync.update_targets(chain); sync @@ -1867,38 +1871,52 @@ impl ChainSync { packet.out() }; + // Clear old transactions from stats + self.transactions_stats.retain(&all_transactions_hashes); + // sqrt(x)/x scaled to max u32 let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let small = self.peers.len() < MIN_PEERS_PROPAGATION; - let lucky_peers = self.peers.iter_mut() - .filter(|_| small || ::rand::random::() < fraction) - .take(MAX_PEERS_PROPAGATION) - .filter_map(|(peer_id, mut peer_info)| { - // Send all transactions - if peer_info.last_sent_transactions.is_empty() { - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - return Some((*peer_id, all_transactions_rlp.clone())); - } - - // Get hashes of all transactions to send to this peer - let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::>(); - if to_send.is_empty() { - return None; - } - - // Construct RLP - let mut packet = RlpStream::new_list(to_send.len()); - for tx in &transactions { - if to_send.contains(&tx.hash()) { - packet.append(tx); + let lucky_peers = { + let stats = &mut self.transactions_stats; + self.peers.iter_mut() + .filter(|_| small || ::rand::random::() < fraction) + .take(MAX_PEERS_PROPAGATION) + .filter_map(|(peer_id, mut peer_info)| { + // Send all transactions + if peer_info.last_sent_transactions.is_empty() { + // update stats + for hash in &all_transactions_hashes { + let id = io.peer_session_info(*peer_id).and_then(|info| info.id); + stats.propagated(*hash, id); + } + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + return Some((*peer_id, all_transactions_rlp.clone())); } - } - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - Some((*peer_id, packet.out())) - }) - .collect::>(); + // Get hashes of all transactions to send to this peer + let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::>(); + if to_send.is_empty() { + return None; + } + + // Construct RLP + let mut packet = RlpStream::new_list(to_send.len()); + for tx in &transactions { + if to_send.contains(&tx.hash()) { + packet.append(tx); + // update stats + let peer_id = io.peer_session_info(*peer_id).and_then(|info| info.id); + stats.propagated(tx.hash(), peer_id); + } + } + + peer_info.last_sent_transactions = all_transactions_hashes.clone(); + Some((*peer_id, packet.out())) + }) + .collect::>() + }; // Send RLPs let sent = lucky_peers.len(); @@ -2273,18 +2291,23 @@ mod tests { let peer_count = sync.propagate_new_transactions(&mut io); // Try to propagate same transactions for the second time let peer_count2 = sync.propagate_new_transactions(&mut io); + // Even after new block transactions should not be propagated twice + sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); + // Try to propagate same transactions for the third time + let peer_count3 = sync.propagate_new_transactions(&mut io); // 1 message should be send assert_eq!(1, io.queue.len()); // 1 peer should be updated but only once assert_eq!(1, peer_count); assert_eq!(0, peer_count2); + assert_eq!(0, peer_count3); // TRANSACTIONS_PACKET assert_eq!(0x02, io.queue[0].packet_id); } #[test] - fn propagates_transactions_again_after_new_block() { + fn propagates_new_transactions_after_new_block() { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); client.insert_transaction_to_queue(); @@ -2293,15 +2316,14 @@ mod tests { let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &mut queue, None); let peer_count = sync.propagate_new_transactions(&mut io); + io.chain.insert_transaction_to_queue(); + // New block import should trigger propagation. sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]); - // Try to propagate same transactions for the second time - let peer_count2 = sync.propagate_new_transactions(&mut io); // 2 message should be send assert_eq!(2, io.queue.len()); - // 1 peer should be updated twice + // 1 peer should receive the message assert_eq!(1, peer_count); - assert_eq!(1, peer_count2); // TRANSACTIONS_PACKET assert_eq!(0x02, io.queue[0].packet_id); assert_eq!(0x02, io.queue[1].packet_id); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 532c05711..25350d410 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,6 +51,7 @@ mod blocks; mod block_sync; mod sync_io; mod snapshot; +mod transactions_stats; #[cfg(test)] mod tests; diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs new file mode 100644 index 000000000..ed0a2aedd --- /dev/null +++ b/sync/src/transactions_stats.rs @@ -0,0 +1,109 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Transaction Stats + +use std::collections::{HashSet, HashMap}; +use util::{H256, H512}; +use util::hash::H256FastMap; + +type NodeId = H512; +type BlockNumber = u64; + +#[derive(Debug, Default, PartialEq)] +pub struct Stats { + first_seen: BlockNumber, + propagated_to: HashMap, +} + +#[derive(Debug, Default)] +pub struct TransactionsStats { + pending_transactions: H256FastMap, +} + +impl TransactionsStats { + /// Increases number of propagations to given `enodeid`. + pub fn propagated(&mut self, hash: H256, enode_id: Option) { + let enode_id = enode_id.unwrap_or_default(); + let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::default()); + let mut count = stats.propagated_to.entry(enode_id).or_insert(0); + *count = count.saturating_add(1); + } + + /// Returns propagation stats for given hash or `None` if hash is not known. + pub fn stats(&self, hash: &H256) -> Option<&Stats> { + self.pending_transactions.get(hash) + } + + /// Retains only transactions present in given `HashSet`. + pub fn retain(&mut self, hashes: &HashSet) { + let to_remove = self.pending_transactions.keys() + .filter(|hash| !hashes.contains(hash)) + .cloned() + .collect::>(); + + for hash in to_remove { + self.pending_transactions.remove(&hash); + } + } +} + +#[cfg(test)] +mod tests { + + use std::collections::{HashMap, HashSet}; + use super::{Stats, TransactionsStats}; + + #[test] + fn should_keep_track_of_propagations() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 2.into(); + let enodeid2 = 5.into(); + + // when + stats.propagated(hash, Some(enodeid1)); + stats.propagated(hash, Some(enodeid1)); + stats.propagated(hash, Some(enodeid2)); + + // then + let stats = stats.stats(&hash); + assert_eq!(stats, Some(&Stats { + first_seen: 0, + propagated_to: hash_map![ + enodeid1 => 2, + enodeid2 => 1 + ] + })); + } + + #[test] + fn should_remove_hash_from_tracking() { + // given + let mut stats = TransactionsStats::default(); + let hash = 5.into(); + let enodeid1 = 5.into(); + stats.propagated(hash, Some(enodeid1)); + + // when + stats.retain(&HashSet::new()); + + // then + let stats = stats.stats(&hash); + assert_eq!(stats, None); + } +}