Stats RPC

This commit is contained in:
Tomasz Drwięga 2016-11-16 13:37:21 +01:00
parent 4febd0eb93
commit 78b5c743f6
12 changed files with 180 additions and 32 deletions

View File

@ -16,7 +16,7 @@
//! Binary representation of types //! Binary representation of types
use util::{U256, U512, H256, H2048, Address}; use util::{U256, U512, H256, H512, H2048, Address};
use std::mem; use std::mem;
use std::collections::{VecDeque, BTreeMap}; use std::collections::{VecDeque, BTreeMap};
use std::ops::Range; use std::ops::Range;
@ -800,6 +800,7 @@ binary_fixed_size!(bool);
binary_fixed_size!(U256); binary_fixed_size!(U256);
binary_fixed_size!(U512); binary_fixed_size!(U512);
binary_fixed_size!(H256); binary_fixed_size!(H256);
binary_fixed_size!(H512);
binary_fixed_size!(H2048); binary_fixed_size!(H2048);
binary_fixed_size!(Address); binary_fixed_size!(Address);
binary_fixed_size!(BinHandshake); binary_fixed_size!(BinHandshake);

View File

@ -34,7 +34,7 @@ use ethcore::account_provider::AccountProvider;
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::traits::Parity; use v1::traits::Parity;
use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram}; use v1::types::{Bytes, U256, H160, H256, H512, Peers, Transaction, RpcSettings, Histogram, TransactionStats};
use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings}; use v1::helpers::{errors, SigningQueue, SignerService, NetworkSettings};
use v1::helpers::dispatch::DEFAULT_MAC; use v1::helpers::dispatch::DEFAULT_MAC;
@ -259,6 +259,16 @@ impl<C, M, S: ?Sized> Parity for ParityClient<C, M, S> where
Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::<Vec<_>>()) Ok(take_weak!(self.miner).all_transactions().into_iter().map(Into::into).collect::<Vec<_>>())
} }
fn pending_transactions_stats(&self) -> Result<BTreeMap<H256, TransactionStats>, Error> {
try!(self.active());
let stats = take_weak!(self.sync).transactions_stats();
Ok(stats.into_iter()
.map(|(hash, stats)| (hash.into(), stats.into()))
.collect()
)
}
fn signer_port(&self) -> Result<u16, Error> { fn signer_port(&self) -> Result<u16, Error> {
try!(self.active()); try!(self.active());

View File

@ -16,8 +16,9 @@
//! Test implementation of SyncProvider. //! Test implementation of SyncProvider.
use util::{RwLock}; use std::collections::BTreeMap;
use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo}; use util::{H256, RwLock};
use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo, TransactionStats};
/// TestSyncProvider config. /// TestSyncProvider config.
pub struct Config { pub struct Config {
@ -74,7 +75,7 @@ impl SyncProvider for TestSyncProvider {
PeerInfo { PeerInfo {
id: Some("node1".to_owned()), id: Some("node1".to_owned()),
client_version: "Parity/1".to_owned(), client_version: "Parity/1".to_owned(),
capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()],
remote_address: "127.0.0.1:7777".to_owned(), remote_address: "127.0.0.1:7777".to_owned(),
local_address: "127.0.0.1:8888".to_owned(), local_address: "127.0.0.1:8888".to_owned(),
eth_version: 62, eth_version: 62,
@ -84,7 +85,7 @@ impl SyncProvider for TestSyncProvider {
PeerInfo { PeerInfo {
id: None, id: None,
client_version: "Parity/2".to_owned(), client_version: "Parity/2".to_owned(),
capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()],
remote_address: "Handshake".to_owned(), remote_address: "Handshake".to_owned(),
local_address: "127.0.0.1:3333".to_owned(), local_address: "127.0.0.1:3333".to_owned(),
eth_version: 64, eth_version: 64,
@ -97,5 +98,22 @@ impl SyncProvider for TestSyncProvider {
fn enode(&self) -> Option<String> { fn enode(&self) -> Option<String> {
None None
} }
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
map![
1.into() => TransactionStats {
first_seen: 10,
propagated_to: map![
128.into() => 16
]
},
5.into() => TransactionStats {
first_seen: 16,
propagated_to: map![
16.into() => 1
]
}
]
}
} }

View File

@ -355,3 +355,15 @@ fn rpc_parity_next_nonce() {
assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned())); assert_eq!(io1.handle_request_sync(&request), Some(response1.to_owned()));
assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned())); assert_eq!(io2.handle_request_sync(&request), Some(response2.to_owned()));
} }
#[test]
fn rpc_parity_transactions_stats() {
let deps = Dependencies::new();
let io = deps.default_client();
let request = r#"{"jsonrpc": "2.0", "method": "parity_pendingTransactionsStats", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"0x0000000000000000000000000000000000000000000000000000000000000001":{"firstSeen":10,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080":16}},"0x0000000000000000000000000000000000000000000000000000000000000005":{"firstSeen":16,"propagatedTo":{"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010":1}}},"id":1}"#;
assert_eq!(io.handle_request_sync(request), Some(response.to_owned()));
}

