From cad08f78b9290c417cd337886a028c157a1a99b6 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 2 May 2016 13:15:58 +0200 Subject: [PATCH] Net etiquette: Track useless peers, Send out disconnect packet --- util/src/network/connection.rs | 10 +++++++ util/src/network/host.rs | 52 ++++++++++++++++++++++++++++------ util/src/network/node_table.rs | 16 +++++++++-- util/src/network/session.rs | 10 +++---- 4 files changed, 71 insertions(+), 17 deletions(-) diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index a3a42b44e..589fc0106 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -102,6 +102,11 @@ impl GenericConnection { } } + /// Check if this connection has data to be sent. + pub fn is_sending(&self) -> bool { + self.interest.is_writable() + } + /// Writable IO handler. Called when the socket is ready to send. pub fn writable(&mut self) -> io::Result { if self.send_queue.is_empty() { @@ -277,6 +282,11 @@ impl EncryptedConnection { self.connection.remote_addr() } + /// Check if this connection has data to be sent. + pub fn is_sending(&self) -> bool { + self.connection.is_sending() + } + /// Create an encrypted connection out of the handshake. Consumes a handshake object. pub fn new(handshake: &mut Handshake) -> Result { let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index ec35fbe61..1b4d3c67f 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -39,7 +39,7 @@ use io::*; use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; use network::node_table::*; use network::stats::NetworkStats; -use network::error::DisconnectReason; +use network::error::{NetworkError, DisconnectReason}; use network::discovery::{Discovery, TableUpdates, NodeEntry}; use network::ip_utils::{map_external_address, select_public_address}; @@ -122,6 +122,7 @@ const DISCOVERY: usize = LAST_HANDSHAKE + 3; const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4; const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5; const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6; +const NODE_TABLE: usize = LAST_HANDSHAKE + 7; const FIRST_SESSION: usize = 0; const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1; const FIRST_HANDSHAKE: usize = LAST_SESSION + 1; @@ -154,8 +155,10 @@ pub enum NetworkIoMessage where Message: Send + Sync + Clone { /// Timer delay in milliseconds. delay: u64, }, - /// Disconnect a peer + /// Disconnect a peer. Disconnect(PeerId), + /// Disconnect and temporary disable peer. + DisablePeer(PeerId), /// User message User(Message), } @@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left - self.disconnect_peer(peer); + self.io.message(NetworkIoMessage::DisablePeer(peer)); } /// Disconnect peer. Reconnect can be attempted later. @@ -391,7 +394,7 @@ impl Host where Message: Send + Sync + Clone { pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { - Err(e) => { debug!("Could not add node {}: {:?}", id, e); }, + Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; self.pinned_nodes.push(n.id.clone()); @@ -463,6 +466,7 @@ impl Host where Message: Send + Sync + Clone { io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); + io.register_timer(NODE_TABLE, 300_000).expect("Error registering node table timer"); } try!(io.register_stream(TCP_ACCEPT)); Ok(()) @@ -499,6 +503,7 @@ impl Host where Message: Send + Sync + Clone { } } for p in to_kill { + trace!(target: "network", "Ping timeout: {}", p); self.kill_connection(p, io, true); } } @@ -553,7 +558,7 @@ impl Host where Message: Send + Sync + Clone { match TcpStream::connect(&address) { Ok(socket) => socket, Err(e) => { - debug!("Can't connect to address {:?}: {:?}", address, e); + debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e); return; } } @@ -609,11 +614,16 @@ impl Host where Message: Send + Sync + Clone { if let Err(e) = s.writable(io, &self.info.read().unwrap()) { trace!(target: "network", "Session write error: {}: {:?}", token, e); } - io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); + if s.done() { + io.deregister_stream(token).expect("Error deregistering stream"); + } else { + io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); + } } } fn connection_closed(&self, token: TimerToken, io: &IoContext>) { + trace!(target: "network", "Connection closed: {}", token); self.kill_connection(token, io, true); } @@ -650,7 +660,14 @@ impl Host where Message: Send + Sync + Clone { let mut s = session.lock().unwrap(); match s.readable(io, &self.info.read().unwrap()) { Err(e) => { - debug!(target: "network", "Session read error: {}: {:?}", token, e); + trace!(target: "network", "Session read error: {}:{} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); + match e { + UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) | + UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => { + self.nodes.write().unwrap().mark_as_useless(s.id()); + } + _ => (), + } kill = true; }, Ok(SessionData::Ready) => { @@ -721,7 +738,7 @@ impl Host where Message: Send + Sync + Clone { session.set_token(session_token); io.register_stream(session_token).expect("Error creating session registration"); self.stats.inc_sessions(); - trace!(target: "network", "Creating session {} -> {}", token, session_token); + trace!(target: "network", "Creating session {} -> {}:{} ({:?})", token, session_token, session.id(), session.remote_addr()); if !originated { // Add it no node table if let Ok(address) = session.remote_addr() { @@ -741,6 +758,7 @@ impl Host where Message: Send + Sync + Clone { } fn connection_timeout(&self, token: StreamToken, io: &IoContext>) { + trace!(target: "network", "Connection timeout: {}", token); self.kill_connection(token, io, true) } @@ -776,8 +794,8 @@ impl Host where Message: Send + Sync + Clone { } s.set_expired(); failure_id = Some(s.id().clone()); - deregister = true; } + deregister = remote || s.done(); } }, _ => {}, @@ -793,6 +811,8 @@ impl Host where Message: Send + Sync + Clone { } if deregister { io.deregister_stream(token).expect("Error deregistering stream"); + } else if expired_session.is_some() { + io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Connection registration error: {:?}", e)); } } @@ -819,6 +839,7 @@ impl Host where Message: Send + Sync + Clone { } } for i in to_remove { + trace!(target: "network", "Removed from node table: {}", i); self.kill_connection(i, io, false); } self.nodes.write().unwrap().update(node_changes); @@ -888,6 +909,9 @@ impl IoHandler> for Host where Messa } io.update_registration(DISCOVERY).expect("Error updating discovery registration"); }, + NODE_TABLE => { + self.nodes.write().unwrap().clear_useless(); + }, _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, @@ -933,6 +957,16 @@ impl IoHandler> for Host where Messa if let Some(session) = session { session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); } + trace!(target: "network", "Disconnect requested {}", peer); + self.kill_connection(*peer, io, false); + }, + NetworkIoMessage::DisablePeer(ref peer) => { + let session = { self.sessions.read().unwrap().get(*peer).cloned() }; + if let Some(session) = session { + session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); + self.nodes.write().unwrap().mark_as_useless(session.lock().unwrap().id()); + } + trace!(target: "network", "Disabling peer {}", peer); self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { diff --git a/util/src/network/node_table.rs b/util/src/network/node_table.rs index 868863e8c..416a0f8eb 100644 --- a/util/src/network/node_table.rs +++ b/util/src/network/node_table.rs @@ -19,7 +19,7 @@ use std::slice::from_raw_parts; use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::hash::{Hash, Hasher}; use std::str::{FromStr}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::path::{PathBuf}; use std::fmt; @@ -196,6 +196,7 @@ impl Hash for Node { /// Node table backed by disk file. pub struct NodeTable { nodes: HashMap, + useless_nodes: HashSet, path: Option, } @@ -204,6 +205,7 @@ impl NodeTable { NodeTable { path: path.clone(), nodes: NodeTable::load(path), + useless_nodes: HashSet::new(), } } @@ -217,7 +219,7 @@ impl NodeTable { /// Returns node ids sorted by number of failures pub fn nodes(&self) -> Vec { - let mut refs: Vec<&Node> = self.nodes.values().collect(); + let mut refs: Vec<&Node> = self.nodes.values().filter(|n| !self.useless_nodes.contains(&n.id)).collect(); refs.sort_by(|a, b| a.failures.cmp(&b.failures)); refs.iter().map(|n| n.id.clone()).collect() } @@ -251,6 +253,16 @@ impl NodeTable { } } + /// Mark as useless, no furter attempts to connect until next call to `clear_useless`. + pub fn mark_as_useless(&mut self, id: &NodeId) { + self.useless_nodes.insert(id.clone()); + } + + /// Atempt to connect to useless nodes again. + pub fn clear_useless(&mut self) { + self.useless_nodes.clear(); + } + fn save(&self) { if let Some(ref path) = self.path { let mut path_buf = PathBuf::from(path); diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 7dbcc4229..614b7eeb6 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -155,6 +155,10 @@ impl Session { self.expired } + /// Check if this session is over and there is nothing to be sent. + pub fn done(&self) -> bool { + self.expired() && !self.connection.is_sending() + } /// Replace socket token pub fn set_token(&mut self, token: StreamToken) { self.connection.set_token(token); @@ -178,9 +182,6 @@ impl Session { /// Writable IO handler. Sends pending packets. pub fn writable(&mut self, io: &IoContext, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { - if self.expired() { - return Ok(()) - } self.connection.writable(io) } @@ -200,9 +201,6 @@ impl Session { /// Update registration with the event loop. Should be called at the end of the IO handler. pub fn update_socket(&self, reg:Token, event_loop: &mut EventLoop) -> Result<(), UtilError> { - if self.expired() { - return Ok(()); - } self.connection.update_socket(reg, event_loop) }