From 76ea030b7817a3f3c21f4674c987d817de4d7c38 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 14 Feb 2016 01:03:48 +0100 Subject: [PATCH] Small refactoring --- sync/src/chain.rs | 6 +- util/src/network/discovery.rs | 6 +- util/src/network/handshake.rs | 2 +- util/src/network/host.rs | 106 ++++++++--------- util/src/network/mod.rs | 1 - util/src/network/node.rs | 198 -------------------------------- util/src/network/node_table.rs | 203 ++++++++++++++++++++++++++++++++- util/src/network/session.rs | 2 +- 8 files changed, 257 insertions(+), 267 deletions(-) delete mode 100644 util/src/network/node.rs diff --git a/sync/src/chain.rs b/sync/src/chain.rs index f82162b79..671e2241e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -82,7 +82,7 @@ const RECEIPTS_PACKET: u8 = 0x10; const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent -const CONNECTION_TIMEOUT_SEC: f64 = 30f64; +const CONNECTION_TIMEOUT_SEC: f64 = 10f64; struct Header { /// Header data @@ -309,7 +309,7 @@ impl ChainSync { } self.peers.insert(peer_id.clone(), peer); - info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); + debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); self.sync_peer(io, peer_id, false); Ok(()) } @@ -537,7 +537,7 @@ impl ChainSync { pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Disconnecting {}", peer); if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconnected {}", peer); + debug!(target: "sync", "Disconnected {}", peer); self.clear_peer_download(peer); self.peers.remove(&peer); self.continue_sync(io); diff --git a/util/src/network/discovery.rs b/util/src/network/discovery.rs index a214f5278..9feef9c74 100644 --- a/util/src/network/discovery.rs +++ b/util/src/network/discovery.rs @@ -26,7 +26,7 @@ use time; use hash::*; use crypto::*; use rlp::*; -use network::node::*; +use network::node_table::*; use network::error::NetworkError; use io::StreamToken; @@ -227,8 +227,7 @@ impl Discovery { } #[allow(map_clone)] - fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec - { + fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec { let mut found: BTreeMap> = BTreeMap::new(); let mut count = 0; @@ -425,6 +424,7 @@ impl Discovery { let node_id: NodeId = try!(r.val_at(3)); let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint }; added.insert(node_id, entry.clone()); + self.ping(&entry.endpoint); self.update_node(entry); } Ok(Some(TableUpdates { added: added, removed: HashSet::new() })) diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index 94650b2a7..1fd830ea0 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -24,7 +24,7 @@ use crypto::*; use crypto; use network::connection::{Connection}; use network::host::{HostInfo}; -use network::node::NodeId; +use network::node_table::NodeId; use error::*; use network::error::NetworkError; use network::stats::NetworkStats; diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 47a3d9986..321e965ec 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -32,18 +32,18 @@ use network::session::{Session, SessionData}; use error::*; use io::*; use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; -use network::node::*; +use network::node_table::*; use network::stats::NetworkStats; use network::error::DisconnectReason; use igd::{PortMappingProtocol,search_gateway}; use network::discovery::{Discovery, TableUpdates, NodeEntry}; -use network::node_table::NodeTable; type Slab = ::slab::Slab; const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; const MAINTENANCE_TIMEOUT: u64 = 1000; +const MAX_HANDSHAKES: usize = 100; #[derive(Debug)] /// Network service configuration @@ -226,7 +226,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone _ => warn!(target: "net", "Send: Peer is not connected yet") } } else { - warn!(target: "net", "Send: Peer does not exist") + trace!(target: "net", "Send: Peer no longer exist") } Ok(()) } @@ -405,11 +405,23 @@ impl Host where Message: Send + Sync + Clone { } fn have_session(&self, id: &NodeId) -> bool { - self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false }) + self.connections.read().unwrap().iter().any(|e| + match *e.lock().unwrap().deref() { ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false }) + } + + fn session_count(&self) -> usize { + self.connections.read().unwrap().iter().filter(|e| + match *e.lock().unwrap().deref() { ConnectionEntry::Session(_) => true, _ => false }).count() } fn connecting_to(&self, id: &NodeId) -> bool { - self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) + self.connections.read().unwrap().iter().any(|e| + match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) + } + + fn handshake_count(&self) -> usize { + self.connections.read().unwrap().iter().filter(|e| + match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(_) => true, _ => false }).count() } fn keep_alive(&self, io: &IoContext>) { @@ -423,64 +435,40 @@ impl Host where Message: Send + Sync + Clone { } } for p in to_kill { - self.kill_connection(p, io); + self.kill_connection(p, io, true); } } fn connect_peers(&self, io: &IoContext>) { - struct NodeInfo { - id: NodeId, - peer_type: PeerType + let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; + let connections = self.session_count(); + if connections >= ideal_peers as usize { + return; } - let mut to_connect: Vec = Vec::new(); - let mut req_conn = 0; - let pin = self.info.read().unwrap().deref().config.pin; - let ideal_peers = self.info.read().unwrap().deref().config.ideal_peers; - for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { - let connected = self.have_session(&n.id) || self.connecting_to(&n.id); - let required = n.peer_type == PeerType::Required; - if connected && required { - req_conn += 1; - } - else if !connected && (!pin || required) { - to_connect.push(n); - } + let handshake_count = self.handshake_count(); + if handshake_count >= MAX_HANDSHAKES { + return; } - for n in &to_connect { - if n.peer_type == PeerType::Required { - if req_conn < ideal_peers { - self.connect_peer(&n.id, io); - } - req_conn += 1; - } - } - if !pin { - let pending_count = 0; //TODO: - let peer_count = 0; - let mut open_slots = ideal_peers - peer_count - pending_count + req_conn; - if open_slots > 0 { - for n in &to_connect { - if n.peer_type == PeerType::Optional && open_slots > 0 { - open_slots -= 1; - self.connect_peer(&n.id, io); - } - } - } + let nodes = { self.nodes.read().unwrap().nodes() }; + + for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id)).take(MAX_HANDSHAKES - handshake_count) { + self.connect_peer(&id, io); } + debug!(target: "net", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count()); } #[allow(single_match)] fn connect_peer(&self, id: &NodeId, io: &IoContext>) { if self.have_session(id) { - debug!("Aborted connect. Node already connected."); + trace!("Aborted connect. Node already connected."); return; } if self.connecting_to(id) { - debug!("Aborted connect. Node already connecting."); + trace!("Aborted connect. Node already connecting."); return; } @@ -542,7 +530,7 @@ impl Host where Message: Send + Sync + Clone { ConnectionEntry::Handshake(ref mut h) => { match h.writable(io, &self.info.read().unwrap()) { Err(e) => { - debug!(target: "net", "Handshake write error: {:?}", e); + debug!(target: "net", "Handshake write error: {}:{:?}", token, e); kill = true; }, Ok(_) => () @@ -554,7 +542,7 @@ impl Host where Message: Send + Sync + Clone { ConnectionEntry::Session(ref mut s) => { match s.writable(io, &self.info.read().unwrap()) { Err(e) => { - debug!(target: "net", "Session write error: {:?}", e); + debug!(target: "net", "Session write error: {}:{:?}", token, e); kill = true; }, Ok(_) => () @@ -564,7 +552,7 @@ impl Host where Message: Send + Sync + Clone { } } if kill { - self.kill_connection(token, io); //TODO: mark connection as dead an check in kill_connection + self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection return; } else if create_session { self.start_session(token, io); @@ -573,7 +561,7 @@ impl Host where Message: Send + Sync + Clone { } fn connection_closed(&self, token: TimerToken, io: &IoContext>) { - self.kill_connection(token, io); + self.kill_connection(token, io, true); } fn connection_readable(&self, token: StreamToken, io: &IoContext>) { @@ -585,7 +573,7 @@ impl Host where Message: Send + Sync + Clone { match *connection.lock().unwrap().deref_mut() { ConnectionEntry::Handshake(ref mut h) => { if let Err(e) = h.readable(io, &self.info.read().unwrap()) { - debug!(target: "net", "Handshake read error: {:?}", e); + debug!(target: "net", "Handshake read error: {}:{:?}", token, e); kill = true; } if h.done() { @@ -595,7 +583,7 @@ impl Host where Message: Send + Sync + Clone { ConnectionEntry::Session(ref mut s) => { match s.readable(io, &self.info.read().unwrap()) { Err(e) => { - debug!(target: "net", "Handshake read error: {:?}", e); + debug!(target: "net", "Handshake read error: {}:{:?}", token, e); kill = true; }, Ok(SessionData::Ready) => { @@ -621,7 +609,7 @@ impl Host where Message: Send + Sync + Clone { } } if kill { - self.kill_connection(token, io); //TODO: mark connection as dead an check in kill_connection + self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection return; } else if create_session { self.start_session(token, io); @@ -657,17 +645,20 @@ impl Host where Message: Send + Sync + Clone { } fn connection_timeout(&self, token: StreamToken, io: &IoContext>) { - self.kill_connection(token, io) + self.kill_connection(token, io, true) } - fn kill_connection(&self, token: StreamToken, io: &IoContext>) { + fn kill_connection(&self, token: StreamToken, io: &IoContext>, remote: bool) { let mut to_disconnect: Vec = Vec::new(); { let mut connections = self.connections.write().unwrap(); if let Some(connection) = connections.get(token).cloned() { match *connection.lock().unwrap().deref_mut() { - ConnectionEntry::Handshake(_) => { + ConnectionEntry::Handshake(ref h) => { connections.remove(token); + if remote { + self.nodes.write().unwrap().note_failure(h.id()); + } }, ConnectionEntry::Session(ref mut s) if s.is_ready() => { for (p, _) in self.handlers.read().unwrap().iter() { @@ -676,6 +667,9 @@ impl Host where Message: Send + Sync + Clone { } } connections.remove(token); + if remote { + self.nodes.write().unwrap().note_failure(s.id()); + } }, _ => {}, } @@ -706,7 +700,7 @@ impl Host where Message: Send + Sync + Clone { } } for i in to_remove { - self.kill_connection(i, io); + self.kill_connection(i, io, false); } self.nodes.write().unwrap().update(node_changes); } @@ -816,7 +810,7 @@ impl IoHandler> for Host where Messa ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); } } } - self.kill_connection(*peer, io); + self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { for (p, h) in self.handlers.read().unwrap().iter() { diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 466ef4e6a..7d5aac8f7 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -71,7 +71,6 @@ mod session; mod discovery; mod service; mod error; -mod node; mod node_table; mod stats; diff --git a/util/src/network/node.rs b/util/src/network/node.rs deleted file mode 100644 index d8370bc79..000000000 --- a/util/src/network/node.rs +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -use std::mem; -use std::slice::from_raw_parts; -use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; -use std::hash::{Hash, Hasher}; -use std::str::{FromStr}; -use hash::*; -use rlp::*; -use time::Tm; -use error::*; - -/// Node public key -pub type NodeId = H512; - -#[derive(Debug, Clone)] -/// Node address info -pub struct NodeEndpoint { - /// IP(V4 or V6) address - pub address: SocketAddr, - /// Conneciton port. - pub udp_port: u16 -} - -impl NodeEndpoint { - pub fn udp_address(&self) -> SocketAddr { - match self.address { - SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(a.ip().clone(), self.udp_port)), - SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(a.ip().clone(), self.udp_port, a.flowinfo(), a.scope_id())), - } - } -} - -impl NodeEndpoint { - pub fn from_rlp(rlp: &UntrustedRlp) -> Result { - let tcp_port = try!(rlp.val_at::(2)); - let udp_port = try!(rlp.val_at::(1)); - let addr_bytes = try!(try!(rlp.at(0)).data()); - let address = try!(match addr_bytes.len() { - 4 => Ok(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(addr_bytes[0], addr_bytes[1], addr_bytes[2], addr_bytes[3]), tcp_port))), - 16 => unsafe { - let o: *const u16 = mem::transmute(addr_bytes.as_ptr()); - let o = from_raw_parts(o, 8); - Ok(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7]), tcp_port, 0, 0))) - }, - _ => Err(DecoderError::RlpInconsistentLengthAndData) - }); - Ok(NodeEndpoint { address: address, udp_port: udp_port }) - } - - pub fn to_rlp(&self, rlp: &mut RlpStream) { - match self.address { - SocketAddr::V4(a) => { - rlp.append(&(&a.ip().octets()[..])); - } - SocketAddr::V6(a) => unsafe { - let o: *const u8 = mem::transmute(a.ip().segments().as_ptr()); - rlp.append(&from_raw_parts(o, 16)); - } - }; - rlp.append(&self.udp_port); - rlp.append(&self.address.port()); - } - - pub fn to_rlp_list(&self, rlp: &mut RlpStream) { - rlp.begin_list(3); - self.to_rlp(rlp); - } - - pub fn is_valid(&self) -> bool { - self.udp_port != 0 && self.address.port() != 0 && - match self.address { - SocketAddr::V4(a) => !a.ip().is_unspecified(), - SocketAddr::V6(a) => !a.ip().is_unspecified() - } - } -} - -impl FromStr for NodeEndpoint { - type Err = UtilError; - - /// Create endpoint from string. Performs name resolution if given a host name. - fn from_str(s: &str) -> Result { - let address = s.to_socket_addrs().map(|mut i| i.next()); - match address { - Ok(Some(a)) => Ok(NodeEndpoint { - address: a, - udp_port: a.port() - }), - Ok(_) => Err(UtilError::AddressResolve(None)), - Err(e) => Err(UtilError::AddressResolve(Some(e))) - } - } -} - -#[derive(PartialEq, Eq, Copy, Clone)] -pub enum PeerType { - Required, - Optional -} - -pub struct Node { - pub id: NodeId, - pub endpoint: NodeEndpoint, - pub peer_type: PeerType, - pub last_attempted: Option, -} - -impl Node { - pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node { - Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - } - } -} - -impl FromStr for Node { - type Err = UtilError; - fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) - } - else { - (NodeId::new(), try!(NodeEndpoint::from_str(s))) - }; - - Ok(Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - }) - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} -impl Eq for Node {} - -impl Hash for Node { - fn hash(&self, state: &mut H) where H: Hasher { - self.id.hash(state) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::str::FromStr; - use std::net::*; - use hash::*; - - #[test] - fn endpoint_parse() { - let endpoint = NodeEndpoint::from_str("123.99.55.44:7770"); - assert!(endpoint.is_ok()); - let v4 = match endpoint.unwrap().address { - SocketAddr::V4(v4address) => v4address, - _ => panic!("should ve v4 address") - }; - assert_eq!(SocketAddrV4::new(Ipv4Addr::new(123, 99, 55, 44), 7770), v4); - } - - #[test] - fn node_parse() { - let node = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770"); - assert!(node.is_ok()); - let node = node.unwrap(); - let v4 = match node.endpoint.address { - SocketAddr::V4(v4address) => v4address, - _ => panic!("should ve v4 address") - }; - assert_eq!(SocketAddrV4::new(Ipv4Addr::new(22, 99, 55, 44), 7770), v4); - assert_eq!( - H512::from_str("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap(), - node.id); - } -} diff --git a/util/src/network/node_table.rs b/util/src/network/node_table.rs index d93057eb3..a3ee57481 100644 --- a/util/src/network/node_table.rs +++ b/util/src/network/node_table.rs @@ -14,11 +14,161 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::mem; +use std::slice::from_raw_parts; +use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; +use std::hash::{Hash, Hasher}; +use std::str::{FromStr}; use std::collections::HashMap; -use std::collections::hash_map::Values; -use network::node::*; +use hash::*; +use rlp::*; +use time::Tm; +use error::*; use network::discovery::TableUpdates; +/// Node public key +pub type NodeId = H512; + +#[derive(Debug, Clone)] +/// Node address info +pub struct NodeEndpoint { + /// IP(V4 or V6) address + pub address: SocketAddr, + /// Conneciton port. + pub udp_port: u16 +} + +impl NodeEndpoint { + pub fn udp_address(&self) -> SocketAddr { + match self.address { + SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(a.ip().clone(), self.udp_port)), + SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(a.ip().clone(), self.udp_port, a.flowinfo(), a.scope_id())), + } + } +} + +impl NodeEndpoint { + pub fn from_rlp(rlp: &UntrustedRlp) -> Result { + let tcp_port = try!(rlp.val_at::(2)); + let udp_port = try!(rlp.val_at::(1)); + let addr_bytes = try!(try!(rlp.at(0)).data()); + let address = try!(match addr_bytes.len() { + 4 => Ok(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(addr_bytes[0], addr_bytes[1], addr_bytes[2], addr_bytes[3]), tcp_port))), + 16 => unsafe { + let o: *const u16 = mem::transmute(addr_bytes.as_ptr()); + let o = from_raw_parts(o, 8); + Ok(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7]), tcp_port, 0, 0))) + }, + _ => Err(DecoderError::RlpInconsistentLengthAndData) + }); + Ok(NodeEndpoint { address: address, udp_port: udp_port }) + } + + pub fn to_rlp(&self, rlp: &mut RlpStream) { + match self.address { + SocketAddr::V4(a) => { + rlp.append(&(&a.ip().octets()[..])); + } + SocketAddr::V6(a) => unsafe { + let o: *const u8 = mem::transmute(a.ip().segments().as_ptr()); + rlp.append(&from_raw_parts(o, 16)); + } + }; + rlp.append(&self.udp_port); + rlp.append(&self.address.port()); + } + + pub fn to_rlp_list(&self, rlp: &mut RlpStream) { + rlp.begin_list(3); + self.to_rlp(rlp); + } + + pub fn is_valid(&self) -> bool { + self.udp_port != 0 && self.address.port() != 0 && + match self.address { + SocketAddr::V4(a) => !a.ip().is_unspecified(), + SocketAddr::V6(a) => !a.ip().is_unspecified() + } + } +} + +impl FromStr for NodeEndpoint { + type Err = UtilError; + + /// Create endpoint from string. Performs name resolution if given a host name. + fn from_str(s: &str) -> Result { + let address = s.to_socket_addrs().map(|mut i| i.next()); + match address { + Ok(Some(a)) => Ok(NodeEndpoint { + address: a, + udp_port: a.port() + }), + Ok(_) => Err(UtilError::AddressResolve(None)), + Err(e) => Err(UtilError::AddressResolve(Some(e))) + } + } +} + +#[derive(PartialEq, Eq, Copy, Clone)] +pub enum PeerType { + _Required, + Optional +} + +pub struct Node { + pub id: NodeId, + pub endpoint: NodeEndpoint, + pub peer_type: PeerType, + pub failures: u32, + pub last_attempted: Option, +} + +impl Node { + pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node { + Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + failures: 0, + last_attempted: None, + } + } +} + +impl FromStr for Node { + type Err = UtilError; + fn from_str(s: &str) -> Result { + let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { + (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) + } + else { + (NodeId::new(), try!(NodeEndpoint::from_str(s))) + }; + + Ok(Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + last_attempted: None, + failures: 0, + }) + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} +impl Eq for Node {} + +impl Hash for Node { + fn hash(&self, state: &mut H) where H: Hasher { + self.id.hash(state) + } +} + +/// Node table backed by disk file. pub struct NodeTable { nodes: HashMap } @@ -30,18 +180,24 @@ impl NodeTable { } } + /// Add a node to table pub fn add_node(&mut self, node: Node) { self.nodes.insert(node.id.clone(), node); } - pub fn nodes(&self) -> Values { - self.nodes.values() + /// Returns node ids sorted by number of failures + pub fn nodes(&self) -> Vec { + let mut refs: Vec<&Node> = self.nodes.values().collect(); + refs.sort_by(|a, b| a.failures.cmp(&b.failures)); + refs.iter().map(|n| n.id.clone()).collect() } + /// Get particular node pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> { self.nodes.get_mut(id) } + /// Apply table changes coming from discovery pub fn update(&mut self, mut update: TableUpdates) { for (_, node) in update.added.drain() { let mut entry = self.nodes.entry(node.id.clone()).or_insert_with(|| Node::new(node.id.clone(), node.endpoint.clone())); @@ -52,4 +208,43 @@ impl NodeTable { } } + pub fn note_failure(&mut self, id: &NodeId) { + if let Some(node) = self.nodes.get_mut(id) { + node.failures += 1; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use std::net::*; + use hash::*; + + #[test] + fn endpoint_parse() { + let endpoint = NodeEndpoint::from_str("123.99.55.44:7770"); + assert!(endpoint.is_ok()); + let v4 = match endpoint.unwrap().address { + SocketAddr::V4(v4address) => v4address, + _ => panic!("should ve v4 address") + }; + assert_eq!(SocketAddrV4::new(Ipv4Addr::new(123, 99, 55, 44), 7770), v4); + } + + #[test] + fn node_parse() { + let node = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770"); + assert!(node.is_ok()); + let node = node.unwrap(); + let v4 = match node.endpoint.address { + SocketAddr::V4(v4address) => v4address, + _ => panic!("should ve v4 address") + }; + assert_eq!(SocketAddrV4::new(Ipv4Addr::new(22, 99, 55, 44), 7770), v4); + assert_eq!( + H512::from_str("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap(), + node.id); + } } diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 19e2cf08e..3b49a8f5e 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -23,7 +23,7 @@ use error::*; use io::{IoContext, StreamToken}; use network::error::{NetworkError, DisconnectReason}; use network::host::*; -use network::node::NodeId; +use network::node_table::NodeId; use time; const PING_TIMEOUT_SEC: u64 = 30;