Maintaining the statistics for propagation of pending transactions

Tomasz Drwięga 2016-11-16 10:45:55 +01:00
parent 8dc7fcbe07
commit 4febd0eb93
5 changed files with 172 additions and 40 deletions

View File

@@ -262,7 +262,7 @@ impl Client {
 		}
 	}

 	/// Register an action to be done if a mode change happens.
 	pub fn on_mode_change<F>(&self, f: F) where F: 'static + FnMut(&Mode) + Send {
 		*self.on_mode_change.lock() = Some(Box::new(f));
 	}
@@ -890,7 +890,7 @@ impl BlockChainClient for Client {
 				trace!(target: "mode", "Making callback...");
 				f(&*mode)
 			},
 			_ => {}
 		}
 	}
 	match new_mode {
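The two hunks above only show context around `Client::on_mode_change` and the callback invocation, but the signature spells out the contract: the registered closure must be `'static + FnMut(&Mode) + Send`. A hedged usage sketch (the `client` value and the log line are illustrative, not part of this commit):

	// Register an action to run whenever the client switches Mode.
	// `client` is assumed to be an already-configured Client instance.
	client.on_mode_change(|mode: &Mode| {
		// The closure must be Send + 'static, so capture only owned, thread-safe data.
		trace!(target: "mode", "Mode changed to {:?}", mode);
	});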

View File

@@ -41,16 +41,16 @@
 //! }
 //! ```

-mod miner;
-mod external;
-mod transaction_queue;
-mod banning_queue;
-mod work_notify;
-mod price_info;
+mod banning_queue;
+mod external;
+mod miner;
+mod price_info;
+mod transaction_queue;
+mod work_notify;

-pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
-pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit};
 pub use self::external::{ExternalMiner, ExternalMinerService};
+pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit};
+pub use self::transaction_queue::{TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
 pub use client::TransactionImportResult;

 use std::collections::BTreeMap;

View File

@@ -102,6 +102,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do
 use snapshot::{Snapshot, ChunkType};
 use rand::{thread_rng, Rng};
 use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
+use transactions_stats::TransactionsStats;

 known_heap_size!(0, PeerInfo);
@@ -257,7 +258,7 @@ enum ForkConfirmation {
 	Unconfirmed,
 	/// Peers chain is too short to confirm the fork.
 	TooShort,
-	/// Fork is confurmed.
+	/// Fork is confirmed.
 	Confirmed,
 }
@@ -338,6 +339,8 @@ pub struct ChainSync {
 	handshaking_peers: HashMap<PeerId, u64>,
 	/// Sync start timestamp. Measured when first peer is connected
 	sync_start_time: Option<u64>,
+	/// Transactions propagation statistics
+	transactions_stats: TransactionsStats,
 }

 type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@@ -360,6 +363,7 @@ impl ChainSync {
 			fork_block: config.fork_block,
 			snapshot: Snapshot::new(),
 			sync_start_time: None,
+			transactions_stats: TransactionsStats::default(),
 		};
 		sync.update_targets(chain);
 		sync
@@ -1867,38 +1871,52 @@ impl ChainSync {
 			packet.out()
 		};

+		// Clear old transactions from stats
+		self.transactions_stats.retain(&all_transactions_hashes);
+
 		// sqrt(x)/x scaled to max u32
 		let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
 		let small = self.peers.len() < MIN_PEERS_PROPAGATION;
-		let lucky_peers = self.peers.iter_mut()
-			.filter(|_| small || ::rand::random::<u32>() < fraction)
-			.take(MAX_PEERS_PROPAGATION)
-			.filter_map(|(peer_id, mut peer_info)| {
-				// Send all transactions
-				if peer_info.last_sent_transactions.is_empty() {
-					peer_info.last_sent_transactions = all_transactions_hashes.clone();
-					return Some((*peer_id, all_transactions_rlp.clone()));
-				}
-
-				// Get hashes of all transactions to send to this peer
-				let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::<HashSet<_>>();
-				if to_send.is_empty() {
-					return None;
-				}
-
-				// Construct RLP
-				let mut packet = RlpStream::new_list(to_send.len());
-				for tx in &transactions {
-					if to_send.contains(&tx.hash()) {
-						packet.append(tx);
-					}
-				}
-				peer_info.last_sent_transactions = all_transactions_hashes.clone();
-				Some((*peer_id, packet.out()))
-			})
-			.collect::<Vec<_>>();
+		let lucky_peers = {
+			let stats = &mut self.transactions_stats;
+			self.peers.iter_mut()
+				.filter(|_| small || ::rand::random::<u32>() < fraction)
+				.take(MAX_PEERS_PROPAGATION)
+				.filter_map(|(peer_id, mut peer_info)| {
+					// Send all transactions
+					if peer_info.last_sent_transactions.is_empty() {
+						// update stats
+						for hash in &all_transactions_hashes {
+							let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
+							stats.propagated(*hash, id);
+						}
+						peer_info.last_sent_transactions = all_transactions_hashes.clone();
+						return Some((*peer_id, all_transactions_rlp.clone()));
+					}
+
+					// Get hashes of all transactions to send to this peer
+					let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions).cloned().collect::<HashSet<_>>();
+					if to_send.is_empty() {
+						return None;
+					}
+
+					// Construct RLP
+					let mut packet = RlpStream::new_list(to_send.len());
+					for tx in &transactions {
+						if to_send.contains(&tx.hash()) {
+							packet.append(tx);
+							// update stats
+							let peer_id = io.peer_session_info(*peer_id).and_then(|info| info.id);
+							stats.propagated(tx.hash(), peer_id);
+						}
+					}
+					peer_info.last_sent_transactions = all_transactions_hashes.clone();
+					Some((*peer_id, packet.out()))
+				})
+				.collect::<Vec<_>>()
+		};

 		// Send RLPs
 		let sent = lucky_peers.len();
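The rewritten block above does two new things: it prunes stats for hashes that are no longer pending (`retain`) before selecting peers, and it records every hash actually sent against the receiving peer's node id (`propagated`). A minimal, self-contained sketch of that bookkeeping pattern follows; it uses stand-in types (`u64` hashes, `String` node ids) rather than Parity's `H256`/`H512`, and omits peer selection and RLP packet construction:

	use std::collections::{HashMap, HashSet};

	// Stand-in types: the real code uses util::H256 for hashes and util::H512 for enode ids.
	type Hash = u64;
	type NodeId = String;

	#[derive(Debug, Default)]
	struct PropagationStats {
		// hash -> (node id -> how many times that peer was sent this transaction)
		pending: HashMap<Hash, HashMap<NodeId, usize>>,
	}

	impl PropagationStats {
		// Record that `hash` was propagated to `peer`.
		fn propagated(&mut self, hash: Hash, peer: NodeId) {
			*self.pending.entry(hash).or_default().entry(peer).or_insert(0) += 1;
		}

		// Forget every hash that is no longer in the pending set.
		fn retain(&mut self, pending_hashes: &HashSet<Hash>) {
			self.pending.retain(|hash, _| pending_hashes.contains(hash));
		}
	}

	fn main() {
		let mut stats = PropagationStats::default();
		let pending: HashSet<Hash> = vec![1, 2].into_iter().collect();

		// 1. Before a propagation round, prune hashes that left the queue.
		stats.retain(&pending);

		// 2. For each transaction actually written to a peer, bump its counter.
		for &hash in &pending {
			stats.propagated(hash, "enode-a".into());
		}

		assert_eq!(stats.pending[&1]["enode-a"], 1);
	}

The real code additionally falls back to a default node id when the peer's session info is unavailable (`enode_id.unwrap_or_default()` in the new module below).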
@@ -2273,18 +2291,23 @@ mod tests {
 		let peer_count = sync.propagate_new_transactions(&mut io);
 		// Try to propagate same transactions for the second time
 		let peer_count2 = sync.propagate_new_transactions(&mut io);
+		// Even after new block transactions should not be propagated twice
+		sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
+		// Try to propagate same transactions for the third time
+		let peer_count3 = sync.propagate_new_transactions(&mut io);

 		// 1 message should be send
 		assert_eq!(1, io.queue.len());
 		// 1 peer should be updated but only once
 		assert_eq!(1, peer_count);
 		assert_eq!(0, peer_count2);
+		assert_eq!(0, peer_count3);
 		// TRANSACTIONS_PACKET
 		assert_eq!(0x02, io.queue[0].packet_id);
 	}

 	#[test]
-	fn propagates_transactions_again_after_new_block() {
+	fn propagates_new_transactions_after_new_block() {
 		let mut client = TestBlockChainClient::new();
 		client.add_blocks(100, EachBlockWith::Uncle);
 		client.insert_transaction_to_queue();
@@ -2293,15 +2316,14 @@ mod tests {
 		let ss = TestSnapshotService::new();
 		let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
 		let peer_count = sync.propagate_new_transactions(&mut io);
+		io.chain.insert_transaction_to_queue();
+		// New block import should trigger propagation.
 		sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
-		// Try to propagate same transactions for the second time
-		let peer_count2 = sync.propagate_new_transactions(&mut io);

 		// 2 message should be send
 		assert_eq!(2, io.queue.len());
-		// 1 peer should be updated twice
+		// 1 peer should receive the message
 		assert_eq!(1, peer_count);
-		assert_eq!(1, peer_count2);
 		// TRANSACTIONS_PACKET
 		assert_eq!(0x02, io.queue[0].packet_id);
 		assert_eq!(0x02, io.queue[1].packet_id);

View File

@@ -51,6 +51,7 @@ mod blocks;
 mod block_sync;
 mod sync_io;
 mod snapshot;
+mod transactions_stats;

 #[cfg(test)]
 mod tests;

View File

@@ -0,0 +1,109 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Stats
use std::collections::{HashSet, HashMap};
use util::{H256, H512};
use util::hash::H256FastMap;
type NodeId = H512;
type BlockNumber = u64;
#[derive(Debug, Default, PartialEq)]
pub struct Stats {
first_seen: BlockNumber,
propagated_to: HashMap<NodeId, usize>,
}
#[derive(Debug, Default)]
pub struct TransactionsStats {
pending_transactions: H256FastMap<Stats>,
}
impl TransactionsStats {
/// Increases number of propagations to given `enodeid`.
pub fn propagated(&mut self, hash: H256, enode_id: Option<NodeId>) {
let enode_id = enode_id.unwrap_or_default();
let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::default());
let mut count = stats.propagated_to.entry(enode_id).or_insert(0);
*count = count.saturating_add(1);
}
/// Returns propagation stats for given hash or `None` if hash is not known.
pub fn stats(&self, hash: &H256) -> Option<&Stats> {
self.pending_transactions.get(hash)
}
/// Retains only transactions present in given `HashSet`.
pub fn retain(&mut self, hashes: &HashSet<H256>) {
let to_remove = self.pending_transactions.keys()
.filter(|hash| !hashes.contains(hash))
.cloned()
.collect::<Vec<_>>();
for hash in to_remove {
self.pending_transactions.remove(&hash);
}
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use super::{Stats, TransactionsStats};
#[test]
fn should_keep_track_of_propagations() {
// given
let mut stats = TransactionsStats::default();
let hash = 5.into();
let enodeid1 = 2.into();
let enodeid2 = 5.into();
// when
stats.propagated(hash, Some(enodeid1));
stats.propagated(hash, Some(enodeid1));
stats.propagated(hash, Some(enodeid2));
// then
let stats = stats.stats(&hash);
assert_eq!(stats, Some(&Stats {
first_seen: 0,
propagated_to: hash_map![
enodeid1 => 2,
enodeid2 => 1
]
}));
}
#[test]
fn should_remove_hash_from_tracking() {
// given
let mut stats = TransactionsStats::default();
let hash = 5.into();
let enodeid1 = 5.into();
stats.propagated(hash, Some(enodeid1));
// when
stats.retain(&HashSet::new());
// then
let stats = stats.stats(&hash);
assert_eq!(stats, None);
}
}
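
To round this off, a hedged sketch of how the new module's three public calls fit together, mirroring the tests above and the calls made from `propagate_new_transactions`; the helper function is hypothetical and the numeric `.into()` literals are illustrative stand-ins for real transaction hashes and enode ids:

	use std::collections::HashSet;
	use util::{H256, H512};

	// Hypothetical helper living next to the module above; `TransactionsStats`,
	// `H256` and `H512` are the types it already uses.
	fn track_propagation(stats: &mut TransactionsStats) {
		let hash: H256 = 1.into();
		let peer: H512 = 2.into();

		// Record two sends of the same transaction to the same peer.
		stats.propagated(hash, Some(peer));
		stats.propagated(hash, Some(peer));
		assert!(stats.stats(&hash).is_some());

		// Once the hash drops out of the pending set, retain() forgets it.
		stats.retain(&HashSet::new());
		assert!(stats.stats(&hash).is_none());
	}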