diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index 3908992d1..e974626d0 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -16,7 +16,7 @@ //! Binary representation of types -use util::{U256, U512, H256, H2048, Address}; +use util::{U256, U512, H256, H512, H2048, Address}; use std::mem; use std::collections::{VecDeque, BTreeMap}; use std::ops::Range; @@ -800,6 +800,7 @@ binary_fixed_size!(bool); binary_fixed_size!(U256); binary_fixed_size!(U512); binary_fixed_size!(H256); +binary_fixed_size!(H512); binary_fixed_size!(H2048); binary_fixed_size!(Address); binary_fixed_size!(BinHandshake); diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 1b8ee9695..a952d54ec 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -34,7 +34,7 @@ use ethcore::account_provider::AccountProvider; use jsonrpc_core::Error; use v1::traits::Parity; -use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram}; +use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram, TransactionStats}; use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; use v1::helpers::dispatch::DEFAULT_MAC; @@ -259,6 +259,16 @@ impl Parity for ParityClient where Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::>()) } + fn pending_transactions_stats(&self) -> Result, Error> { + try!(self.active()); + + let stats = take_weak!(self.sync).transactions_stats(); + Ok(stats.into_iter() + .map(|(hash, stats)| (hash.into(), stats.into())) + .collect() + ) + } + fn signer_port(&self) -> Result { try!(self.active()); diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index e0f811fc0..24be33417 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,8 +16,9 @@ //! Test implementation of SyncProvider. -use util::{RwLock}; -use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo}; +use std::collections::BTreeMap; +use util::{H256, RwLock}; +use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo, TransactionStats}; /// TestSyncProvider config. pub struct Config { @@ -74,7 +75,7 @@ impl SyncProvider for TestSyncProvider { PeerInfo { id: Some("node1".to_owned()), client_version: "Parity/1".to_owned(), - capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], + capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], remote_address: "127.0.0.1:7777".to_owned(), local_address: "127.0.0.1:8888".to_owned(), eth_version: 62, @@ -84,7 +85,7 @@ impl SyncProvider for TestSyncProvider { PeerInfo { id: None, client_version: "Parity/2".to_owned(), - capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], + capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], remote_address: "Handshake".to_owned(), local_address: "127.0.0.1:3333".to_owned(), eth_version: 64, @@ -97,5 +98,22 @@ impl SyncProvider for TestSyncProvider { fn enode(&self) -> Option { None } + + fn transactions_stats(&self) -> BTreeMap { + map![ + 1.into() => TransactionStats { + first_seen: 10, + propagated_to: map![ + 128.into() => 16 + ] + }, + 5.into() => TransactionStats { + first_seen: 16, + propagated_to: map![ + 16.into() => 1 + ] + } + ] + } } diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index b5c8187c7..8c06d4c7d 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -355,3 +355,15 @@ fn rpc_parity_next_nonce() { assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned())); assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned())); } + +#[test] +fn rpc_parity_transactions_stats() { + let deps = Dependencies::new(); + let io = deps.default_client(); + + let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#; + + assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); +} + diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index f8c219a89..946da8149 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -19,7 +19,7 @@ use jsonrpc_core::Error; use std::collections::BTreeMap; use v1::helpers::auto_args::Wrap; -use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram}; +use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram, TransactionStats}; build_rpc_trait! { /// Parity-specific rpc interface. @@ -115,6 +115,10 @@ build_rpc_trait! { #[rpc(name = "parity_pendingTransactions")] fn pending_transactions(&self) -> Result, Error>; + /// Returns propagation statistics on transactions pending in the queue. + #[rpc(name = "parity_pendingTransactionsStats")] + fn pending_transactions_stats(&self) -> Result, Error>; + /// Returns current Trusted Signer port or an error if signer is disabled. #[rpc(name = "parity_signerPort")] fn signer_port(&self) -> Result; diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in index b1e4ae2c9..29e50aae5 100644 --- a/rpc/src/v1/types/mod.rs.in +++ b/rpc/src/v1/types/mod.rs.in @@ -43,7 +43,7 @@ pub use self::filter::{Filter, FilterChanges}; pub use self::hash::{H64, H160, H256, H512, H520, H2048}; pub use self::index::Index; pub use self::log::Log; -pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo}; +pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo, TransactionStats}; pub use self::transaction::Transaction; pub use self::transaction_request::TransactionRequest; pub use self::receipt::Receipt; diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index a0f61e799..6f8938be9 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -14,9 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethsync::PeerInfo as SyncPeerInfo; +use std::collections::BTreeMap; +use ethsync::{PeerInfo as SyncPeerInfo, TransactionStats as SyncTransactionStats}; use serde::{Serialize, Serializer}; -use v1::types::U256; +use v1::types::{U256, H512}; /// Sync info #[derive(Default, Debug, Serialize, PartialEq)] @@ -117,8 +118,19 @@ impl Serialize for SyncStatus { } } +/// Propagation statistics for pending transaction. +#[derive(Default, Debug, Serialize)] +pub struct TransactionStats { + /// Block no this transaction was first seen. + #[serde(rename="firstSeen")] + pub first_seen: u64, + /// Peers this transaction was propagated to with count. + #[serde(rename="propagatedTo")] + pub propagated_to: BTreeMap, +} + impl From for PeerInfo { - fn from(p: SyncPeerInfo) -> PeerInfo { + fn from(p: SyncPeerInfo) -> Self { PeerInfo { id: p.id, name: p.client_version, @@ -138,10 +150,23 @@ impl From for PeerInfo { } } +impl From for TransactionStats { + fn from(s: SyncTransactionStats) -> Self { + TransactionStats { + first_seen: s.first_seen, + propagated_to: s.propagated_to + .into_iter() + .map(|(id, count)| (id.into(), count)) + .collect() + } + } +} + #[cfg(test)] mod tests { use serde_json; - use super::{SyncInfo, SyncStatus, Peers}; + use std::collections::BTreeMap; + use super::{SyncInfo, SyncStatus, Peers, TransactionStats}; #[test] fn test_serialize_sync_info() { @@ -176,4 +201,17 @@ mod tests { let serialized = serde_json::to_string(&t).unwrap(); assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#) } + + #[test] + fn test_serialize_transaction_stats() { + let stats = TransactionStats { + first_seen: 100, + propagated_to: map![ + 10.into() => 50 + ] + }; + + let serialized = serde_json::to_string(&stats).unwrap(); + assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#) + } } diff --git a/sync/build.rs b/sync/build.rs index c465d5e34..db881e328 100644 --- a/sync/build.rs +++ b/sync/build.rs @@ -18,4 +18,5 @@ extern crate ethcore_ipc_codegen; fn main() { ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap(); + ethcore_ipc_codegen::derive_ipc_cond("src/transactions_stats.rs", cfg!(feature="ipc")).unwrap(); } diff --git a/sync/src/api.rs b/sync/src/api.rs index d9dbbd263..3191483e4 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -15,13 +15,13 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::HashMap; +use std::collections::{HashMap, BTreeMap}; use std::io; use util::Bytes; use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, AllowIP as NetworkAllowIP}; -use util::{U256, H256}; +use util::{U256, H256, H512}; use io::{TimerToken}; use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::snapshot::SnapshotService; @@ -76,6 +76,16 @@ pub trait SyncProvider: Send + Sync { /// Get the enode if available. fn enode(&self) -> Option; + + /// Returns propagation count for pending transactions. + fn transactions_stats(&self) -> BTreeMap; +} + +/// Transaction stats +#[derive(Debug, Binary)] +pub struct TransactionStats { + pub first_seen: u64, + pub propagated_to: BTreeMap, } /// Peer connection information @@ -150,6 +160,14 @@ impl SyncProvider for EthSync { fn enode(&self) -> Option { self.network.external_url() } + + fn transactions_stats(&self) -> BTreeMap { + let sync = self.handler.sync.read(); + sync.transactions_stats() + .iter() + .map(|(hash, stats)| (*hash, stats.into())) + .collect() + } } struct SyncProtocolHandler { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6f2605b27..d255949b9 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -102,7 +102,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; -use transactions_stats::TransactionsStats; +use transactions_stats::{TransactionsStats, Stats as TransactionStats}; known_heap_size!(0, PeerInfo); @@ -412,6 +412,11 @@ impl ChainSync { .collect() } + /// Returns transactions propagation statistics + pub fn transactions_stats(&self) -> &H256FastMap { + self.transactions_stats.stats() + } + /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { self.reset_and_continue(io); @@ -1877,6 +1882,7 @@ impl ChainSync { // sqrt(x)/x scaled to max u32 let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let small = self.peers.len() < MIN_PEERS_PROPAGATION; + let block_number = io.chain().chain_info().best_block_number; let lucky_peers = { let stats = &mut self.transactions_stats; @@ -1889,7 +1895,7 @@ impl ChainSync { // update stats for hash in &all_transactions_hashes { let id = io.peer_session_info(*peer_id).and_then(|info| info.id); - stats.propagated(*hash, id); + stats.propagated(*hash, id, block_number); } peer_info.last_sent_transactions = all_transactions_hashes.clone(); return Some((*peer_id, all_transactions_rlp.clone())); @@ -1907,8 +1913,8 @@ impl ChainSync { if to_send.contains(&tx.hash()) { packet.append(tx); // update stats - let peer_id = io.peer_session_info(*peer_id).and_then(|info| info.id); - stats.propagated(tx.hash(), peer_id); + let id = io.peer_session_info(*peer_id).and_then(|info| info.id); + stats.propagated(tx.hash(), id, block_number); } } @@ -2362,6 +2368,21 @@ mod tests { assert_eq!(0x02, io.queue[1].packet_id); } + #[test] + fn should_maintain_transations_propagation_stats() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Uncle); + client.insert_transaction_to_queue(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client); + let mut queue = VecDeque::new(); + let ss = TestSnapshotService::new(); + let mut io = TestIo::new(&mut client, &ss, &mut queue, None); + sync.propagate_new_transactions(&mut io); + + let stats = sync.transactions_stats(); + assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.") + } + #[test] fn handles_peer_new_block_malformed() { let mut client = TestBlockChainClient::new(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 25350d410..2061e4e3a 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -62,7 +62,7 @@ mod api { } pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, - ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP}; + ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats}; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; diff --git a/sync/src/transactions_stats.rs b/sync/src/transactions_stats.rs index ed0a2aedd..8c5eb6dda 100644 --- a/sync/src/transactions_stats.rs +++ b/sync/src/transactions_stats.rs @@ -14,8 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Transaction Stats - +use api::TransactionStats; use std::collections::{HashSet, HashMap}; use util::{H256, H512}; use util::hash::H256FastMap; @@ -23,12 +22,33 @@ use util::hash::H256FastMap; type NodeId = H512; type BlockNumber = u64; -#[derive(Debug, Default, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct Stats { first_seen: BlockNumber, propagated_to: HashMap, } +impl Stats { + pub fn new(number: BlockNumber) -> Self { + Stats { + first_seen: number, + propagated_to: Default::default(), + } + } +} + +impl<'a> From<&'a Stats> for TransactionStats { + fn from(other: &'a Stats) -> Self { + TransactionStats { + first_seen: other.first_seen, + propagated_to: other.propagated_to + .iter() + .map(|(hash, size)| (*hash, *size)) + .collect(), + } + } +} + #[derive(Debug, Default)] pub struct TransactionsStats { pending_transactions: H256FastMap, @@ -36,18 +56,23 @@ pub struct TransactionsStats { impl TransactionsStats { /// Increases number of propagations to given `enodeid`. - pub fn propagated(&mut self, hash: H256, enode_id: Option) { + pub fn propagated(&mut self, hash: H256, enode_id: Option, current_block_num: BlockNumber) { let enode_id = enode_id.unwrap_or_default(); - let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::default()); + let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num)); let mut count = stats.propagated_to.entry(enode_id).or_insert(0); *count = count.saturating_add(1); } /// Returns propagation stats for given hash or `None` if hash is not known. - pub fn stats(&self, hash: &H256) -> Option<&Stats> { + #[cfg(test)] + pub fn get(&self, hash: &H256) -> Option<&Stats> { self.pending_transactions.get(hash) } + pub fn stats(&self) -> &H256FastMap { + &self.pending_transactions + } + /// Retains only transactions present in given `HashSet`. pub fn retain(&mut self, hashes: &HashSet) { let to_remove = self.pending_transactions.keys() @@ -76,14 +101,14 @@ mod tests { let enodeid2 = 5.into(); // when - stats.propagated(hash, Some(enodeid1)); - stats.propagated(hash, Some(enodeid1)); - stats.propagated(hash, Some(enodeid2)); + stats.propagated(hash, Some(enodeid1), 5); + stats.propagated(hash, Some(enodeid1), 10); + stats.propagated(hash, Some(enodeid2), 15); // then - let stats = stats.stats(&hash); + let stats = stats.get(&hash); assert_eq!(stats, Some(&Stats { - first_seen: 0, + first_seen: 5, propagated_to: hash_map![ enodeid1 => 2, enodeid2 => 1 @@ -97,13 +122,13 @@ mod tests { let mut stats = TransactionsStats::default(); let hash = 5.into(); let enodeid1 = 5.into(); - stats.propagated(hash, Some(enodeid1)); + stats.propagated(hash, Some(enodeid1), 10); // when stats.retain(&HashSet::new()); // then - let stats = stats.stats(&hash); + let stats = stats.get(&hash); assert_eq!(stats, None); } }