View File

@ -19,7 +19,7 @@ use jsonrpc_core::Error;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use v1::helpers::auto_args::Wrap; use v1::helpers::auto_args::Wrap;
use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram}; use v1::types::{H160, H256, H512, U256, Bytes, Peers, Transaction, RpcSettings, Histogram, TransactionStats};
build_rpc_trait! { build_rpc_trait! {
/// Parity-specific rpc interface. /// Parity-specific rpc interface.
@ -115,6 +115,10 @@ build_rpc_trait! {
#[rpc(name = "parity_pendingTransactions")] #[rpc(name = "parity_pendingTransactions")]
fn pending_transactions(&self) -> Result<Vec<Transaction>, Error>; fn pending_transactions(&self) -> Result<Vec<Transaction>, Error>;
/// Returns propagation statistics on transactions pending in the queue.
#[rpc(name = "parity_pendingTransactionsStats")]
fn pending_transactions_stats(&self) -> Result<BTreeMap<H256, TransactionStats>, Error>;
/// Returns current Trusted Signer port or an error if signer is disabled. /// Returns current Trusted Signer port or an error if signer is disabled.
#[rpc(name = "parity_signerPort")] #[rpc(name = "parity_signerPort")]
fn signer_port(&self) -> Result<u16, Error>; fn signer_port(&self) -> Result<u16, Error>;

View File

@ -43,7 +43,7 @@ pub use self::filter::{Filter, FilterChanges};
pub use self::hash::{H64, H160, H256, H512, H520, H2048}; pub use self::hash::{H64, H160, H256, H512, H520, H2048};
pub use self::index::Index; pub use self::index::Index;
pub use self::log::Log; pub use self::log::Log;
pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo}; pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo, TransactionStats};
pub use self::transaction::Transaction; pub use self::transaction::Transaction;
pub use self::transaction_request::TransactionRequest; pub use self::transaction_request::TransactionRequest;
pub use self::receipt::Receipt; pub use self::receipt::Receipt;

View File

@ -14,9 +14,10 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethsync::PeerInfo as SyncPeerInfo; use std::collections::BTreeMap;
use ethsync::{PeerInfo as SyncPeerInfo, TransactionStats as SyncTransactionStats};
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
use v1::types::U256; use v1::types::{U256, H512};
/// Sync info /// Sync info
#[derive(Default, Debug, Serialize, PartialEq)] #[derive(Default, Debug, Serialize, PartialEq)]
@ -117,8 +118,19 @@ impl Serialize for SyncStatus {
} }
} }
/// Propagation statistics for pending transaction.
#[derive(Default, Debug, Serialize)]
pub struct TransactionStats {
/// Block no this transaction was first seen.
#[serde(rename="firstSeen")]
pub first_seen: u64,
/// Peers this transaction was propagated to with count.
#[serde(rename="propagatedTo")]
pub propagated_to: BTreeMap<H512, usize>,
}
impl From<SyncPeerInfo> for PeerInfo { impl From<SyncPeerInfo> for PeerInfo {
fn from(p: SyncPeerInfo) -> PeerInfo { fn from(p: SyncPeerInfo) -> Self {
PeerInfo { PeerInfo {
id: p.id, id: p.id,
name: p.client_version, name: p.client_version,
@ -138,10 +150,23 @@ impl From<SyncPeerInfo> for PeerInfo {
} }
} }
impl From<SyncTransactionStats> for TransactionStats {
fn from(s: SyncTransactionStats) -> Self {
TransactionStats {
first_seen: s.first_seen,
propagated_to: s.propagated_to
.into_iter()
.map(|(id, count)| (id.into(), count))
.collect()
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use serde_json; use serde_json;
use super::{SyncInfo, SyncStatus, Peers}; use std::collections::BTreeMap;
use super::{SyncInfo, SyncStatus, Peers, TransactionStats};
#[test] #[test]
fn test_serialize_sync_info() { fn test_serialize_sync_info() {
@ -176,4 +201,17 @@ mod tests {
let serialized = serde_json::to_string(&t).unwrap(); let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#) assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#)
} }
#[test]
fn test_serialize_transaction_stats() {
let stats = TransactionStats {
first_seen: 100,
propagated_to: map![
10.into() => 50
]
};
let serialized = serde_json::to_string(&stats).unwrap();
assert_eq!(serialized, r#"{"firstSeen":100,"propagatedTo":{"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a":50}}"#)
}
} }

View File

@ -18,4 +18,5 @@ extern crate ethcore_ipc_codegen;
fn main() { fn main() {
ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap(); ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/transactions_stats.rs", cfg!(feature="ipc")).unwrap();
} }

View File

@ -15,13 +15,13 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap; use std::collections::{HashMap, BTreeMap};
use std::io; use std::io;
use util::Bytes; use util::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError,
AllowIP as NetworkAllowIP}; AllowIP as NetworkAllowIP};
use util::{U256, H256}; use util::{U256, H256, H512};
use io::{TimerToken}; use io::{TimerToken};
use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::client::{BlockChainClient, ChainNotify};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
@ -76,6 +76,16 @@ pub trait SyncProvider: Send + Sync {
/// Get the enode if available. /// Get the enode if available.
fn enode(&self) -> Option<String>; fn enode(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats>;
}
/// Transaction stats
#[derive(Debug, Binary)]
pub struct TransactionStats {
pub first_seen: u64,
pub propagated_to: BTreeMap<H512, usize>,
} }
/// Peer connection information /// Peer connection information
@ -150,6 +160,14 @@ impl SyncProvider for EthSync {
fn enode(&self) -> Option<String> { fn enode(&self) -> Option<String> {
self.network.external_url() self.network.external_url()
} }
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
}
} }
struct SyncProtocolHandler { struct SyncProtocolHandler {

View File

@ -102,7 +102,7 @@ use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as Do
use snapshot::{Snapshot, ChunkType}; use snapshot::{Snapshot, ChunkType};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
use transactions_stats::TransactionsStats; use transactions_stats::{TransactionsStats, Stats as TransactionStats};
known_heap_size!(0, PeerInfo); known_heap_size!(0, PeerInfo);
@ -412,6 +412,11 @@ impl ChainSync {
.collect() .collect()
} }
/// Returns transactions propagation statistics
pub fn transactions_stats(&self) -> &H256FastMap<TransactionStats> {
self.transactions_stats.stats()
}
/// Abort all sync activity /// Abort all sync activity
pub fn abort(&mut self, io: &mut SyncIo) { pub fn abort(&mut self, io: &mut SyncIo) {
self.reset_and_continue(io); self.reset_and_continue(io);
@ -1877,6 +1882,7 @@ impl ChainSync {
// sqrt(x)/x scaled to max u32 // sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let small = self.peers.len() < MIN_PEERS_PROPAGATION; let small = self.peers.len() < MIN_PEERS_PROPAGATION;
let block_number = io.chain().chain_info().best_block_number;
let lucky_peers = { let lucky_peers = {
let stats = &mut self.transactions_stats; let stats = &mut self.transactions_stats;
@ -1889,7 +1895,7 @@ impl ChainSync {
// update stats // update stats
for hash in &all_transactions_hashes { for hash in &all_transactions_hashes {
let id = io.peer_session_info(*peer_id).and_then(|info| info.id); let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
stats.propagated(*hash, id); stats.propagated(*hash, id, block_number);
} }
peer_info.last_sent_transactions = all_transactions_hashes.clone(); peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((*peer_id, all_transactions_rlp.clone())); return Some((*peer_id, all_transactions_rlp.clone()));
@ -1907,8 +1913,8 @@ impl ChainSync {
if to_send.contains(&tx.hash()) { if to_send.contains(&tx.hash()) {
packet.append(tx); packet.append(tx);
// update stats // update stats
let peer_id = io.peer_session_info(*peer_id).and_then(|info| info.id); let id = io.peer_session_info(*peer_id).and_then(|info| info.id);
stats.propagated(tx.hash(), peer_id); stats.propagated(tx.hash(), id, block_number);
} }
} }
@ -2362,6 +2368,21 @@ mod tests {
assert_eq!(0x02, io.queue[1].packet_id); assert_eq!(0x02, io.queue[1].packet_id);
} }
#[test]
fn should_maintain_transations_propagation_stats() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
let mut queue = VecDeque::new();
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
sync.propagate_new_transactions(&mut io);
let stats = sync.transactions_stats();
assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
}
#[test] #[test]
fn handles_peer_new_block_malformed() { fn handles_peer_new_block_malformed() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();

View File

@ -62,7 +62,7 @@ mod api {
} }
pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP}; ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats};
pub use chain::{SyncStatus, SyncState}; pub use chain::{SyncStatus, SyncState};
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};

View File

@ -14,8 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Transaction Stats use api::TransactionStats;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use util::{H256, H512}; use util::{H256, H512};
use util::hash::H256FastMap; use util::hash::H256FastMap;
@ -23,12 +22,33 @@ use util::hash::H256FastMap;
type NodeId = H512; type NodeId = H512;
type BlockNumber = u64; type BlockNumber = u64;
#[derive(Debug, Default, PartialEq)] #[derive(Debug, PartialEq, Clone)]
pub struct Stats { pub struct Stats {
first_seen: BlockNumber, first_seen: BlockNumber,
propagated_to: HashMap<NodeId, usize>, propagated_to: HashMap<NodeId, usize>,
} }
impl Stats {
pub fn new(number: BlockNumber) -> Self {
Stats {
first_seen: number,
propagated_to: Default::default(),
}
}
}
impl<'a> From<&'a Stats> for TransactionStats {
fn from(other: &'a Stats) -> Self {
TransactionStats {
first_seen: other.first_seen,
propagated_to: other.propagated_to
.iter()
.map(|(hash, size)| (*hash, *size))
.collect(),
}
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct TransactionsStats { pub struct TransactionsStats {
pending_transactions: H256FastMap<Stats>, pending_transactions: H256FastMap<Stats>,
@ -36,18 +56,23 @@ pub struct TransactionsStats {
impl TransactionsStats { impl TransactionsStats {
/// Increases number of propagations to given `enodeid`. /// Increases number of propagations to given `enodeid`.
pub fn propagated(&mut self, hash: H256, enode_id: Option<NodeId>) { pub fn propagated(&mut self, hash: H256, enode_id: Option<NodeId>, current_block_num: BlockNumber) {
let enode_id = enode_id.unwrap_or_default(); let enode_id = enode_id.unwrap_or_default();
let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::default()); let mut stats = self.pending_transactions.entry(hash).or_insert_with(|| Stats::new(current_block_num));
let mut count = stats.propagated_to.entry(enode_id).or_insert(0); let mut count = stats.propagated_to.entry(enode_id).or_insert(0);
*count = count.saturating_add(1); *count = count.saturating_add(1);
} }
/// Returns propagation stats for given hash or `None` if hash is not known. /// Returns propagation stats for given hash or `None` if hash is not known.
pub fn stats(&self, hash: &H256) -> Option<&Stats> { #[cfg(test)]
pub fn get(&self, hash: &H256) -> Option<&Stats> {
self.pending_transactions.get(hash) self.pending_transactions.get(hash)
} }
pub fn stats(&self) -> &H256FastMap<Stats> {
&self.pending_transactions
}
/// Retains only transactions present in given `HashSet`. /// Retains only transactions present in given `HashSet`.
pub fn retain(&mut self, hashes: &HashSet<H256>) { pub fn retain(&mut self, hashes: &HashSet<H256>) {
let to_remove = self.pending_transactions.keys() let to_remove = self.pending_transactions.keys()
@ -76,14 +101,14 @@ mod tests {
let enodeid2 = 5.into(); let enodeid2 = 5.into();
// when // when
stats.propagated(hash, Some(enodeid1)); stats.propagated(hash, Some(enodeid1), 5);
stats.propagated(hash, Some(enodeid1)); stats.propagated(hash, Some(enodeid1), 10);
stats.propagated(hash, Some(enodeid2)); stats.propagated(hash, Some(enodeid2), 15);
// then // then
let stats = stats.stats(&hash); let stats = stats.get(&hash);
assert_eq!(stats, Some(&Stats { assert_eq!(stats, Some(&Stats {
first_seen: 0, first_seen: 5,
propagated_to: hash_map![ propagated_to: hash_map![
enodeid1 => 2, enodeid1 => 2,
enodeid2 => 1 enodeid2 => 1
@ -97,13 +122,13 @@ mod tests {
let mut stats = TransactionsStats::default(); let mut stats = TransactionsStats::default();
let hash = 5.into(); let hash = 5.into();
let enodeid1 = 5.into(); let enodeid1 = 5.into();
stats.propagated(hash, Some(enodeid1)); stats.propagated(hash, Some(enodeid1), 10);
// when // when
stats.retain(&HashSet::new()); stats.retain(&HashSet::new());
// then // then
let stats = stats.stats(&hash); let stats = stats.get(&hash);
assert_eq!(stats, None); assert_eq!(stats, None);
} }
} }