// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of OpenEthereum.
// OpenEthereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// OpenEthereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::H256;
use ethkey::{Generator, KeyPair, Random, Secret};
use hash::keccak;
use mio::{deprecated::EventLoop, tcp::*, udp::*, *};
use rlp::{Encodable, RlpStream};
use std::{
cmp::{max, min},
collections::{HashMap, HashSet},
fs,
io::{self, Read, Write},
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
ops::*,
path::{Path, PathBuf},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering as AtomicOrdering},
Arc,
},
time::Duration,
};
use discovery::{Discovery, NodeEntry, TableUpdates, MAX_DATAGRAM_SIZE};
use io::*;
use ip_utils::{map_external_address, select_public_address};
use network::{
client_version::ClientVersion, ConnectionDirection, ConnectionFilter, DisconnectReason, Error,
ErrorKind, NetworkConfiguration, NetworkContext as NetworkContextTrait, NetworkIoMessage,
NetworkProtocolHandler, NonReservedPeerMode, PacketId, PeerId, ProtocolId, SessionInfo,
};
use node_table::*;
use parity_path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use session::{Session, SessionData};
use PROTOCOL_VERSION;
/// Local alias for the `slab` crate's `Slab` type.
type Slab = ::slab::Slab;
/// Size of the session token range: 2048 plus `MAX_HANDSHAKES` (3072 with the values below).
/// NOTE(review): presumably 2048 established sessions plus in-progress handshakes — confirm.
const MAX_SESSIONS: usize = 2048 + MAX_HANDSHAKES;
/// NOTE(review): presumably the cap on simultaneous handshakes — confirm at use sites.
const MAX_HANDSHAKES: usize = 1024;
/// Default port number (30303).
const DEFAULT_PORT: u16 = 30303;
// StreamToken/TimerToken values.
// Layout with the constants below: session tokens span FIRST_SESSION..=LAST_SESSION
// (0..=3071), the host's own tokens are SYS_TIMER + 1 ..= SYS_TIMER + 7 (3073..=3079),
// and USER_TIMER is LAST_SESSION + 256 (3327).
// NOTE(review): USER_TIMER is presumably the base for protocol-handler timers — confirm.
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const IDLE: TimerToken = SYS_TIMER + 2;
const DISCOVERY: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
// First and last token of the session range (MAX_SESSIONS consecutive values).
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: TimerToken = LAST_SESSION + 256;
// First value after the session range; the host-level tokens above are offsets from it.
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
// Timeouts, one per host-level timer token.
/// Timeout used with the `IDLE` timer token (1 second).
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
/// Timeout used with the `DISCOVERY_REFRESH` timer token (1 minute).
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
/// Timeout used with the `FAST_DISCOVERY_REFRESH` timer token (10 seconds).
const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout used with the `DISCOVERY_ROUND` timer token (300 milliseconds).
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
/// Timeout used with the `NODE_TABLE` timer token (5 minutes).
const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
#[derive(Debug, PartialEq, Eq)]
/// Protocol info: describes one capability by protocol ID, version and packet-ID count.
///
/// Note that the `Encodable` implementation for this type writes only `protocol` and
/// `version`; `packet_count` is not part of the RLP encoding.
pub struct CapabilityInfo {
/// Protocol ID
pub protocol: ProtocolId,
/// Protocol version
pub version: u8,
/// Total number of packet IDs this protocol supports.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
    /// Appends this capability to `s` as a two-item RLP list: the protocol ID bytes
    /// followed by the version. `packet_count` is intentionally left out of the list.
    fn rlp_append(&self, s: &mut RlpStream) {
        let protocol_id = &self.protocol[..];
        s.begin_list(2)
            .append(&protocol_id)
            .append(&self.version);
    }
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s> {
/// Borrowed IO context; used to send packets and to post `NetworkIoMessage`s.
io: &'s IoContext,
/// Protocol this context acts for; used by `send` and `register_timer`.
protocol: ProtocolId,
/// Shared session collection, consulted when a peer is not this context's own session.
sessions: Arc>>,
/// Session this context was created for, if any.
session: Option,
/// Token of `session`, read once when the context is constructed.
session_id: Option,
/// Borrowed set of node IDs consulted by `is_reserved_peer`.
reserved_peers: &'s HashSet,
}
impl<'s> NetworkContext<'s> {
    /// Create a new network IO access point. Takes references to all the data that can be
    /// updated within the IO handler.
    ///
    /// When `session` is given, its token is read once (under the session lock) and stored
    /// as `session_id`.
    fn new(
        io: &'s IoContext,
        protocol: ProtocolId,
        session: Option,
        sessions: Arc>>,
        reserved_peers: &'s HashSet,
    ) -> NetworkContext<'s> {
        let session_id = session.as_ref().map(|shared| shared.lock().token());
        NetworkContext {
            io,
            protocol,
            sessions,
            session,
            session_id,
            reserved_peers,
        }
    }

    /// Looks up the session for `peer`.
    ///
    /// If `peer` matches this context's own session token, a clone of that session handle
    /// is returned directly; otherwise the shared session collection is read-locked and
    /// queried.
    fn resolve_session(&self, peer: PeerId) -> Option {
        let is_own_session = self.session_id.map_or(false, |id| id == peer);
        if is_own_session {
            self.session.clone()
        } else {
            self.sessions.read().get(peer).cloned()
        }
    }
}
impl<'s> NetworkContextTrait for NetworkContext<'s> {
    /// Sends `data` as packet `packet_id` to `peer` using this context's own protocol.
    fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> {
        self.send_protocol(self.protocol, peer, packet_id, data)
    }

