// openethereum/util/network-devp2p/src/host.rs

// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::{min, max};
use std::path::{Path, PathBuf};
use std::io::{Read, Write, self};
use std::fs;
use std::time::Duration;
use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak;
use mio::*;
use mio::deprecated::{EventLoop};
use mio::tcp::*;
use mio::udp::*;
use ethereum_types::H256;
use rlp::{RlpStream, Encodable};
use session::{Session, SessionData};
use io::*;
use PROTOCOL_VERSION;
use node_table::*;
use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId};
use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait};
use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler};
use discovery::{Discovery, TableUpdates, NodeEntry, MAX_DATAGRAM_SIZE};
use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use network::{ConnectionFilter, ConnectionDirection};
type Slab<T> = ::slab::Slab<T, usize>;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;
const DEFAULT_PORT: u16 = 30303;
// StreamToken/TimerToken
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const IDLE: TimerToken = SYS_TIMER + 2;
const DISCOVERY: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: TimerToken = LAST_SESSION + 256;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
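// Token space layout: session stream tokens occupy FIRST_SESSION..=LAST_SESSION,
// system streams/timers are allocated from SYS_TIMER (LAST_SESSION + 1) upwards,
// and per-protocol handler timers are handed out starting at USER_TIMER (LAST_SESSION + 256).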
// Timeouts
// for IDLE TimerToken
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
// for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
// for FAST_DISCOVERY_REFRESH TimerToken
const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
// for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
// for NODE_TABLE TimerToken
const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
/// Protocol ID
pub protocol: ProtocolId,
/// Protocol version
pub version: u8,
/// Total number of packet IDs this protocol supports.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&&self.protocol[..]);
s.append(&self.version);
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s> {
io: &'s IoContext<NetworkIoMessage>,
protocol: ProtocolId,
sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<SharedSession>,
session_id: Option<StreamToken>,
_reserved_peers: &'s HashSet<NodeId>,
}
impl<'s> NetworkContext<'s> {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(
io: &'s IoContext<NetworkIoMessage>,
protocol: ProtocolId,
session: Option<SharedSession>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
reserved_peers: &'s HashSet<NodeId>,
) -> NetworkContext<'s> {
let id = session.as_ref().map(|s| s.lock().token());
NetworkContext {
io,
protocol,
session_id: id,
session,
sessions,
_reserved_peers: reserved_peers,
}
}
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().get(peer).cloned(),
}
}
}
impl<'s> NetworkContextTrait for NetworkContext<'s> {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(ErrorKind::Expired.into()), |id| self.send(id, packet_id, data))
}
fn disable_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token,
delay,
protocol: self.protocol,
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
Ok(())
}
fn peer_client_version(&self, peer: PeerId) -> String {
self.resolve_session(peer).map_or("unknown".to_owned(), |s| s.lock().info.client_version.clone())
}
fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
self.resolve_session(peer).map(|s| s.lock().info.clone())
}
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
fn subprotocol_name(&self) -> ProtocolId { self.protocol }
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo>,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
pub public_endpoint: Option<NodeEndpoint>,
}
impl HostInfo {
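/// Produce the next connection nonce by hashing the previous one (a keccak hash chain).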
fn next_nonce(&mut self) -> H256 {
self.nonce = keccak(&self.nonce);
self.nonce
}
pub(crate) fn client_version(&self) -> &str {
&self.config.client_version
}
pub(crate) fn secret(&self) -> &Secret {
self.keys.secret()
}
pub(crate) fn id(&self) -> &NodeId {
self.keys.public()
}
}
type SharedSession = Arc<Mutex<Session>>;
#[derive(Copy, Clone)]
struct ProtocolTimer {
pub protocol: ProtocolId,
pub token: TimerToken, // Handler level token
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host {
pub info: RwLock<HostInfo>,
udp_socket: Mutex<Option<UdpSocket>>,
tcp_listener: Mutex<TcpListener>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery<'static>>>,
nodes: RwLock<NodeTable>,
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler + Sync>>>,
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
timer_counter: RwLock<usize>,
reserved_nodes: RwLock<HashSet<NodeId>>,
stopping: AtomicBool,
filter: Option<Arc<ConnectionFilter>>,
}
impl Host {
/// Create a new instance
pub fn new(mut config: NetworkConfiguration, filter: Option<Arc<ConnectionFilter>>) -> Result<Host, Error> {
let mut listen_address = match config.listen_address {
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr,
};
let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone())?
} else {
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
.map_or_else(|| {
let key = Random.generate().expect("Error generating random key pair");
if let Some(path) = config.config_path.clone() {
save_key(Path::new(&path), key.secret());
}
key
},
|s| KeyPair::from_secret(s).expect("Error creating node secret key"))
};
let path = config.net_config_path.clone();
// Setup the server socket
let tcp_listener = TcpListener::bind(&listen_address)?;
listen_address = SocketAddr::new(listen_address.ip(), tcp_listener.local_addr()?.port());
debug!(target: "network", "Listening at {:?}", listen_address);
let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
let boot_nodes = config.boot_nodes.clone();
let reserved_nodes = config.reserved_nodes.clone();
config.max_handshakes = min(config.max_handshakes, MAX_HANDSHAKES as u32);
let mut host = Host {
info: RwLock::new(HostInfo {
keys: keys,
config: config,
nonce: H256::random(),
protocol_version: PROTOCOL_VERSION,
capabilities: Vec::new(),
public_endpoint: None,
local_endpoint: local_endpoint,
}),
discovery: Mutex::new(None),
udp_socket: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER),
reserved_nodes: RwLock::new(HashSet::new()),
stopping: AtomicBool::new(false),
filter: filter,
};
for n in boot_nodes {
host.add_node(&n);
}
for n in reserved_nodes {
if let Err(e) = host.add_reserved_node(&n) {
debug!(target: "network", "Error parsing node id: {}: {:?}", n, e);
}
}
Ok(host)
}
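/// Add a node to the node table (and to discovery, if it is running) from an enode URL string,
/// e.g. `enode://<node id as hex>@<ip>:<port>`.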
pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id };
self.nodes.write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry);
}
}
}
}
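/// Mark a node as reserved and add it to the node table and discovery.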
pub fn add_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?;
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id };
self.reserved_nodes.write().insert(n.id);
self.nodes.write().add_node(Node::new(entry.id, entry.endpoint.clone()));
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry);
}
Ok(())
}
pub fn set_non_reserved_mode(&self, mode: &NonReservedPeerMode, io: &IoContext<NetworkIoMessage>) {
let mut info = self.info.write();
if &info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
let mut s = e.lock();
{
let id = s.id();
if id.map_or(false, |id| reserved.contains(id)) {
continue;
}
}
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on reserved-only mode: {}", p);
self.kill_connection(p, io, false);
}
}
}
}
pub fn remove_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?;
self.reserved_nodes.write().remove(&n.id);
Ok(())
}
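/// The enode URL advertising our public endpoint, or `None` if the public endpoint is not known yet.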
pub fn external_url(&self) -> Option<String> {
let info = self.info.read();
info.public_endpoint.as_ref().map(|e| format!("{}", Node::new(*info.id(), e.clone())))
2016-02-23 19:38:06 +01:00
}
pub fn local_url(&self) -> String {
let info = self.info.read();
format!("{}", Node::new(*info.id(), info.local_endpoint.clone()))
}
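/// Shut down networking: disconnect all sessions and unregister this handler from the IO service.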
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
let mut s = e.lock();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on shutdown: {}", p);
self.kill_connection(p, io, true);
}
io.unregister_handler();
}
/// Get all connected peers.
pub fn connected_peers(&self) -> Vec<PeerId> {
let sessions = self.sessions.read();
let sessions = &*sessions;
let mut peers = Vec::with_capacity(sessions.count());
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
if sessions.get(i).is_some() {
peers.push(i);
}
}
peers
}
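/// Resolve the public endpoint (explicit config, NAT mapping, or the local address), then start
/// discovery (when enabled) and the TCP accept stream.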
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
if self.info.read().public_endpoint.is_some() {
return Ok(());
}
let local_endpoint = self.info.read().local_endpoint.clone();
let public_address = self.info.read().config.public_address.clone();
let allow_ips = self.info.read().config.ip_filter.clone();
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(local_endpoint.address.port());
let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.read().config.nat_enabled {
match map_external_address(&local_endpoint) {
Some(endpoint) => {
info!("NAT mapped to external address {}", endpoint.address);
endpoint
},
None => public_endpoint
}
} else {
public_endpoint
}
}
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};
self.info.write().public_endpoint = Some(public_endpoint.clone());
if let Some(url) = self.external_url() {
io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
}
// Initialize discovery.
let discovery = {
let info = self.info.read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint, allow_ips))
} else { None }
};
if let Some(mut discovery) = discovery {
let mut udp_addr = local_endpoint.address;
udp_addr.set_port(local_endpoint.udp_port);
let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket");
*self.udp_socket.lock() = Some(socket);
discovery.add_node_list(self.nodes.read().entries());
*self.discovery.lock() = Some(discovery);
io.register_stream(DISCOVERY)?;
io.register_timer(FAST_DISCOVERY_REFRESH, FAST_DISCOVERY_REFRESH_TIMEOUT)?;
io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT)?;
io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT)?;
}
io.register_timer(NODE_TABLE, NODE_TABLE_TIMEOUT)?;
io.register_stream(TCP_ACCEPT)?;
Ok(())
}
fn maintain_network(&self, io: &IoContext<NetworkIoMessage>) {
self.keep_alive(io);
self.connect_peers(io);
}
fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone()))
}
// returns (handshakes, egress, ingress)
fn session_count(&self) -> (usize, usize, usize) {
let mut handshakes = 0;
let mut egress = 0;
let mut ingress = 0;
for s in self.sessions.read().iter() {
match s.try_lock() {
Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
_ => handshakes += 1,
}
}
(handshakes, egress, ingress)
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().id() == Some(id))
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
let mut s = e.lock();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
to_kill.push(s.token());
}
}
for p in to_kill {
trace!(target: "network", "Ping timeout: {}", p);
self.kill_connection(p, io, true);
}
}
fn has_enough_peers(&self) -> bool {
let min_peers = {
let info = self.info.read();
let config = &info.config;
config.min_peers
};
let (_, egress_count, ingress_count) = self.session_count();
return egress_count + ingress_count >= min_peers as usize;
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = {
let info = self.info.read();
if info.capabilities.is_empty() {
return;
}
let config = &info.config;
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.ip_filter.clone(), info.id().clone())
};
let (handshake_count, egress_count, ingress_count) = self.session_count();
let reserved_nodes = self.reserved_nodes.read();
if egress_count + ingress_count >= min_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return;
}
// if not, only attempt connect to reserved peers
pin = true;
}
// Don't dial any more nodes if the handshake budget is already exhausted.
if handshake_count >= max_handshakes {
return;
}
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().nodes(&allow_ips)
} else {
Vec::new()
});
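// Dial at most half of the configured handshake limit in a single maintenance round.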
let max_handshakes_per_round = max_handshakes / 2;
let mut started: usize = 0;
for id in nodes.filter(|id|
!self.have_session(id) &&
!self.connecting_to(id) &&
*id != self_id &&
self.filter.as_ref().map_or(true, |f| f.connection_allowed(&self_id, &id, ConnectionDirection::Outbound))
).take(min(max_handshakes_per_round, max_handshakes - handshake_count)) {
self.connect_peer(&id, io);
started += 1;
}
debug!(target: "network", "Connecting peers: {} sessions, {} pending + {} started", egress_count + ingress_count, handshake_count, started);
}
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
if self.have_session(id) {
trace!(target: "network", "Aborted connect. Node already connected.");
return;
}
if self.connecting_to(id) {
trace!(target: "network", "Aborted connect. Node already connecting.");
return;
}
let socket = {
let address = {
let mut nodes = self.nodes.write();
if let Some(node) = nodes.get_mut(id) {
node.endpoint.address
} else {
debug!(target: "network", "Connection to expired node aborted");
return;
}
};
match TcpStream::connect(&address) {
Ok(socket) => {
trace!(target: "network", "{}: Connecting to {:?}", id, address);
socket
},
Err(e) => {
debug!(target: "network", "{}: Can't connect to address {:?}: {:?}", id, address, e);
self.nodes.write().note_failure(&id);
return;
}
}
};
if let Err(e) = self.create_connection(socket, Some(id), io) {
debug!(target: "network", "Can't create connection: {:?}", e);
}
}
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write();
let token = sessions.insert_with_opt(|token| {
trace!(target: "network", "{}: Initiating session {:?}", token, id);
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
None
}
}
});
match token {
Some(t) => io.register_stream(t).map(|_| ()).map_err(Into::into),
None => {
debug!(target: "network", "Max sessions reached");
Ok(())
}
}
}
fn accept(&self, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.lock().accept() {
Ok((sock, _addr)) => sock,
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
debug!(target: "network", "Error accepting connection: {:?}", e);
}
break
},
};
if let Err(e) = self.create_connection(socket, None, io) {
debug!(target: "network", "Can't accept connection: {:?}", e);
}
}
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
let session = { self.sessions.read().get(token).cloned() };
if let Some(session) = session {
let mut s = session.lock();
if let Err(e) = s.writable(io, &self.info.read()) {
trace!(target: "network", "Session write error: {}: {:?}", token, e);
}
if s.done() {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
}
}
}
fn connection_closed(&self, token: TimerToken, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Connection closed: {}", token);
self.kill_connection(token, io, true);
}
fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false;
let session = { self.sessions.read().get(token).cloned() };
let mut ready_id = None;
if let Some(session) = session.clone() {
{
loop {
let session_result = session.lock().readable(io, &self.info.read());
match session_result {
Err(e) => {
let s = session.lock();
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
match *e.kind() {
ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) | ErrorKind::Disconnect(DisconnectReason::UselessPeer) => {
if let Some(id) = s.id() {
if !self.reserved_nodes.read().contains(id) {
let mut nodes = self.nodes.write();
nodes.note_failure(&id);
nodes.mark_as_useless(id);
}
}
},
_ => {},
}
kill = true;
break;
},
Ok(SessionData::Ready) => {
let (_, egress_count, ingress_count) = self.session_count();
let mut s = session.lock();
let (min_peers, mut max_peers, reserved_only, self_id) = {
let info = self.info.read();
let mut max_peers = info.config.max_peers;
for cap in s.info.capabilities.iter() {
if let Some(num) = info.config.reserved_protocols.get(&cap.protocol) {
max_peers += *num;
break;
}
}
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny, info.id().clone())
};
max_peers = max(max_peers, min_peers);
let id = s.id().expect("Ready session always has id").clone();
// Check for the session limit.
// Outgoing connections are allowed as long as their count is <= min_peers
// Incoming connections are allowed to take all of the max_peers reserve, or at most half of the slots.
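// e.g. with min_peers = 25 and max_peers = 50 this allows up to 25 ingress sessions.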
let max_ingress = max(max_peers - min_peers, min_peers / 2);
if reserved_only ||
(s.info.originated && egress_count > min_peers) ||
(!s.info.originated && ingress_count > max_ingress) {
// only proceed if the connecting peer is reserved.
if !self.reserved_nodes.read().contains(&id) {
s.disconnect(io, DisconnectReason::TooManyPeers);
kill = true;
break;
}
}
if !self.filter.as_ref().map_or(true, |f| f.connection_allowed(&self_id, &id, ConnectionDirection::Inbound)) {
trace!(target: "network", "Inbound connection not allowed for {:?}", id);
s.disconnect(io, DisconnectReason::UnexpectedIdentity);
kill = true;
break;
}
ready_id = Some(id);
// Add it to the node table
if !s.info.originated {
if let Ok(address) = s.remote_addr() {
// We can't know remote listening ports, so just assume defaults and hope for the best.
let endpoint = NodeEndpoint { address: SocketAddr::new(address.ip(), DEFAULT_PORT), udp_port: DEFAULT_PORT };
let entry = NodeEntry { id: id, endpoint: endpoint };
let mut nodes = self.nodes.write();
if !nodes.contains(&entry.id) {
nodes.add_node(Node::new(entry.id, entry.endpoint.clone()));
let mut discovery = self.discovery.lock();
if let Some(ref mut discovery) = *discovery {
discovery.add_node(entry);
}
}
}
}
// Note connection success
self.nodes.write().note_success(&id);
for (p, _) in self.handlers.read().iter() {
if s.have_capability(*p) {
ready_data.push(*p);
}
}
},
Ok(SessionData::Packet {
data,
protocol,
packet_id,
}) => {
match self.handlers.read().get(&protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
},
Ok(SessionData::Continue) => (),
Ok(SessionData::None) => break,
}
}
}
if kill {
self.kill_connection(token, io, true);
}
let handlers = self.handlers.read();
if !ready_data.is_empty() {
let duplicate = self.sessions.read().iter().any(|e| {
let session = e.lock();
session.token() != token && session.info.id == ready_id
});
if duplicate {
trace!(target: "network", "Rejected duplicate connection: {}", token);
session.lock().disconnect(io, DisconnectReason::DuplicatePeer);
self.kill_connection(token, io, false);
return;
}
for p in ready_data {
let reserved = self.reserved_nodes.read();
if let Some(h) = handlers.get(&p).clone() {
h.connected(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token);
// accumulate pending packets.
let mut session = session.lock();
packet_data.extend(session.mark_connected(p));
}
}
}
for (p, packet_id, data) in packet_data {
let reserved = self.reserved_nodes.read();
if let Some(h) = handlers.get(&p).clone() {
h.read(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token, packet_id, &data);
}
}
}
}
fn discovery_readable(&self, io: &IoContext<NetworkIoMessage>) {
let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
(Some(udp_socket), Some(discovery)) => {
let mut buf = [0u8; MAX_DATAGRAM_SIZE];
let writable = discovery.any_sends_queued();
let res = match udp_socket.recv_from(&mut buf) {
Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!(target: "network", "Error processing UDP packet: {:?}", e);
None
}),
Ok(_) => None,
Err(e) => {
debug!(target: "network", "Error reading UPD socket: {:?}", e);
None
}
};
let new_writable = discovery.any_sends_queued();
if writable != new_writable {
io.update_registration(DISCOVERY)
.unwrap_or_else(|e| {
debug!(target: "network" ,"Error updating discovery registration: {:?}", e)
});
}
res
},
_ => None,
};
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
}
fn discovery_writable(&self, io: &IoContext<NetworkIoMessage>) {
match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
(Some(udp_socket), Some(discovery)) => {
while let Some(data) = discovery.dequeue_send() {
match udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Ok(Some(_)) => {
warn!(target: "network", "UDP sent incomplete datagram");
},
Ok(None) => {
discovery.requeue_send(data);
return;
}
Err(e) => {
debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address);
return;
}
}
}
io.update_registration(DISCOVERY)
.unwrap_or_else(|e| {
debug!(target: "network", "Error updating discovery registration: {:?}", e)
});
},
_ => (),
}
}
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Connection timeout: {}", token);
self.kill_connection(token, io, true)
}
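/// Tear down a session: notify protocol handlers that saw the peer as connected, note a failure
/// against the node when `remote` is set, and deregister the stream once it is done.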
fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>, remote: bool) {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
let mut failure_id = None;
let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
let sessions = self.sessions.read();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.lock();
if !s.expired() {
if s.is_ready() {
for (p, _) in self.handlers.read().iter() {
if s.have_capability(*p) {
to_disconnect.push(*p);
}
}
}
s.set_expired();
failure_id = s.id().cloned();
}
deregister = remote || s.done();
}
}
if let Some(id) = failure_id {
if remote {
self.nodes.write().note_failure(&id);
}
}
for p in to_disconnect {
let reserved = self.reserved_nodes.read();
if let Some(h) = self.handlers.read().get(&p).clone() {
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
}
if deregister {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
}
}
fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.read();
for c in sessions.iter() {
let s = c.lock();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
to_remove.push(s.token());
}
}
}
}
for i in to_remove {
trace!(target: "network", "Removed from node table: {}", i);
}
self.nodes.write().update(node_changes, &*self.reserved_nodes.read());
}
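/// Execute `action` with a `NetworkContext` for `protocol` that is not bound to any particular session.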
pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: FnOnce(&NetworkContextTrait) {
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context);
}
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: FnOnce(&NetworkContextTrait) -> T {
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context)
}
}
impl IoHandler<NetworkIoMessage> for Host {
/// Initialize networking
fn initialize(&self, io: &IoContext<NetworkIoMessage>) {
io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer");
io.message(NetworkIoMessage::InitPublicInterface).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
self.maintain_network(io)
}
fn stream_hup(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
trace!(target: "network", "Hup: {}", stream);
match stream {
FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io),
_ => warn!(target: "network", "Unexpected hup"),
};
}
fn stream_readable(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => self.discovery_readable(io),
TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"),
}
}
fn stream_writable(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => self.discovery_writable(io),
_ => panic!("Received unknown writable token"),
}
}
fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match token {
IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
// Run the _slow_ discovery if enough peers are connected
if !self.has_enough_peers() {
return;
}
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
FAST_DISCOVERY_REFRESH => {
// Run the fast discovery if not enough peers are connected
if self.has_enough_peers() {
return;
}
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
self.discovery.lock().as_mut().map(|d| d.round());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.write().clear_useless();
self.nodes.write().save();
},
_ => match self.timers.read().get(&token).cloned() {
Some(timer) => match self.handlers.read().get(&timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.read();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
}
}
}
fn message(&self, io: &IoContext<NetworkIoMessage>, message: &NetworkIoMessage) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match *message {
NetworkIoMessage::AddHandler {
ref handler,
ref protocol,
ref versions,
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read();
h.initialize(
&NetworkContext::new(io, *protocol, None, self.sessions.clone(), &reserved),
);
self.handlers.write().insert(*protocol, h);
let mut info = self.info.write();
for &(version, packet_count) in versions {
info.capabilities.push(CapabilityInfo {
protocol: *protocol,
version,
packet_count,
});
}
},
NetworkIoMessage::AddTimer {
ref protocol,
ref delay,
ref token,
} => {
let handler_token = {
let mut timer_counter = self.timer_counter.write();
let counter = &mut *timer_counter;
let handler_token = *counter;
*counter += 1;
handler_token
};
self.timers.write().insert(handler_token, ProtocolTimer { protocol: *protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.read().get(*peer).cloned() };
if let Some(session) = session {
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
}
trace!(target: "network", "Disconnect requested {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().get(*peer).cloned() };
if let Some(session) = session {
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
if let Some(id) = session.lock().id() {
let mut nodes = self.nodes.write();
nodes.note_failure(&id);
nodes.mark_as_useless(id);
}
}
trace!(target: "network", "Disabling peer {}", peer);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::InitPublicInterface =>
self.init_public_interface(io).unwrap_or_else(|e| warn!("Error initializing public interface: {:?}", e)),
_ => {} // ignore others.
}
}
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let session = { self.sessions.read().get(stream).cloned() };
if let Some(session) = session {
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => match self.udp_socket.lock().as_ref() {
Some(udp_socket) => {
event_loop.register(udp_socket, reg, Ready::all(), PollOpt::edge())
.expect("Error registering UDP socket");
},
_ => panic!("Error registering discovery socket"),
}
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let mut connections = self.sessions.write();
if let Some(connection) = connections.get(stream).cloned() {
let c = connection.lock();
if c.expired() { // make sure it is the same connection that the event was generated for
c.deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
}
}
}
DISCOVERY => (),
_ => warn!("Unexpected stream deregistration")
}
}
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage>>) {
match stream {
FIRST_SESSION ... LAST_SESSION => {
let connection = { self.sessions.read().get(stream).cloned() };
if let Some(connection) = connection {
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) {
(Some(udp_socket), Some(discovery)) => {
let registration = if discovery.any_sends_queued() {
Ready::readable() | Ready::writable()
} else {
Ready::readable()
};
event_loop.reregister(udp_socket, reg, registration, PollOpt::edge())
.expect("Error reregistering UDP socket");
},
_ => panic!("Error reregistering discovery socket"),
}
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}
}
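/// Write the node key to `<path>/key` as hex (with the `0x` prefix stripped), restricting file
/// permissions to the owner.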
fn save_key(path: &Path, key: &Secret) {
let mut path_buf = PathBuf::from(path);
if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
warn!("Error creating key directory: {:?}", e);
return;
};
path_buf.push("key");
let path = path_buf.as_path();
let mut file = match fs::File::create(&path) {
Ok(file) => file,
Err(e) => {
warn!("Error creating key file: {:?}", e);
return;
}
};
if let Err(e) = restrict_permissions_owner(path, true, false) {
warn!(target: "network", "Failed to modify permissions of the file ({})", e);
}
if let Err(e) = file.write(&key.hex().into_bytes()[2..]) {
warn!("Error writing key file: {:?}", e);
}
}
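/// Read the node key back from `<path>/key`, returning `None` if the file cannot be read or parsed.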
fn load_key(path: &Path) -> Option<Secret> {
let mut path_buf = PathBuf::from(path);
path_buf.push("key");
let mut file = match fs::File::open(path_buf.as_path()) {
Ok(file) => file,
Err(e) => {
debug!("Error opening key file: {:?}", e);
return None;
}
};
let mut buf = String::new();
match file.read_to_string(&mut buf) {
Ok(_) => {},
Err(e) => {
warn!("Error reading key file: {:?}", e);
return None;
}
}
match Secret::from_str(&buf) {
Ok(key) => Some(key),
Err(e) => {
warn!("Error parsing key file: {:?}", e);
None
}
}
}
#[test]
fn key_save_load() {
use tempdir::TempDir;
let tempdir = TempDir::new("").unwrap();
let key = H256::random().into();
save_key(tempdir.path(), &key);
let r = load_key(tempdir.path());
assert_eq!(key, r.unwrap());
}
#[test]
fn host_client_url() {
let mut config = NetworkConfiguration::new_local();
let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".parse().unwrap();
config.use_secret = Some(key);
let host: Host = Host::new(config, None).unwrap();
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
}