From c9ce25c8f337ec7ab6e1c2751823444551c8303a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 12 Oct 2016 21:18:59 +0300 Subject: [PATCH] Added peers details to ethcore_netPeers RPC (#2580) * added peers details to ethcore_netPeers RPC * fixed build (traits autoimplemented) * - documentation fixes - spaces -> tabs - Rust-way Option's handling * prepare for new protocols in ethcore_netPeers * commas & documentation --- rpc/src/v1/impls/ethcore.rs | 5 +- rpc/src/v1/tests/helpers/sync_provider.rs | 27 ++++++++- rpc/src/v1/tests/mocked/ethcore.rs | 7 ++- rpc/src/v1/types/mod.rs.in | 2 +- rpc/src/v1/types/sync.rs | 72 ++++++++++++++++++++++- sync/src/api.rs | 32 ++++++++++ sync/src/chain.rs | 22 ++++++- sync/src/lib.rs | 2 +- sync/src/sync_io.rs | 10 ++-- sync/src/tests/helpers.rs | 4 ++ util/network/src/connection.rs | 5 ++ util/network/src/host.rs | 22 ++++--- util/network/src/lib.rs | 1 + util/network/src/service.rs | 7 +++ util/network/src/session.rs | 28 +++++++-- util/network/src/tests.rs | 2 +- 16 files changed, 224 insertions(+), 24 deletions(-) diff --git a/rpc/src/v1/impls/ethcore.rs b/rpc/src/v1/impls/ethcore.rs index a63d33f52..3b756342d 100644 --- a/rpc/src/v1/impls/ethcore.rs +++ b/rpc/src/v1/impls/ethcore.rs @@ -165,13 +165,16 @@ impl Ethcore for EthcoreClient where fn net_peers(&self) -> Result { try!(self.active()); - let sync_status = take_weak!(self.sync).status(); + let sync = take_weak!(self.sync); + let sync_status = sync.status(); let net_config = take_weak!(self.net).network_config(); + let peers = sync.peers().into_iter().map(Into::into).collect(); Ok(Peers { active: sync_status.num_active_peers, connected: sync_status.num_peers, max: sync_status.current_max_peers(net_config.min_peers, net_config.max_peers), + peers: peers }) } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index b83aff758..74013660f 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -17,7 +17,7 @@ //! Test implementation of SyncProvider. use util::{RwLock, U256}; -use ethsync::{SyncProvider, SyncStatus, SyncState}; +use ethsync::{SyncProvider, SyncStatus, SyncState, PeerInfo}; /// TestSyncProvider config. pub struct Config { @@ -60,5 +60,30 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().clone() } + + fn peers(&self) -> Vec { + vec![ + PeerInfo { + id: Some("node1".to_owned()), + client_version: "Parity/1".to_owned(), + capabilities: vec!["eth/62".to_owned(), "eth/63".to_owned()], + remote_address: "127.0.0.1:7777".to_owned(), + local_address: "127.0.0.1:8888".to_owned(), + eth_version: 62, + eth_difficulty: Some(40.into()), + eth_head: 50.into() + }, + PeerInfo { + id: None, + client_version: "Parity/2".to_owned(), + capabilities: vec!["eth/63".to_owned(), "eth/64".to_owned()], + remote_address: "Handshake".to_owned(), + local_address: "127.0.0.1:3333".to_owned(), + eth_version: 64, + eth_difficulty: None, + eth_head: 60.into() + } + ] + } } diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index 3dc02e929..f09d84d5b 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -208,7 +208,12 @@ fn rpc_ethcore_net_peers() { io.add_delegate(ethcore_client(&client, &miner, &sync, &net).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPeers", "params":[], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":{"active":0,"connected":120,"max":50},"id":1}"#; + let response = "{\"jsonrpc\":\"2.0\",\"result\":{\"active\":0,\"connected\":120,\"max\":50,\"peers\":[{\"caps\":[\"eth/62\",\"eth/63\"],\ +\"id\":\"node1\",\"name\":\"Parity/1\",\"network\":{\"localAddress\":\"127.0.0.1:8888\",\"remoteAddress\":\"127.0.0.1:7777\"}\ +,\"protocols\":{\"eth\":{\"difficulty\":\"0x28\",\"head\":\"0000000000000000000000000000000000000000000000000000000000000032\"\ +,\"version\":62}}},{\"caps\":[\"eth/63\",\"eth/64\"],\"id\":null,\"name\":\"Parity/2\",\"network\":{\"localAddress\":\ +\"127.0.0.1:3333\",\"remoteAddress\":\"Handshake\"},\"protocols\":{\"eth\":{\"difficulty\":null,\"head\":\ +\"000000000000000000000000000000000000000000000000000000000000003c\",\"version\":64}}}]},\"id\":1}"; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in index 8aaf90eab..4a192ac36 100644 --- a/rpc/src/v1/types/mod.rs.in +++ b/rpc/src/v1/types/mod.rs.in @@ -42,7 +42,7 @@ pub use self::filter::{Filter, FilterChanges}; pub use self::hash::{H64, H160, H256, H512, H520, H2048}; pub use self::index::Index; pub use self::log::Log; -pub use self::sync::{SyncStatus, SyncInfo, Peers}; +pub use self::sync::{SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, PeerEthereumProtocolInfo}; pub use self::transaction::Transaction; pub use self::transaction_request::TransactionRequest; pub use self::receipt::Receipt; diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index 28ef3101d..9a7f733c1 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use ethsync::PeerInfo as SyncPeerInfo; use serde::{Serialize, Serializer}; use v1::types::U256; @@ -32,7 +33,7 @@ pub struct SyncInfo { } /// Peers info -#[derive(Default, Debug, Serialize, PartialEq)] +#[derive(Default, Debug, Serialize)] pub struct Peers { /// Number of active peers pub active: usize, @@ -40,6 +41,52 @@ pub struct Peers { pub connected: usize, /// Max number of peers pub max: u32, + /// Detailed information on peers + pub peers: Vec, +} + +/// Peer connection information +#[derive(Default, Debug, Serialize)] +pub struct PeerInfo { + /// Public node id + pub id: Option, + /// Node client ID + pub name: String, + /// Capabilities + pub caps: Vec, + /// Network information + pub network: PeerNetworkInfo, + /// Protocols information + pub protocols: PeerProtocolsInfo, +} + +/// Peer network information +#[derive(Default, Debug, Serialize)] +pub struct PeerNetworkInfo { + /// Remote endpoint address + #[serde(rename="remoteAddress")] + pub remote_address: String, + /// Local endpoint address + #[serde(rename="localAddress")] + pub local_address: String, +} + +/// Peer protocols information +#[derive(Default, Debug, Serialize)] +pub struct PeerProtocolsInfo { + /// Ethereum protocol information + pub eth: Option, +} + +/// Peer Ethereum protocol information +#[derive(Default, Debug, Serialize)] +pub struct PeerEthereumProtocolInfo { + /// Negotiated ethereum protocol version + pub version: u32, + /// Peer total difficulty if known + pub difficulty: Option, + /// SHA3 of peer best block hash + pub head: String, } /// Sync status @@ -61,6 +108,27 @@ impl Serialize for SyncStatus { } } +impl From for PeerInfo { + fn from(p: SyncPeerInfo) -> PeerInfo { + PeerInfo { + id: p.id, + name: p.client_version, + caps: p.capabilities, + network: PeerNetworkInfo { + remote_address: p.remote_address, + local_address: p.local_address, + }, + protocols: PeerProtocolsInfo { + eth: Some(PeerEthereumProtocolInfo { + version: p.eth_version, + difficulty: p.eth_difficulty.map(|d| d.into()), + head: p.eth_head.hex(), + }) + }, + } + } +} + #[cfg(test)] mod tests { use serde_json; @@ -77,7 +145,7 @@ mod tests { fn test_serialize_peers() { let t = Peers::default(); let serialized = serde_json::to_string(&t).unwrap(); - assert_eq!(serialized, r#"{"active":0,"connected":0,"max":0}"#); + assert_eq!(serialized, r#"{"active":0,"connected":0,"max":0,"peers":[]}"#); } #[test] diff --git a/sync/src/api.rs b/sync/src/api.rs index c09157e3b..54dfc91b7 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -61,6 +61,30 @@ binary_fixed_size!(SyncStatus); pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; + + /// Get peers information + fn peers(&self) -> Vec; +} + +/// Peer connection information +#[derive(Debug, Binary)] +pub struct PeerInfo { + /// Public node id + pub id: Option, + /// Node client ID + pub client_version: String, + /// Capabilities + pub capabilities: Vec, + /// Remote endpoint address + pub remote_address: String, + /// Local endpoint address + pub local_address: String, + /// Ethereum protocol version + pub eth_version: u32, + /// SHA3 of peer best block hash + pub eth_head: H256, + /// Peer total difficulty if known + pub eth_difficulty: Option, } /// Ethereum network protocol handler @@ -94,6 +118,14 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.handler.sync.write().status() } + + /// Get sync peers + fn peers(&self) -> Vec { + self.network.with_context_eval(self.subprotocol_name, |context| { + let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service); + self.handler.sync.write().peers(&sync_io) + }).unwrap_or(Vec::new()) + } } struct SyncProtocolHandler { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 446fd5499..ee2e90800 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -102,6 +102,7 @@ use super::SyncConfig; use blocks::BlockCollection; use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; +use api::PeerInfo as PeerInfoDigest; known_heap_size!(0, PeerInfo); @@ -346,7 +347,7 @@ impl ChainSync { } } - /// @returns Synchonization status + /// Returns synchonization status pub fn status(&self) -> SyncStatus { SyncStatus { state: self.state.clone(), @@ -368,6 +369,25 @@ impl ChainSync { } } + /// Returns information on peers connections + pub fn peers(&self, io: &SyncIo) -> Vec { + self.peers.iter() + .filter_map(|(&peer_id, ref peer_data)| + io.peer_session_info(peer_id).map(|session_info| + PeerInfoDigest { + id: session_info.id.map(|id| id.hex()), + client_version: session_info.client_version, + capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), + remote_address: session_info.remote_address, + local_address: session_info.local_address, + eth_version: peer_data.protocol_version, + eth_difficulty: peer_data.difficulty, + eth_head: peer_data.latest_hash, + }) + ) + .collect() + } + /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { self.restart(io); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index af566772e..a4c29f166 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -60,7 +60,7 @@ mod api { } pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNetwork, SyncConfig, - ServiceConfiguration, NetworkConfiguration}; + ServiceConfiguration, NetworkConfiguration, PeerInfo}; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError}; diff --git a/sync/src/sync_io.rs b/sync/src/sync_io.rs index 445939399..52118b710 100644 --- a/sync/src/sync_io.rs +++ b/sync/src/sync_io.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use network::{NetworkContext, PeerId, PacketId, NetworkError}; +use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo}; use ethcore::client::BlockChainClient; use ethcore::snapshot::SnapshotService; @@ -34,10 +34,12 @@ pub trait SyncIo { fn chain(&self) -> &BlockChainClient; /// Get the snapshot service. fn snapshot_service(&self) -> &SnapshotService; - /// Returns peer client identifier string + /// Returns peer identifier string fn peer_info(&self, peer_id: PeerId) -> String { peer_id.to_string() } + /// Returns information on p2p session + fn peer_session_info(&self, peer_id: PeerId) -> Option; /// Maximum mutuallt supported ETH protocol version fn eth_protocol_version(&self, peer_id: PeerId) -> u8; /// Returns if the chain block queue empty @@ -91,8 +93,8 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { self.snapshot_service } - fn peer_info(&self, peer_id: PeerId) -> String { - self.network.peer_info(peer_id) + fn peer_session_info(&self, peer_id: PeerId) -> Option { + self.network.session_info(peer_id) } fn is_expired(&self) -> bool { diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 3558e5578..fcc8f002e 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -83,6 +83,10 @@ impl<'p> SyncIo for TestIo<'p> { self.snapshot_service } + fn peer_session_info(&self, _peer_id: PeerId) -> Option { + None + } + fn eth_protocol_version(&self, _peer: PeerId) -> u8 { 64 } diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs index 4a7b5a3c7..456c35e69 100644 --- a/util/network/src/connection.rs +++ b/util/network/src/connection.rs @@ -191,6 +191,11 @@ impl Connection { self.socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unknown".to_owned()) } + /// Get local peer address string + pub fn local_addr_str(&self) -> String { + self.socket.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unknown".to_owned()) + } + /// Clone this connection. Clears the receiving buffer of the returned connection. pub fn try_clone(&self) -> io::Result { Ok(Connection { diff --git a/util/network/src/host.rs b/util/network/src/host.rs index a0d0a081a..f5c14b0f9 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -31,7 +31,7 @@ use util::hash::*; use util::Hashable; use util::version; use rlp::*; -use session::{Session, SessionData}; +use session::{Session, SessionInfo, SessionData}; use error::*; use io::*; use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION}; @@ -280,12 +280,13 @@ impl<'s> NetworkContext<'s> { } /// Returns peer identification string - pub fn peer_info(&self, peer: PeerId) -> String { - let session = self.resolve_session(peer); - if let Some(session) = session { - return session.lock().info.client_version.clone() - } - "unknown".to_owned() + pub fn peer_client_version(&self, peer: PeerId) -> String { + self.resolve_session(peer).map_or("unknown".to_owned(), |s| s.lock().info.client_version.clone()) + } + + /// Returns information on p2p session + pub fn session_info(&self, peer: PeerId) -> Option { + self.resolve_session(peer).map(|s| s.lock().info.clone()) } /// Returns max version for a given protocol. @@ -918,6 +919,13 @@ impl Host { let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); action(&context); } + + pub fn with_context_eval(&self, protocol: ProtocolId, io: &IoContext, action: F) -> T where F: Fn(&NetworkContext) -> T { + let reserved = { self.reserved_nodes.read() }; + + let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); + action(&context) + } } impl IoHandler for Host { diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index cd0336823..50396b6fb 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -99,6 +99,7 @@ pub use host::NetworkIoMessage; pub use error::NetworkError; pub use host::NetworkConfiguration; pub use stats::NetworkStats; +pub use session::SessionInfo; use io::TimerToken; pub use node_table::is_valid_node_url; diff --git a/util/network/src/service.rs b/util/network/src/service.rs index 3ab1f0301..0b59f8bc7 100644 --- a/util/network/src/service.rs +++ b/util/network/src/service.rs @@ -178,6 +178,13 @@ impl NetworkService { host.with_context(protocol, &io, action); }; } + + /// Evaluates function in the network context + pub fn with_context_eval(&self, protocol: ProtocolId, action: F) -> Option where F: Fn(&NetworkContext) -> T { + let io = IoContext::new(self.io_service.channel(), 0); + let host = self.host.read(); + host.as_ref().map(|ref host| host.with_context_eval(protocol, &io, action)) + } } impl MayPanic for NetworkService { diff --git a/util/network/src/session.rs b/util/network/src/session.rs index fdba12fff..845f98bec 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -72,6 +72,7 @@ pub enum SessionData { } /// Shared session information +#[derive(Debug, Clone)] pub struct SessionInfo { /// Peer public key pub id: Option, @@ -79,15 +80,21 @@ pub struct SessionInfo { pub client_version: String, /// Peer RLPx protocol version pub protocol_version: u32, + /// Session protocol capabilities + pub capabilities: Vec, /// Peer protocol capabilities - capabilities: Vec, + pub peer_capabilities: Vec, /// Peer ping delay in milliseconds pub ping_ms: Option, /// True if this session was originated by us. pub originated: bool, + /// Remote endpoint address of the session + pub remote_address: String, + /// Local endpoint address of the session + pub local_address: String, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PeerCapabilityInfo { pub protocol: ProtocolId, pub version: u8, @@ -109,8 +116,14 @@ impl Decodable for PeerCapabilityInfo { } } -#[derive(Debug)] -struct SessionCapabilityInfo { +impl ToString for PeerCapabilityInfo { + fn to_string(&self) -> String { + format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version) + } +} + +#[derive(Debug, Clone)] +pub struct SessionCapabilityInfo { pub protocol: [u8; 3], pub version: u8, pub packet_count: u8, @@ -134,6 +147,7 @@ impl Session { where Message: Send + Clone { let originated = id.is_some(); let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake"); + let local_addr = handshake.connection.local_addr_str(); try!(handshake.start(io, host, originated)); Ok(Session { state: State::Handshake(handshake), @@ -143,8 +157,11 @@ impl Session { client_version: String::new(), protocol_version: 0, capabilities: Vec::new(), + peer_capabilities: Vec::new(), ping_ms: None, originated: originated, + remote_address: "Handshake".to_owned(), + local_address: local_addr, }, ping_time_ns: 0, pong_time_ns: None, @@ -155,6 +172,7 @@ impl Session { fn complete_handshake(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone { let connection = if let State::Handshake(ref mut h) = self.state { self.info.id = Some(h.id.clone()); + self.info.remote_address = h.connection.remote_addr_str(); try!(EncryptedConnection::new(h)) } else { panic!("Unexpected state"); @@ -431,8 +449,10 @@ impl Session { i += 1; } trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + self.info.protocol_version = protocol; self.info.client_version = client_version; self.info.capabilities = caps; + self.info.peer_capabilities = peer_caps; if self.info.capabilities.is_empty() { trace!(target: "network", "No common capabilities with peer."); return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer))); diff --git a/util/network/src/tests.rs b/util/network/src/tests.rs index 97a641c81..1b0c6eb6c 100644 --- a/util/network/src/tests.rs +++ b/util/network/src/tests.rs @@ -69,7 +69,7 @@ impl NetworkProtocolHandler for TestProtocol { } fn connected(&self, io: &NetworkContext, peer: &PeerId) { - assert!(io.peer_info(*peer).contains("Parity")); + assert!(io.peer_client_version(*peer).contains("Parity")); if self.drop_session { io.disconnect_peer(*peer) } else {