    /// Sends `data` as packet `packet_id` to `peer` on behalf of `protocol`.
    ///
    /// If no session can be resolved for `peer` the packet is dropped, a trace message is
    /// logged and `Ok(())` is still returned. Errors reported by the session's
    /// `send_packet` are propagated to the caller.
    fn send_protocol(
        &self,
        protocol: ProtocolId,
        peer: PeerId,
        packet_id: PacketId,
        data: Vec,
    ) -> Result<(), Error> {
        let session = self.resolve_session(peer);
        if let Some(session) = session {
            session
                .lock()
                .send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
        } else {
            trace!(target: "network", "Send: Peer no longer exist")
        }
        Ok(())
    }

    /// Sends a packet back to the peer whose session this context was created for.
    ///
    /// # Panics
    /// Panics if this context holds no session.
    fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), Error> {
        assert!(
            self.session.is_some(),
            "Respond called without network context"
        );
        self.session_id.map_or_else(
            || Err(ErrorKind::Expired.into()),
            |id| self.send(id, packet_id, data),
        )
    }

    /// Posts a `DisablePeer` message for `peer` to the IO context.
    /// A failure to post the message is only logged as a warning.
    fn disable_peer(&self, peer: PeerId) {
        self.io
            .message(NetworkIoMessage::DisablePeer(peer))
            .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
    }

    /// Posts a `Disconnect` message for `peer` to the IO context.
    /// A failure to post the message is only logged as a warning.
    fn disconnect_peer(&self, peer: PeerId) {
        self.io
            .message(NetworkIoMessage::Disconnect(peer))
            .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
    }

    /// Returns whether this context's session reports itself as expired.
    /// A context without a session is never considered expired.
    fn is_expired(&self) -> bool {
        self.session.as_ref().map_or(false, |s| s.lock().expired())
    }

    /// Posts an `AddTimer` message carrying `token`, `delay` and this context's protocol.
    ///
    /// Always returns `Ok(())`: a failure to post the message is logged as a warning and
    /// not reported to the caller.
    fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
        self.io
            .message(NetworkIoMessage::AddTimer {
                token,
                delay,
                protocol: self.protocol,
            })
            .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
        Ok(())
    }

    /// Returns the client version recorded in `peer`'s session info, or a version built
    /// from the string `"unknown"` when no session can be resolved.
    fn peer_client_version(&self, peer: PeerId) -> ClientVersion {
        // Build the fallback lazily: it is only needed when the peer has no session, and
        // the freshly created value is already owned, so no extra clone is required.
        self.resolve_session(peer).map_or_else(
            || ClientVersion::from("unknown"),
            |s| s.lock().info.client_version.clone(),
        )
    }

    /// Returns a clone of `peer`'s session info, or `None` when no session can be resolved.
    fn session_info(&self, peer: PeerId) -> Option {
        self.resolve_session(peer).map(|s| s.lock().info.clone())
    }

    /// Returns the session's `capability_version` for `protocol`, or `None` when no
    /// session can be resolved for `peer`.
    fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option {
        let session = self.resolve_session(peer);
        session.and_then(|s| s.lock().capability_version(protocol))
    }

    /// Returns the protocol this context was created for.
    fn subprotocol_name(&self) -> ProtocolId {
        self.protocol
    }

    /// Returns `true` when `peer` has a session with a known node ID and that ID is in the
    /// reserved-peer set; `false` otherwise.
    fn is_reserved_peer(&self, peer: PeerId) -> bool {
        self.session_info(peer)
            .and_then(|info| info.id)
            .map_or(false, |node| self.reserved_peers.contains(&node))
    }
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys. The public key is what `id()` returns and the secret is
/// what `secret()` returns.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce. Replaced by its keccak hash on every `next_nonce()` call.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Registered capabilities (handlers)
pub capabilities: Vec,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
pub public_endpoint: Option,
}
impl HostInfo {
    /// Advances the connection nonce to the keccak hash of its current value and returns
    /// the new nonce.
    fn next_nonce(&mut self) -> H256 {
        let advanced = keccak(&self.nonce);
        self.nonce = advanced;
        advanced
    }

    /// Client version string taken from the network configuration.
    pub(crate) fn client_version(&self) -> &str {
        &self.config.client_version
    }

    /// Secret half of this host's key pair.
    pub(crate) fn secret(&self) -> &Secret {
        self.keys.secret()
    }

    /// Node ID of this host, i.e. the public half of its key pair.
    pub(crate) fn id(&self) -> &NodeId {
        self.keys.public()
    }
}
/// Session handle shared via `Arc`; callers in this file access the session through `lock()`.
type SharedSession = Arc>;
/// Pairs a protocol with that protocol's own timer token.
/// NOTE(review): presumably used to route a host-level timer back to its handler — confirm
/// at the use sites.
#[derive(Copy, Clone)]
struct ProtocolTimer {
/// Protocol the timer belongs to.
pub protocol: ProtocolId,
pub token: TimerToken, // Handler level token
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
///
/// NOTE: must keep the lock in order of: reserved_nodes (rwlock) -> session (mutex, from sessions)
pub struct Host {
pub info: RwLock,
udp_socket: Mutex