// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::hash::{Hasher};
use std::str::{FromStr};
use std::sync::*;
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
use std::io::{Read, Write};
use std::default::Default;
use std::fs;
use mio::*;
use mio::tcp::*;
use hash::*;
use misc::version;
use crypto::*;
use sha3::Hashable;
use rlp::*;
use network::handshake::Handshake;
use network::session::{Session, SessionData};
use error::*;
use io::*;
use network::{NetworkProtocolHandler, PROTOCOL_VERSION};
use network::node_table::*;
use network::stats::NetworkStats;
use network::error::DisconnectReason;
use network::discovery::{Discovery, TableUpdates, NodeEntry};
use network::ip_utils::{map_external_address, select_public_address};
type Slab = ::slab::Slab;
const _DEFAULT_PORT: u16 = 30304;
const MAX_SESSIONS: usize = 1024;
const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store network configuration. None means nothing will be saved
pub config_path: Option,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// Pin to boot nodes only
pub pin: bool,
/// List of initial node addresses
pub boot_nodes: Vec,
/// Use provided node key instead of default
pub use_secret: Option,
/// Number of connected peers to maintain
pub ideal_peers: u32,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
NetworkConfiguration::new()
}
}
impl NetworkConfiguration {
/// Create a new instance of default settings.
pub fn new() -> Self {
NetworkConfiguration {
config_path: None,
listen_address: None,
public_address: None,
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
pin: false,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 25,
}
}
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
config.nat_enabled = false;
config
}
}
// Tokens
const TCP_ACCEPT: usize = LAST_HANDSHAKE + 1;
const IDLE: usize = LAST_HANDSHAKE + 2;
const DISCOVERY: usize = LAST_HANDSHAKE + 3;
const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4;
const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5;
const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6;
const FIRST_SESSION: usize = 0;
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
const FIRST_HANDSHAKE: usize = LAST_SESSION + 1;
const LAST_HANDSHAKE: usize = FIRST_HANDSHAKE + MAX_HANDSHAKES - 1;
const USER_TIMER: usize = LAST_HANDSHAKE + 256;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str;
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
pub enum NetworkIoMessage where Message: Send + Sync + Clone {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay in milliseconds.
delay: u64,
},
/// Disconnect a peer
Disconnect(PeerId),
/// User message
User(Message),
}
/// Local (temporary) peer session ID.
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&self.protocol);
s.append(&self.version);
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, 's {
io: &'s IoContext>,
protocol: ProtocolId,
sessions: Arc>>,
session: Option,
session_id: Option,
}
impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s IoContext>,
protocol: ProtocolId,
session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> {
let id = session.as_ref().map(|s| s.lock().unwrap().token());
NetworkContext {
io: io,
protocol: protocol,
session_id: id,
session: session,
sessions: sessions,
}
}
fn resolve_session(&self, peer: PeerId) -> Option {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().unwrap().get(peer).cloned(),
}
}
/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> {
let session = self.resolve_session(peer);
if let Some(session) = session {
try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data));
try!(self.io.update_registration(peer));
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> {
assert!(self.session.is_some(), "Respond called without network context");
self.send(self.session_id.unwrap(), packet_id, data)
}
/// Send an IO message
pub fn message(&self, msg: Message) {
self.io.message(NetworkIoMessage::User(msg));
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
self.disconnect_peer(peer);
}
/// Disconnect peer. Reconnect can be attempted later.
pub fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer));
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), UtilError> {
self.io.message(NetworkIoMessage::AddTimer {
token: token,
delay: ms,
protocol: self.protocol,
});
Ok(())
}
/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
let session = self.resolve_session(peer);
if let Some(session) = session {
return session.lock().unwrap().info.client_version.clone()
}
"unknown".to_owned()
}
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Client identifier
pub client_version: String,
/// Registered capabilities (handlers)
pub capabilities: Vec,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
pub public_endpoint: Option,
}
impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId {
self.keys.public()
}
/// Returns secret key
pub fn secret(&self) -> &Secret {
self.keys.secret()
}
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 {
self.nonce = self.nonce.sha3();
self.nonce.clone()
}
}
type SharedSession = Arc>;
type SharedHandshake = Arc>;
#[derive(Copy, Clone)]
struct ProtocolTimer {
pub protocol: ProtocolId,
pub token: TimerToken, // Handler level token
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host where Message: Send + Sync + Clone {
pub info: RwLock,
tcp_listener: Mutex,
handshakes: Arc>>,
sessions: Arc>>,
discovery: Mutex