// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethcore_bytes::Bytes;
use std::net::SocketAddr;
use std::collections::{HashSet, HashMap, VecDeque};
use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use hash::keccak;
use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream, encode_list};
use node_table::*;
use network::{Error, ErrorKind};
use ethkey::{Secret, KeyPair, sign, recover};
use network::IpFilter;
use PROTOCOL_VERSION;
const ADDRESS_BYTES_SIZE: usize = 32; // Size of address type in bytes.
const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia].
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
pub const MAX_DATAGRAM_SIZE: usize = 1280;
const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4;
const PING_TIMEOUT: Duration = Duration::from_millis(300);
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
pub endpoint: NodeEndpoint,
}
pub struct BucketEntry {
pub address: NodeEntry,
pub id_hash: H256,
pub timeout: Option<Instant>,
}
pub struct NodeBucket {
nodes: VecDeque<BucketEntry>, //sorted by last active
}
impl Default for NodeBucket {
fn default() -> Self {
NodeBucket::new()
}
}
impl NodeBucket {
fn new() -> Self {
NodeBucket {
nodes: VecDeque::new()
}
}
}
pub struct Datagram {
pub payload: Bytes,
pub address: SocketAddr,
}
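/// UDP-based node discovery service. Maintains a Kademlia-like routing table of
/// ADDRESS_BITS buckets, each holding up to BUCKET_SIZE peers ordered by last
/// activity, and drives iterative FindNode lookups towards a random target.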
pub struct Discovery {
id: NodeId,
id_hash: H256,
secret: Secret,
public_endpoint: NodeEndpoint,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
send_queue: VecDeque<Datagram>,
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
}
pub struct TableUpdates {
pub added: HashMap<NodeId, NodeEntry>,
pub removed: HashSet<NodeId>,
}
impl Discovery {
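/// Creates a new `Discovery` keyed by `key`, advertising `public` as this node's own
/// endpoint. A minimal usage sketch, assuming `ethkey::Random` for key generation and
/// `IpFilter::default()`; the address literal and `boot_nodes` are illustrative only:
///
/// ```ignore
/// let key = Random.generate().expect("key generation");
/// let endpoint = NodeEndpoint { address: "127.0.0.1:30303".parse().expect("valid addr"), udp_port: 30303 };
/// let mut discovery = Discovery::new(&key, endpoint, IpFilter::default());
/// discovery.init_node_list(boot_nodes); // boot_nodes: Vec<NodeEntry>
/// ```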
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
Discovery {
id: key.public().clone(),
id_hash: keccak(key.public()),
secret: key.secret().clone(),
public_endpoint: public,
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
send_queue: VecDeque::new(),
check_timestamps: true,
adding_nodes: Vec::new(),
ip_filter: ip_filter,
}
}
/// Add a new node to discovery table. Pings the node.
pub fn add_node(&mut self, e: NodeEntry) {
if self.is_allowed(&e) {
let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
}
}
/// Add a list of nodes. Pings a few nodes each round
pub fn add_node_list(&mut self, nodes: Vec<NodeEntry>) {
self.adding_nodes = nodes;
self.update_new_nodes();
}
/// Add a list of known nodes to the table.
pub fn init_node_list(&mut self, mut nodes: Vec<NodeEntry>) {
for n in nodes.drain(..) {
if self.is_allowed(&n) {
self.update_node(n);
}
}
}
fn update_node(&mut self, e: NodeEntry) {
trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = keccak(e.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist,
None => {
debug!(target: "discovery", "Attempted to update own entry: {:?}", e);
return;
}
};
let ping = {
let bucket = &mut self.node_buckets[dist];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.timeout = None;
true
} else { false };
if !updated {
bucket.nodes.push_front(BucketEntry { address: e, timeout: None, id_hash: id_hash, });
}
if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
let last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
last.timeout = Some(Instant::now());
Some(last.address.endpoint.clone())
} else { None }
};
if let Some(endpoint) = ping {
self.ping(&endpoint);
}
}
/// Removes the timeout of a given NodeId if it can be found in one of the discovery buckets
fn clear_ping(&mut self, id: &NodeId) {
let dist = match Discovery::distance(&self.id_hash, &keccak(id)) {
Some(dist) => dist,
None => {
debug!(target: "discovery", "Received ping from self");
return
}
};
let bucket = &mut self.node_buckets[dist];
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
node.timeout = None;
}
}
/// Starts the discovery process at round 0
fn start(&mut self) {
trace!(target: "discovery", "Starting discovery");
self.discovery_round = 0;
self.discovery_id.randomize(); //TODO: use cryptographic nonce
self.discovery_nodes.clear();
}
fn update_new_nodes(&mut self) {
let mut count = 0usize;
while !self.adding_nodes.is_empty() && count < MAX_NODES_PING {
let node = self.adding_nodes.pop().expect("pop is always Some if not empty; qed");
self.add_node(node);
count += 1;
}
}
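/// Runs one lookup round: sends FindNode for the current random target to the ALPHA
/// nearest nodes not yet queried, and finishes the lookup once DISCOVERY_MAX_STEPS
/// rounds have run or no untried nodes remain.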
fn discover(&mut self) {
self.update_new_nodes();
if self.discovery_round == DISCOVERY_MAX_STEPS {
return;
}
trace!(target: "discovery", "Starting round {:?}", self.discovery_round);
let mut tried_count = 0;
{
let nearest = self.nearest_node_entries(&self.discovery_id).into_iter();
let nearest = nearest.filter(|x| !self.discovery_nodes.contains(&x.id)).take(ALPHA).collect::<Vec<_>>();
for r in nearest {
let rlp = encode_list(&(&[self.discovery_id.clone()][..]));
self.send_packet(PACKET_FIND_NODE, &r.endpoint.udp_address(), &rlp)
.unwrap_or_else(|e| warn!("Error sending node discovery packet for {:?}: {:?}", &r.endpoint, e));
self.discovery_nodes.insert(r.id.clone());
tried_count += 1;
trace!(target: "discovery", "Sent FindNode to {:?}", &r.endpoint);
}
}
if tried_count == 0 {
trace!(target: "discovery", "Completing discovery");
self.discovery_round = DISCOVERY_MAX_STEPS;
self.discovery_nodes.clear();
return;
}
self.discovery_round += 1;
}
/// The base 2 log of the distance between a and b using the XOR metric.
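/// Returns `None` when the hashes are equal. Worked example (H256 indexed with byte 0
/// as the most significant): hashes differing only in the least significant bit give
/// distance 0; hashes differing in the most significant bit give distance 255. In
/// general the result is the 0-based position, counted from the LSB, of the most
/// significant differing bit.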
fn distance(a: &H256, b: &H256) -> Option<usize> {
for i in (0..ADDRESS_BYTES_SIZE).rev() {
let byte_index = ADDRESS_BYTES_SIZE - i - 1;
let d: u8 = a[byte_index] ^ b[byte_index];
if d != 0 {
let high_bit_index = 7 - d.leading_zeros() as usize;
return Some(i * 8 + high_bit_index);
}
}
None // a and b are equal, so log distance is -inf
}
fn ping(&mut self, node: &NodeEndpoint) {
let mut rlp = RlpStream::new_list(3);
rlp.append(&PROTOCOL_VERSION);
self.public_endpoint.to_rlp_list(&mut rlp);
node.to_rlp_list(&mut rlp);
trace!(target: "discovery", "Sent Ping to {:?}", &node);
self.send_packet(PACKET_PING, &node.udp_address(), &rlp.drain())
.unwrap_or_else(|e| warn!("Error sending Ping packet: {:?}", e))
}
fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) -> Result<(), Error> {
let mut rlp = RlpStream::new();
rlp.append_raw(&[packet_id], 1);
let source = Rlp::new(payload);
rlp.begin_list(source.item_count()? + 1);
for i in 0 .. source.item_count()? {
rlp.append_raw(source.at(i)?.as_raw(), 1);
}
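// Expiration timestamp 60 seconds in the future; peers discard packets whose
// expiration lies in the past (see `check_timestamps`).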
let timestamp = 60 + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() as u32;
rlp.append(&timestamp);
let bytes = rlp.drain();
let hash = keccak(bytes.as_ref());
let signature = match sign(&self.secret, &hash) {
Ok(s) => s,
Err(e) => {
warn!("Error signing UDP packet");
return Err(Error::from(e));
}
};
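// Wire layout: hash (32 bytes) || signature (65 bytes) || packet-type || RLP payload.
// The leading hash is recomputed below over everything after it, so receivers can
// check integrity before recovering the sender's public key from the signature.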
let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65);
packet.extend(hash.iter());
packet.extend(signature.iter());
packet.extend(bytes.iter());
let signed_hash = keccak(&packet[32..]);
packet[0..32].clone_from_slice(&signed_hash);
self.send_to(packet, address.clone());
Ok(())
}
fn nearest_node_entries(&self, target: &NodeId) -> Vec<NodeEntry> {
let target_hash = keccak(target);
let target_distance = self.id_hash ^ target_hash;
let mut ret = Vec::<NodeEntry>::with_capacity(BUCKET_SIZE);
// Sort bucket entries by distance to target and append to end of result vector.
let append_bucket = |results: &mut Vec<NodeEntry>, bucket: &NodeBucket| -> bool {
let mut sorted_entries: Vec<&BucketEntry> = bucket.nodes.iter().collect();
sorted_entries.sort_unstable_by_key(|entry| entry.id_hash ^ target_hash);
let remaining_capacity = results.capacity() - results.len();
let to_append = if remaining_capacity < sorted_entries.len() {
&sorted_entries[0..remaining_capacity]
} else {
&sorted_entries
};
for entry in to_append.iter() {
results.push(entry.address.clone());
}
results.len() == results.capacity()
};
// This algorithm leverages the structure of the routing table to efficiently find the
// nearest entries to a target hash. First, we compute the XOR distance from this node to
// the target. On a first pass, we iterate from the MSB of the distance, stopping at any
// buckets where the distance bit is set, and skipping the buckets where it is unset. These
// must be, in order, the buckets nearest to the target. On a second pass, we traverse from LSB to
// MSB, appending the buckets skipped on the first pass. The reason this works is that all
// entries in bucket i have a common prefix of length exactly ADDRESS_BITS - i - 1 with the ID of this
// node.
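// Concretely: if the most significant set bit of target_distance is bit i (counting from
// the MSB), bucket ADDRESS_BITS - i - 1 is the one whose nodes share the longest prefix
// with the target (at least i + 1 bits), so the first pass visits it before any other bucket.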
for i in 0..ADDRESS_BITS {
if ((target_distance[i / 8] << (i % 8)) & 0x80) != 0 {
let bucket = &self.node_buckets[ADDRESS_BITS - i - 1];
if !bucket.nodes.is_empty() && append_bucket(&mut ret, bucket) {
return ret;
}
}
}
for i in (0..ADDRESS_BITS).rev() {
if ((target_distance[i / 8] << (i % 8)) & 0x80) == 0 {
let bucket = &self.node_buckets[ADDRESS_BITS - i - 1];
if !bucket.nodes.is_empty() && append_bucket(&mut ret, bucket) {
return ret;
}
}
}
ret
}
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
self.send_queue.push_back(Datagram { payload: payload, address: address });
}
pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {