// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see .
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io::{self, Read, Write};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::*;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::Duration;
use ethereum_types::H256;
use keccak_hash::keccak;
use log::{debug, info, trace, warn};
use mio::{
deprecated::EventLoop, PollOpt, Ready, tcp::{TcpListener, TcpStream},
Token,
udp::UdpSocket
};
use parity_path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use rlp::{Encodable, RlpStream};
use ethcore_io::{IoContext, IoHandler, IoManager, StreamToken, TimerToken};
use parity_crypto::publickey::{Generator, KeyPair, Random, Secret};
use network::{
client_version::ClientVersion, ConnectionDirection, ConnectionFilter, DisconnectReason, Error,
NetworkConfiguration, NetworkContext as NetworkContextTrait, NetworkIoMessage, NetworkProtocolHandler,
NonReservedPeerMode, PacketId, PeerId, ProtocolId, SessionInfo
};
use crate::{
connection::PAYLOAD_SOFT_LIMIT,
discovery::{Discovery, MAX_DATAGRAM_SIZE, NodeEntry, TableUpdates},
ip_utils::{map_external_address, select_public_address},
node_table::*,
PROTOCOL_VERSION,
session::{Session, SessionData}
};
type Slab = ::slab::Slab;
const MAX_SESSIONS: usize = 2048 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;
const DEFAULT_PORT: u16 = 30303;
// StreamToken/TimerToken
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const IDLE: TimerToken = SYS_TIMER + 2;
const DISCOVERY: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: TimerToken = LAST_SESSION + 256;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
// Timeouts
// for IDLE TimerToken
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
// for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
// for FAST_DISCOVERY_REFRESH TimerToken
const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
// for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
// for NODE_TABLE TimerToken
const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
/// Protocol ID
pub protocol: ProtocolId,
/// Protocol version
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&&self.protocol[..]);
s.append(&self.version);
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s> {
io: &'s IoContext,
protocol: ProtocolId,
sessions: Arc>>,
session: Option,
session_id: Option,
reserved_peers: &'s HashSet,
}
impl<'s> NetworkContext<'s> {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(
io: &'s IoContext,
protocol: ProtocolId,
session: Option,
sessions: Arc>>,
reserved_peers: &'s HashSet,
) -> NetworkContext<'s> {
let id = session.as_ref().map(|s| s.lock().token());
NetworkContext {
io,
protocol,
session_id: id,
session,
sessions,
reserved_peers: reserved_peers,
}
}
fn resolve_session(&self, peer: PeerId) -> Option {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().get(peer).cloned(),
}
}
}
impl<'s> NetworkContextTrait for NetworkContext<'s> {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}
fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(Error::Expired), |id| self.send(id, packet_id, data))
}
fn disable_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token,
delay,
protocol: self.protocol,
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
Ok(())
}
fn peer_client_version(&self, peer: PeerId) -> ClientVersion {
self.resolve_session(peer).map_or(ClientVersion::from("unknown").to_owned(), |s| s.lock().info.client_version.clone())
}
fn session_info(&self, peer: PeerId) -> Option {
self.resolve_session(peer).map(|s| s.lock().info.clone())
}
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
fn subprotocol_name(&self) -> ProtocolId { self.protocol }
fn is_reserved_peer(&self, peer: PeerId) -> bool {
self.session_info(peer)
.and_then(|info| info.id)
.map(|node| self.reserved_peers.contains(&node))
.unwrap_or(false)
}
fn payload_soft_limit(&self) -> usize {
PAYLOAD_SOFT_LIMIT
}
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Registered capabilities (handlers)
pub capabilities: Vec,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
pub public_endpoint: Option,
}
impl HostInfo {
fn next_nonce(&mut self) -> H256 {
self.nonce = keccak(&self.nonce);
self.nonce
}
pub(crate) fn client_version(&self) -> &str {
&self.config.client_version
}
pub(crate) fn secret(&self) -> &Secret {
self.keys.secret()
}
pub(crate) fn id(&self) -> &NodeId {
self.keys.public()
}
}
type SharedSession = Arc>;
#[derive(Copy, Clone)]
struct ProtocolTimer {
pub protocol: ProtocolId,
pub token: TimerToken, // Handler level token
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
///
/// NOTE: must keep the lock in order of: reserved_nodes (rwlock) -> session (mutex, from sessions)
pub struct Host {
pub info: RwLock,
udp_socket: Mutex