// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
use std::io::{Read, Write, self};
use std::fs;
use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak;
use mio::*;
use mio::deprecated::{EventLoop};
use mio::tcp::*;
use bigint::hash::*;
use rlp::*;
use session::{Session, SessionInfo, SessionData};
use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION, IpFilter};
use node_table::*;
use stats::NetworkStats;
use discovery::{Discovery, TableUpdates, NodeEntry};
use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use connection_filter::{ConnectionFilter, ConnectionDirection};
use error::{Error, ErrorKind, DisconnectReason};
type Slab = ::slab::Slab;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;
const DEFAULT_PORT: u16 = 30303;
// StreamToken/TimerToken
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const IDLE: TimerToken = SYS_TIMER + 2;
const DISCOVERY: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 5;
const NODE_TABLE: TimerToken = SYS_TIMER + 6;
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: TimerToken = LAST_SESSION + 256;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
// Timeouts
// for IDLE TimerToken
const MAINTENANCE_TIMEOUT: u64 = 1000;
// for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: u64 = 60_000;
// for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: u64 = 300;
// for NODE_TABLE TimerToken
const NODE_TABLE_TIMEOUT: u64 = 300_000;
#[derive(Debug, PartialEq, Clone)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved
pub config_path: Option,
/// Directory path to store network-specific configuration. None means nothing will be saved
pub net_config_path: Option,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// List of initial node addresses
pub boot_nodes: Vec,
/// Use provided node key instead of default
pub use_secret: Option,
/// Minimum number of connected peers to maintain
pub min_peers: u32,
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with protocol get additional connection slots.
pub reserved_protocols: HashMap,
/// List of reserved node addresses.
pub reserved_nodes: Vec,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub ip_filter: IpFilter,
/// Client identifier
pub client_version: String,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
NetworkConfiguration::new()
}
}
impl NetworkConfiguration {
/// Create a new instance of default settings.
pub fn new() -> Self {
NetworkConfiguration {
config_path: None,
net_config_path: None,
listen_address: None,
public_address: None,
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
boot_nodes: Vec::new(),
use_secret: None,
min_peers: 25,
max_peers: 50,
max_handshakes: 64,
reserved_protocols: HashMap::new(),
ip_filter: IpFilter::default(),
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "Parity-network".into(),
}
}
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config
}
}
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = [u8; 3];
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
pub enum NetworkIoMessage {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec,
/// Number of packet IDs reserved by the protocol.
packet_count: u8,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay in milliseconds.
delay: u64,
},
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(PeerId),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}
/// Local (temporary) peer session ID.
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&&self.protocol[..]);
s.append(&self.version);
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s> {
io: &'s IoContext,
protocol: ProtocolId,
sessions: Arc>>,
session: Option,
session_id: Option,
_reserved_peers: &'s HashSet,
}
impl<'s> NetworkContext<'s> {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s IoContext,
protocol: ProtocolId,
session: Option, sessions: Arc>>,
reserved_peers: &'s HashSet) -> NetworkContext<'s> {
let id = session.as_ref().map(|s| s.lock().token());
NetworkContext {
io: io,
protocol: protocol,
session_id: id,
session: session,
sessions: sessions,
_reserved_peers: reserved_peers,
}
}
fn resolve_session(&self, peer: PeerId) -> Option {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().get(peer).cloned(),
}
}
/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
/// Send a packet over the network to another peer using specified protocol.
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(ErrorKind::Expired.into()), |id| self.send(id, packet_id, data))
}
/// Get an IoChannel.
pub fn io_channel(&self) -> IoChannel {
self.io.channel()
}
/// Disconnect a peer and prevent it from connecting again.
pub fn disable_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Disconnect peer. Reconnect can be attempted later.
pub fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Check if the session is still active.
pub fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token: token,
delay: ms,
protocol: self.protocol,
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
Ok(())
}
/// Returns peer identification string
pub fn peer_client_version(&self, peer: PeerId) -> String {
self.resolve_session(peer).map_or("unknown".to_owned(), |s| s.lock().info.client_version.clone())
}
/// Returns information on p2p session
pub fn session_info(&self, peer: PeerId) -> Option {
self.resolve_session(peer).map(|s| s.lock().info.clone())
}
/// Returns max version for a given protocol.
pub fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
/// Returns this object's subprotocol name.
pub fn subprotocol_name(&self) -> ProtocolId { self.protocol }
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Registered capabilities (handlers)
pub capabilities: Vec,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
pub public_endpoint: Option,
}
impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId {
self.keys.public()
}
/// Returns secret key
pub fn secret(&self) -> &Secret {
self.keys.secret()
}
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 {
self.nonce = keccak(&self.nonce);
self.nonce
}
pub fn client_version(&self) -> &str {
&self.config.client_version
}
}
type SharedSession = Arc>;
#[derive(Copy, Clone)]
struct ProtocolTimer {
pub protocol: ProtocolId,
pub token: TimerToken, // Handler level token
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host {
pub info: RwLock,
tcp_listener: Mutex,
sessions: Arc>>,
discovery: Mutex