Net etiquette: Track useless peers, Send out disconnect packet (#1028)

This commit is contained in:
Arkadiy Paronyan 2016-05-02 14:48:30 +02:00 committed by Gav Wood
parent e2465b1eab
commit c34e3535e0
4 changed files with 71 additions and 17 deletions

View File

@ -102,6 +102,11 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
} }
} }
/// Check if this connection has data to be sent.
pub fn is_sending(&self) -> bool {
self.interest.is_writable()
}
/// Writable IO handler. Called when the socket is ready to send. /// Writable IO handler. Called when the socket is ready to send.
pub fn writable(&mut self) -> io::Result<WriteStatus> { pub fn writable(&mut self) -> io::Result<WriteStatus> {
if self.send_queue.is_empty() { if self.send_queue.is_empty() {
@ -277,6 +282,11 @@ impl EncryptedConnection {
self.connection.remote_addr() self.connection.remote_addr()
} }
/// Check if this connection has data to be sent.
pub fn is_sending(&self) -> bool {
self.connection.is_sending()
}
/// Create an encrypted connection out of the handshake. Consumes a handshake object. /// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> { pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)); let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral));

View File

@ -39,7 +39,7 @@ use io::*;
use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; use network::{NetworkProtocolHandler, PROTOCOL_VERSION};
use network::node_table::*; use network::node_table::*;
use network::stats::NetworkStats; use network::stats::NetworkStats;
use network::error::DisconnectReason; use network::error::{NetworkError, DisconnectReason};
use network::discovery::{Discovery, TableUpdates, NodeEntry}; use network::discovery::{Discovery, TableUpdates, NodeEntry};
use network::ip_utils::{map_external_address, select_public_address}; use network::ip_utils::{map_external_address, select_public_address};
@ -122,6 +122,7 @@ const DISCOVERY: usize = LAST_HANDSHAKE + 3;
const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4; const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4;
const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5; const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5;
const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6; const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6;
const NODE_TABLE: usize = LAST_HANDSHAKE + 7;
const FIRST_SESSION: usize = 0; const FIRST_SESSION: usize = 0;
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1; const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
const FIRST_HANDSHAKE: usize = LAST_SESSION + 1; const FIRST_HANDSHAKE: usize = LAST_SESSION + 1;
@ -154,8 +155,10 @@ pub enum NetworkIoMessage<Message> where Message: Send + Sync + Clone {
/// Timer delay in milliseconds. /// Timer delay in milliseconds.
delay: u64, delay: u64,
}, },
/// Disconnect a peer /// Disconnect a peer.
Disconnect(PeerId), Disconnect(PeerId),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
/// User message /// User message
User(Message), User(Message),
} }
@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) { pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left //TODO: remove capability, disconnect if no capabilities left
self.disconnect_peer(peer); self.io.message(NetworkIoMessage::DisablePeer(peer));
} }
/// Disconnect peer. Reconnect can be attempted later. /// Disconnect peer. Reconnect can be attempted later.
@ -391,7 +394,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
pub fn add_node(&mut self, id: &str) { pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { debug!("Could not add node {}: {:?}", id, e); }, Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => { Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.pinned_nodes.push(n.id.clone()); self.pinned_nodes.push(n.id.clone());
@ -463,6 +466,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
io.register_timer(NODE_TABLE, 300_000).expect("Error registering node table timer");
} }
try!(io.register_stream(TCP_ACCEPT)); try!(io.register_stream(TCP_ACCEPT));
Ok(()) Ok(())
@ -499,6 +503,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
for p in to_kill { for p in to_kill {
trace!(target: "network", "Ping timeout: {}", p);
self.kill_connection(p, io, true); self.kill_connection(p, io, true);
} }
} }
@ -553,7 +558,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
match TcpStream::connect(&address) { match TcpStream::connect(&address) {
Ok(socket) => socket, Ok(socket) => socket,
Err(e) => { Err(e) => {
debug!("Can't connect to address {:?}: {:?}", address, e); debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e);
return; return;
} }
} }
@ -609,11 +614,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if let Err(e) = s.writable(io, &self.info.read().unwrap()) { if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
trace!(target: "network", "Session write error: {}: {:?}", token, e); trace!(target: "network", "Session write error: {}: {:?}", token, e);
} }
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); if s.done() {
io.deregister_stream(token).expect("Error deregistering stream");
} else {
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e));
}
} }
} }
fn connection_closed(&self, token: TimerToken, io: &IoContext<NetworkIoMessage<Message>>) { fn connection_closed(&self, token: TimerToken, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Connection closed: {}", token);
self.kill_connection(token, io, true); self.kill_connection(token, io, true);
} }
@ -650,7 +660,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut s = session.lock().unwrap(); let mut s = session.lock().unwrap();
match s.readable(io, &self.info.read().unwrap()) { match s.readable(io, &self.info.read().unwrap()) {
Err(e) => { Err(e) => {
debug!(target: "network", "Session read error: {}: {:?}", token, e); trace!(target: "network", "Session read error: {}:{} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
match e {
UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) |
UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => {
self.nodes.write().unwrap().mark_as_useless(s.id());
}
_ => (),
}
kill = true; kill = true;
}, },
Ok(SessionData::Ready) => { Ok(SessionData::Ready) => {
@ -721,7 +738,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
session.set_token(session_token); session.set_token(session_token);
io.register_stream(session_token).expect("Error creating session registration"); io.register_stream(session_token).expect("Error creating session registration");
self.stats.inc_sessions(); self.stats.inc_sessions();
trace!(target: "network", "Creating session {} -> {}", token, session_token); trace!(target: "network", "Creating session {} -> {}:{} ({:?})", token, session_token, session.id(), session.remote_addr());
if !originated { if !originated {
// Add it no node table // Add it no node table
if let Ok(address) = session.remote_addr() { if let Ok(address) = session.remote_addr() {
@ -741,6 +758,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Connection timeout: {}", token);
self.kill_connection(token, io, true) self.kill_connection(token, io, true)
} }
@ -776,8 +794,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
s.set_expired(); s.set_expired();
failure_id = Some(s.id().clone()); failure_id = Some(s.id().clone());
deregister = true;
} }
deregister = remote || s.done();
} }
}, },
_ => {}, _ => {},
@ -793,6 +811,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
if deregister { if deregister {
io.deregister_stream(token).expect("Error deregistering stream"); io.deregister_stream(token).expect("Error deregistering stream");
} else if expired_session.is_some() {
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Connection registration error: {:?}", e));
} }
} }
@ -819,6 +839,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
for i in to_remove { for i in to_remove {
trace!(target: "network", "Removed from node table: {}", i);
self.kill_connection(i, io, false); self.kill_connection(i, io, false);
} }
self.nodes.write().unwrap().update(node_changes); self.nodes.write().unwrap().update(node_changes);
@ -888,6 +909,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} }
io.update_registration(DISCOVERY).expect("Error updating discovery registration"); io.update_registration(DISCOVERY).expect("Error updating discovery registration");
}, },
NODE_TABLE => {
self.nodes.write().unwrap().clear_useless();
},
_ => match self.timers.read().unwrap().get(&token).cloned() { _ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
@ -933,6 +957,16 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
if let Some(session) = session { if let Some(session) = session {
session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested); session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested);
} }
trace!(target: "network", "Disconnect requested {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
if let Some(session) = session {
session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested);
self.nodes.write().unwrap().mark_as_useless(session.lock().unwrap().id());
}
trace!(target: "network", "Disabling peer {}", peer);
self.kill_connection(*peer, io, false); self.kill_connection(*peer, io, false);
}, },
NetworkIoMessage::User(ref message) => { NetworkIoMessage::User(ref message) => {

View File

@ -19,7 +19,7 @@ use std::slice::from_raw_parts;
use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::str::{FromStr}; use std::str::{FromStr};
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::path::{PathBuf}; use std::path::{PathBuf};
use std::fmt; use std::fmt;
@ -196,6 +196,7 @@ impl Hash for Node {
/// Node table backed by disk file. /// Node table backed by disk file.
pub struct NodeTable { pub struct NodeTable {
nodes: HashMap<NodeId, Node>, nodes: HashMap<NodeId, Node>,
useless_nodes: HashSet<NodeId>,
path: Option<String>, path: Option<String>,
} }
@ -204,6 +205,7 @@ impl NodeTable {
NodeTable { NodeTable {
path: path.clone(), path: path.clone(),
nodes: NodeTable::load(path), nodes: NodeTable::load(path),
useless_nodes: HashSet::new(),
} }
} }
@ -217,7 +219,7 @@ impl NodeTable {
/// Returns node ids sorted by number of failures /// Returns node ids sorted by number of failures
pub fn nodes(&self) -> Vec<NodeId> { pub fn nodes(&self) -> Vec<NodeId> {
let mut refs: Vec<&Node> = self.nodes.values().collect(); let mut refs: Vec<&Node> = self.nodes.values().filter(|n| !self.useless_nodes.contains(&n.id)).collect();
refs.sort_by(|a, b| a.failures.cmp(&b.failures)); refs.sort_by(|a, b| a.failures.cmp(&b.failures));
refs.iter().map(|n| n.id.clone()).collect() refs.iter().map(|n| n.id.clone()).collect()
} }
@ -251,6 +253,16 @@ impl NodeTable {
} }
} }
/// Mark as useless, no furter attempts to connect until next call to `clear_useless`.
pub fn mark_as_useless(&mut self, id: &NodeId) {
self.useless_nodes.insert(id.clone());
}
/// Atempt to connect to useless nodes again.
pub fn clear_useless(&mut self) {
self.useless_nodes.clear();
}
fn save(&self) { fn save(&self) {
if let Some(ref path) = self.path { if let Some(ref path) = self.path {
let mut path_buf = PathBuf::from(path); let mut path_buf = PathBuf::from(path);

View File

@ -155,6 +155,10 @@ impl Session {
self.expired self.expired
} }
/// Check if this session is over and there is nothing to be sent.
pub fn done(&self) -> bool {
self.expired() && !self.connection.is_sending()
}
/// Replace socket token /// Replace socket token
pub fn set_token(&mut self, token: StreamToken) { pub fn set_token(&mut self, token: StreamToken) {
self.connection.set_token(token); self.connection.set_token(token);
@ -178,9 +182,6 @@ impl Session {
/// Writable IO handler. Sends pending packets. /// Writable IO handler. Sends pending packets.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone {
if self.expired() {
return Ok(())
}
self.connection.writable(io) self.connection.writable(io)
} }
@ -200,9 +201,6 @@ impl Session {
/// Update registration with the event loop. Should be called at the end of the IO handler. /// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
if self.expired() {
return Ok(());
}
self.connection.update_socket(reg, event_loop) self.connection.update_socket(reg, event_loop)
} }