openethereum/util/network/src/discovery.rs

712 lines
24 KiB
Rust
Raw Normal View History

// Copyright 2015-2017 Parity Technologies (UK) Ltd.
2016-02-05 13:40:41 +01:00
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::bytes::Bytes;
2016-02-12 09:52:32 +01:00
use std::net::SocketAddr;
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::mem;
2016-03-11 10:57:58 +01:00
use std::default::Default;
use mio::*;
2016-10-30 09:56:34 +01:00
use mio::deprecated::{Handler, EventLoop};
use mio::udp::*;
use hash::keccak;
2016-02-13 22:57:39 +01:00
use time;
2017-09-04 16:36:49 +02:00
use bigint::hash::*;
2016-09-01 14:49:12 +02:00
use rlp::*;
use node_table::*;
use error::NetworkError;
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use IpFilter;
2016-02-12 09:52:32 +01:00
use PROTOCOL_VERSION;
2016-02-13 22:57:39 +01:00
2016-02-12 09:52:32 +01:00
const ADDRESS_BYTES_SIZE: u32 = 32; // Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; // Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
2016-02-13 22:57:39 +01:00
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
2016-02-12 09:52:32 +01:00
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
const MAX_DATAGRAM_SIZE: usize = 1280;
2016-02-12 09:52:32 +01:00
const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4;
2016-02-13 22:57:39 +01:00
const PING_TIMEOUT_MS: u64 = 300;
2016-07-20 12:41:31 +02:00
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
2016-02-13 22:57:39 +01:00
#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
pub endpoint: NodeEndpoint,
}
pub struct BucketEntry {
pub address: NodeEntry,
pub id_hash: H256,
2016-02-13 22:57:39 +01:00
pub timeout: Option<u64>,
}
2016-08-24 17:00:14 +02:00
pub struct NodeBucket {
2016-02-13 22:57:39 +01:00
nodes: VecDeque<BucketEntry>, //sorted by last active
}
2016-03-11 10:57:58 +01:00
impl Default for NodeBucket {
fn default() -> Self {
NodeBucket::new()
}
}
impl NodeBucket {
2016-03-11 10:57:58 +01:00
fn new() -> Self {
2016-01-04 13:49:32 +01:00
NodeBucket {
2016-02-13 22:57:39 +01:00
nodes: VecDeque::new()
2016-01-04 13:49:32 +01:00
}
}
}
2016-02-12 09:52:32 +01:00
struct Datagramm {
payload: Bytes,
address: SocketAddr,
}
pub struct Discovery {
2016-01-04 13:49:32 +01:00
id: NodeId,
id_hash: H256,
2016-02-13 22:57:39 +01:00
secret: Secret,
2016-02-16 02:05:36 +01:00
public_endpoint: NodeEndpoint,
2016-02-12 09:52:32 +01:00
udp_socket: UdpSocket,
token: StreamToken,
2016-01-04 13:49:32 +01:00
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
2016-02-22 23:05:27 +01:00
send_queue: VecDeque<Datagramm>,
check_timestamps: bool,
2016-07-20 12:41:31 +02:00
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
2016-02-12 09:52:32 +01:00
}
pub struct TableUpdates {
2016-02-13 22:57:39 +01:00
pub added: HashMap<NodeId, NodeEntry>,
2016-02-12 09:52:32 +01:00
pub removed: HashSet<NodeId>,
}
impl Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery {
2016-10-30 09:56:34 +01:00
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
Discovery {
2016-02-13 22:57:39 +01:00
id: key.public().clone(),
id_hash: keccak(key.public()),
2016-02-13 22:57:39 +01:00
secret: key.secret().clone(),
2016-02-16 02:05:36 +01:00
public_endpoint: public,
2016-02-12 09:52:32 +01:00
token: token,
2016-01-04 13:49:32 +01:00
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
2016-02-13 22:57:39 +01:00
node_buckets: (0..NODE_BINS).map(|_| NodeBucket::new()).collect(),
2016-02-12 09:52:32 +01:00
udp_socket: socket,
send_queue: VecDeque::new(),
2016-02-22 23:05:27 +01:00
check_timestamps: true,
2016-07-20 12:41:31 +02:00
adding_nodes: Vec::new(),
ip_filter: ip_filter,
}
}
2016-02-15 20:28:27 +01:00
/// Add a new node to discovery table. Pings the node.
2016-07-20 12:41:31 +02:00
pub fn add_node(&mut self, e: NodeEntry) {
2016-10-31 12:54:50 +01:00
if self.is_allowed(&e) {
let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
}
2016-07-20 12:41:31 +02:00
}
/// Add a list of nodes. Pings a few nodes each round
pub fn add_node_list(&mut self, nodes: Vec<NodeEntry>) {
self.adding_nodes = nodes;
self.update_new_nodes();
}
2016-02-15 20:28:27 +01:00
/// Add a list of known nodes to the table.
pub fn init_node_list(&mut self, mut nodes: Vec<NodeEntry>) {
2016-02-15 20:28:27 +01:00
for n in nodes.drain(..) {
2016-10-31 12:54:50 +01:00
if self.is_allowed(&n) {
self.update_node(n);
}
2016-02-15 20:28:27 +01:00
}
}
2016-02-13 22:57:39 +01:00
fn update_node(&mut self, e: NodeEntry) {
trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = keccak(e.id);
2016-02-13 22:57:39 +01:00
let ping = {
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id_hash) as usize];
2016-02-13 22:57:39 +01:00
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.timeout = None;
true
} else { false };
if !updated {
bucket.nodes.push_front(BucketEntry { address: e, timeout: None, id_hash: id_hash, });
2016-02-13 22:57:39 +01:00
}
if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
let mut last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
last.timeout = Some(time::precise_time_ns());
Some(last.address.endpoint.clone())
2016-02-13 22:57:39 +01:00
} else { None }
};
if let Some(endpoint) = ping {
self.ping(&endpoint);
}
}
fn clear_ping(&mut self, id: &NodeId) {
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &keccak(id)) as usize];
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
node.timeout = None;
}
}
2016-02-13 22:57:39 +01:00
fn start(&mut self) {
trace!(target: "discovery", "Starting discovery");
2016-01-04 13:49:32 +01:00
self.discovery_round = 0;
2016-02-13 22:57:39 +01:00
self.discovery_id.randomize(); //TODO: use cryptographic nonce
2016-01-04 13:49:32 +01:00
self.discovery_nodes.clear();
}
2016-07-20 12:41:31 +02:00
fn update_new_nodes(&mut self) {
let mut count = 0usize;
while !self.adding_nodes.is_empty() && count < MAX_NODES_PING {
let node = self.adding_nodes.pop().expect("pop is always Some if not empty; qed");
self.add_node(node);
count += 1;
}
}
2016-02-13 22:57:39 +01:00
fn discover(&mut self) {
2016-07-20 12:41:31 +02:00
self.update_new_nodes();
2016-02-13 22:57:39 +01:00
if self.discovery_round == DISCOVERY_MAX_STEPS {
2016-01-04 13:49:32 +01:00
return;
}
2016-02-13 22:57:39 +01:00
trace!(target: "discovery", "Starting round {:?}", self.discovery_round);
2016-01-04 13:49:32 +01:00
let mut tried_count = 0;
{
2016-02-13 22:57:39 +01:00
let nearest = Discovery::nearest_node_entries(&self.discovery_id, &self.node_buckets).into_iter();
let nearest = nearest.filter(|x| !self.discovery_nodes.contains(&x.id)).take(ALPHA).collect::<Vec<_>>();
2016-01-04 13:49:32 +01:00
for r in nearest {
let rlp = encode_list(&(&[self.discovery_id.clone()][..]));
2016-02-13 22:57:39 +01:00
self.send_packet(PACKET_FIND_NODE, &r.endpoint.udp_address(), &rlp);
self.discovery_nodes.insert(r.id.clone());
2016-01-04 13:49:32 +01:00
tried_count += 1;
2016-02-13 22:57:39 +01:00
trace!(target: "discovery", "Sent FindNode to {:?}", &r.endpoint);
2016-01-04 13:49:32 +01:00
}
}
2016-02-13 22:57:39 +01:00
if tried_count == 0 {
trace!(target: "discovery", "Completing discovery");
self.discovery_round = DISCOVERY_MAX_STEPS;
self.discovery_nodes.clear();
2016-01-04 13:49:32 +01:00
return;
}
self.discovery_round += 1;
}
fn distance(a: &H256, b: &H256) -> u32 {
let d = *a ^ *b;
2016-01-04 13:49:32 +01:00
let mut ret:u32 = 0;
for i in 0..32 {
let mut v: u8 = d[i];
while v != 0 {
v >>= 1;
ret += 1;
}
}
ret
}
2016-02-13 22:57:39 +01:00
fn ping(&mut self, node: &NodeEndpoint) {
let mut rlp = RlpStream::new_list(3);
rlp.append(&PROTOCOL_VERSION);
2016-02-16 02:05:36 +01:00
self.public_endpoint.to_rlp_list(&mut rlp);
2016-02-13 22:57:39 +01:00
node.to_rlp_list(&mut rlp);
trace!(target: "discovery", "Sent Ping to {:?}", &node);
self.send_packet(PACKET_PING, &node.udp_address(), &rlp.drain());
}
fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) {
let mut rlp = RlpStream::new();
rlp.append_raw(&[packet_id], 1);
let source = Rlp::new(payload);
rlp.begin_list(source.item_count() + 1);
for i in 0 .. source.item_count() {
rlp.append_raw(source.at(i).as_raw(), 1);
}
let timestamp = time::get_time().sec as u32 + 60;
rlp.append(&timestamp);
let bytes = rlp.drain();
let hash = keccak(bytes.as_ref());
let signature = match sign(&self.secret, &hash) {
2016-02-13 22:57:39 +01:00
Ok(s) => s,
Err(_) => {
warn!("Error signing UDP packet");
return;
}
};
let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65);
packet.extend(hash.iter());
packet.extend(signature.iter());
packet.extend(bytes.iter());
let signed_hash = keccak(&packet[32..]);
2016-02-13 22:57:39 +01:00
packet[0..32].clone_from_slice(&signed_hash);
self.send_to(packet, address.clone());
}
2016-01-04 13:49:32 +01:00
2016-03-11 11:16:49 +01:00
#[cfg_attr(feature="dev", allow(map_clone))]
2016-02-14 01:03:48 +01:00
fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec<NodeEntry> {
2016-02-13 22:57:39 +01:00
let mut found: BTreeMap<u32, Vec<&NodeEntry>> = BTreeMap::new();
2016-01-04 13:49:32 +01:00
let mut count = 0;
let target_hash = keccak(target);
2016-01-04 13:49:32 +01:00
2016-02-13 22:57:39 +01:00
// Sort nodes by distance to target
for bucket in buckets {
for node in &bucket.nodes {
let distance = Discovery::distance(&target_hash, &node.id_hash);
2016-02-13 22:57:39 +01:00
found.entry(distance).or_insert_with(Vec::new).push(&node.address);
if count == BUCKET_SIZE {
// delete the most distant element
let remove = {
let (key, last) = found.iter_mut().next_back().expect("Last element is always Some when count > 0");
2016-02-13 22:57:39 +01:00
last.pop();
2016-08-24 17:00:14 +02:00
if last.is_empty() { Some(key.clone()) } else { None }
2016-02-13 22:57:39 +01:00
};
2016-08-24 17:00:14 +02:00
if let Some(remove) = remove {
found.remove(&remove);
2016-01-04 13:49:32 +01:00
}
}
2016-02-13 22:57:39 +01:00
else {
count += 1;
2016-01-04 13:49:32 +01:00
}
}
}
2016-02-13 22:57:39 +01:00
let mut ret:Vec<NodeEntry> = Vec::new();
2016-02-15 14:39:56 +01:00
for nodes in found.values() {
2016-02-13 22:57:39 +01:00
ret.extend(nodes.iter().map(|&n| n.clone()));
2016-01-04 13:49:32 +01:00
}
ret
}
2016-02-12 09:52:32 +01:00
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while let Some(data) = self.send_queue.pop_front() {
2016-02-13 22:57:39 +01:00
match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Ok(Some(_)) => {
warn!("UDP sent incomplete datagramm");
},
Ok(None) => {
self.send_queue.push_front(data);
return;
}
Err(e) => {
2016-03-14 10:25:04 +01:00
debug!("UDP send error: {:?}, address: {:?}", e, &data.address);
2016-02-13 22:57:39 +01:00
return;
}
2016-02-12 09:52:32 +01:00
}
}
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
2016-02-12 09:52:32 +01:00
}
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
self.send_queue.push_back(Datagramm { payload: payload, address: address });
}
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
2016-02-12 09:52:32 +01:00
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
let writable = !self.send_queue.is_empty();
let res = match self.udp_socket.recv_from(&mut buf) {
2016-02-12 09:52:32 +01:00
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!("Error processing UDP packet: {:?}", e);
None
}),
Ok(_) => None,
Err(e) => {
2016-03-14 10:25:04 +01:00
debug!("Error reading UPD socket: {:?}", e);
2016-02-12 09:52:32 +01:00
None
}
};
let new_writable = !self.send_queue.is_empty();
if writable != new_writable {
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
2016-02-12 09:52:32 +01:00
}
res
2016-02-12 09:52:32 +01:00
}
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
// validate packet
if packet.len() < 32 + 65 + 4 + 1 {
return Err(NetworkError::BadProtocol);
}
let hash_signed = keccak(&packet[32..]);
2016-02-12 09:52:32 +01:00
if hash_signed[..] != packet[0..32] {
return Err(NetworkError::BadProtocol);
}
let signed = &packet[(32 + 65)..];
let signature = H520::from_slice(&packet[32..(32 + 65)]);
let node_id = recover(&signature.into(), &keccak(signed))?;
2016-02-12 09:52:32 +01:00
let packet_id = signed[0];
let rlp = UntrustedRlp::new(&signed[1..]);
match packet_id {
PACKET_PING => self.on_ping(&rlp, &node_id, &from),
PACKET_PONG => self.on_pong(&rlp, &node_id, &from),
PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from),
_ => {
2016-02-12 09:52:32 +01:00
debug!("Unknown UDP packet: {}", packet_id);
Ok(None)
}
}
}
2016-02-27 15:02:05 +01:00
fn check_timestamp(&self, timestamp: u64) -> Result<(), NetworkError> {
2016-02-22 23:05:27 +01:00
if self.check_timestamps && timestamp < time::get_time().sec as u64{
debug!(target: "discovery", "Expired packet");
return Err(NetworkError::Expired);
}
Ok(())
}
2016-10-31 12:54:50 +01:00
fn is_allowed(&self, entry: &NodeEntry) -> bool {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
2016-10-31 12:54:50 +01:00
}
2016-02-12 09:52:32 +01:00
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
2016-02-13 22:57:39 +01:00
trace!(target: "discovery", "Got Ping from {:?}", &from);
let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?;
let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
let timestamp: u64 = rlp.val_at(3)?;
self.check_timestamp(timestamp)?;
2016-02-17 14:06:19 +01:00
let mut added_map = HashMap::new();
let entry = NodeEntry { id: node.clone(), endpoint: source.clone() };
if !entry.endpoint.is_valid() {
2016-02-17 14:06:19 +01:00
debug!(target: "discovery", "Got bad address: {:?}", entry);
2016-10-31 12:54:50 +01:00
} else if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
} else {
2016-02-17 14:06:19 +01:00
self.update_node(entry.clone());
added_map.insert(node.clone(), entry);
2016-02-13 22:57:39 +01:00
}
let hash = keccak(rlp.as_raw());
2016-02-13 22:57:39 +01:00
let mut response = RlpStream::new_list(2);
dest.to_rlp_list(&mut response);
response.append(&hash);
2016-02-17 14:06:19 +01:00
self.send_packet(PACKET_PONG, from, &response.drain());
2016-02-13 22:57:39 +01:00
Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() }))
2016-02-12 09:52:32 +01:00
}
fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
2016-02-13 22:57:39 +01:00
trace!(target: "discovery", "Got Pong from {:?}", &from);
// TODO: validate pong packet
let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
let timestamp: u64 = rlp.val_at(2)?;
self.check_timestamp(timestamp)?;
2016-02-13 22:57:39 +01:00
let mut entry = NodeEntry { id: node.clone(), endpoint: dest };
if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", entry);
entry.endpoint.address = from.clone();
}
self.clear_ping(node);
2016-02-13 22:57:39 +01:00
let mut added_map = HashMap::new();
added_map.insert(node.clone(), entry);
Ok(None)
2016-02-12 09:52:32 +01:00
}
2016-02-13 22:57:39 +01:00
fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = rlp.val_at(0)?;
let timestamp: u64 = rlp.val_at(1)?;
self.check_timestamp(timestamp)?;
2016-02-13 22:57:39 +01:00
let nearest = Discovery::nearest_node_entries(&target, &self.node_buckets);
if nearest.is_empty() {
return Ok(None);
}
2016-03-13 23:20:26 +01:00
let mut packets = Discovery::prepare_neighbours_packets(&nearest);
for p in packets.drain(..) {
self.send_packet(PACKET_NEIGHBOURS, from, &p);
2016-03-13 23:20:26 +01:00
}
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &from);
Ok(None)
}
fn prepare_neighbours_packets(nearest: &[NodeEntry]) -> Vec<Bytes> {
let limit = (MAX_DATAGRAM_SIZE - 109) / 90;
2016-03-14 00:41:25 +01:00
let chunks = nearest.chunks(limit);
let packets = chunks.map(|c| {
let mut rlp = RlpStream::new_list(1);
rlp.begin_list(c.len());
for n in 0 .. c.len() {
rlp.begin_list(4);
c[n].endpoint.to_rlp(&mut rlp);
rlp.append(&c[n].id);
2016-02-13 22:57:39 +01:00
}
2016-03-14 00:41:25 +01:00
rlp.out()
});
packets.collect()
2016-02-12 09:52:32 +01:00
}
2016-02-13 22:57:39 +01:00
fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
// TODO: validate packet
let mut added = HashMap::new();
trace!(target: "discovery", "Got {} Neighbours from {:?}", rlp.at(0)?.item_count()?, &from);
for r in rlp.at(0)?.iter() {
let endpoint = NodeEndpoint::from_rlp(&r)?;
2016-02-13 22:57:39 +01:00
if !endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", endpoint);
continue;
}
let node_id: NodeId = r.val_at(3)?;
2016-02-15 16:01:45 +01:00
if node_id == self.id {
continue;
}
2016-02-13 22:57:39 +01:00
let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint };
2016-10-31 12:54:50 +01:00
if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
continue;
}
2016-02-13 22:57:39 +01:00
added.insert(node_id, entry.clone());
2016-02-14 01:03:48 +01:00
self.ping(&entry.endpoint);
2016-02-13 22:57:39 +01:00
self.update_node(entry);
}
Ok(Some(TableUpdates { added: added, removed: HashSet::new() }))
}
2016-02-16 21:25:01 +01:00
fn check_expired(&mut self, force: bool) -> HashSet<NodeId> {
2016-02-13 22:57:39 +01:00
let now = time::precise_time_ns();
let mut removed: HashSet<NodeId> = HashSet::new();
for bucket in &mut self.node_buckets {
bucket.nodes.retain(|node| {
if let Some(timeout) = node.timeout {
2016-02-16 21:25:01 +01:00
if !force && now - timeout < PING_TIMEOUT_MS * 1000_0000 {
2016-02-13 22:57:39 +01:00
true
}
else {
trace!(target: "discovery", "Removed expired node {:?}", &node.address);
removed.insert(node.address.id.clone());
false
}
} else { true }
});
}
removed
2016-02-12 09:52:32 +01:00
}
2016-02-13 22:57:39 +01:00
pub fn round(&mut self) -> Option<TableUpdates> {
2016-02-16 21:25:01 +01:00
let removed = self.check_expired(false);
2016-02-13 22:57:39 +01:00
self.discover();
if !removed.is_empty() {
Some(TableUpdates { added: HashMap::new(), removed: removed })
2016-02-13 22:57:39 +01:00
} else { None }
2016-02-12 09:52:32 +01:00
}
pub fn refresh(&mut self) {
2016-02-13 22:57:39 +01:00
self.start();
2016-02-12 09:52:32 +01:00
}
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
2016-10-30 09:56:34 +01:00
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
2016-02-12 09:52:32 +01:00
Ok(())
}
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
2016-06-03 11:36:30 +02:00
let registration = if !self.send_queue.is_empty() {
2016-10-30 09:56:34 +01:00
Ready::readable() | Ready::writable()
2016-06-03 11:36:30 +02:00
} else {
2016-10-30 09:56:34 +01:00
Ready::readable()
2016-06-03 11:36:30 +02:00
};
2016-02-12 09:52:32 +01:00
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
Ok(())
}
}
2016-02-15 16:01:45 +01:00
#[cfg(test)]
mod tests {
use super::*;
2016-11-28 17:05:37 +01:00
use std::net::{SocketAddr};
use node_table::{Node, NodeId, NodeEndpoint};
2016-02-15 16:01:45 +01:00
use std::str::FromStr;
use rustc_hex::FromHex;
use ethkey::{Random, Generator};
2016-03-13 23:20:26 +01:00
#[test]
fn find_node() {
let mut nearest = Vec::new();
let node = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
for _ in 0..1000 {
nearest.push( NodeEntry { id: node.id.clone(), endpoint: node.endpoint.clone() });
}
let packets = Discovery::prepare_neighbours_packets(&nearest);
2016-03-14 00:41:25 +01:00
assert_eq!(packets.len(), 77);
for p in &packets[0..76] {
2016-03-13 23:20:26 +01:00
assert!(p.len() > 1280/2);
assert!(p.len() <= 1280);
}
2016-03-14 00:41:25 +01:00
assert!(packets.last().unwrap().len() > 0);
2016-03-13 23:20:26 +01:00
}
2016-02-15 16:01:45 +01:00
#[test]
fn discovery() {
let key1 = Random.generate().unwrap();
let key2 = Random.generate().unwrap();
2016-02-15 16:01:45 +01:00
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
2016-02-15 16:01:45 +01:00
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
2016-07-20 12:41:31 +02:00
discovery1.add_node(NodeEntry { id: node1.id.clone(), endpoint: node1.endpoint.clone() });
discovery1.add_node(NodeEntry { id: node2.id.clone(), endpoint: node2.endpoint.clone() });
2016-02-15 16:01:45 +01:00
2016-07-20 12:41:31 +02:00
discovery2.add_node(NodeEntry { id: key1.public().clone(), endpoint: ep1.clone() });
2016-02-15 16:01:45 +01:00
discovery2.refresh();
for _ in 0 .. 10 {
while !discovery1.send_queue.is_empty() {
let datagramm = discovery1.send_queue.pop_front().unwrap();
if datagramm.address == ep2.address {
discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok();
}
}
while !discovery2.send_queue.is_empty() {
let datagramm = discovery2.send_queue.pop_front().unwrap();
if datagramm.address == ep1.address {
discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok();
}
}
discovery2.round();
}
assert_eq!(Discovery::nearest_node_entries(&NodeId::new(), &discovery2.node_buckets).len(), 3)
}
2016-02-16 21:25:01 +01:00
#[test]
fn removes_expired() {
let key = Random.generate().unwrap();
2016-02-22 23:05:27 +01:00
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
2016-02-16 21:25:01 +01:00
for _ in 0..1200 {
2016-07-20 12:41:31 +02:00
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
2016-02-16 21:25:01 +01:00
}
2016-02-16 23:33:32 +01:00
assert!(Discovery::nearest_node_entries(&NodeId::new(), &discovery.node_buckets).len() <= 16);
2016-02-16 21:25:01 +01:00
let removed = discovery.check_expired(true).len();
assert!(removed > 0);
}
2016-02-22 23:05:27 +01:00
2016-08-24 17:00:14 +02:00
#[test]
fn find_nearest_saturated() {
use super::*;
let mut buckets: Vec<_> = (0..256).map(|_| NodeBucket::new()).collect();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
for _ in 0..(16 + 10) {
buckets[0].nodes.push_back(BucketEntry {
address: NodeEntry { id: NodeId::new(), endpoint: ep.clone() },
timeout: None,
id_hash: keccak(NodeId::new()),
2016-08-24 17:00:14 +02:00
});
}
let nearest = Discovery::nearest_node_entries(&NodeId::new(), &buckets);
assert_eq!(nearest.len(), 16)
}
2016-02-22 23:05:27 +01:00
#[test]
fn packets() {
let key = Random.generate().unwrap();
2016-02-22 23:05:27 +01:00
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
2016-02-22 23:05:27 +01:00
discovery.check_timestamps = false;
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();
let packet = "\
e9614ccfd9fc3e74360018522d30e1419a143407ffcce748de3e22116b7e8dc92ff74788c0b6663a\
aa3d67d641936511c8f8d6ad8698b820a7cf9e1be7155e9a241f556658c55428ec0563514365799a\
4be2be5a685a80971ddcfa80cb422cdd0101ec04cb847f000001820cfa8215a8d790000000000000\
000000000000000000018208ae820d058443b9a3550102\
".from_hex().unwrap();
assert!(discovery.on_packet(&packet, from.clone()).is_ok());
let packet = "\
577be4349c4dd26768081f58de4c6f375a7a22f3f7adda654d1428637412c3d7fe917cadc56d4e5e\
7ffae1dbe3efffb9849feb71b262de37977e7c7a44e677295680e9e38ab26bee2fcbae207fba3ff3\
d74069a50b902a82c9903ed37cc993c50001f83e82022bd79020010db83c4d001500000000abcdef\
12820cfa8215a8d79020010db885a308d313198a2e037073488208ae82823a8443b9a355c5010203\
040531b9019afde696e582a78fa8d95ea13ce3297d4afb8ba6433e4154caa5ac6431af1b80ba7602\
3fa4090c408f6b4bc3701562c031041d4702971d102c9ab7fa5eed4cd6bab8f7af956f7d565ee191\
7084a95398b6a21eac920fe3dd1345ec0a7ef39367ee69ddf092cbfe5b93e5e568ebc491983c09c7\
6d922dc3\
".from_hex().unwrap();
assert!(discovery.on_packet(&packet, from.clone()).is_ok());
let packet = "\
09b2428d83348d27cdf7064ad9024f526cebc19e4958f0fdad87c15eb598dd61d08423e0bf66b206\
9869e1724125f820d851c136684082774f870e614d95a2855d000f05d1648b2d5945470bc187c2d2\
216fbe870f43ed0909009882e176a46b0102f846d79020010db885a308d313198a2e037073488208\
ae82823aa0fbc914b16819237dcd8801d7e53f69e9719adecb3cc0e790c57e91ca4461c9548443b9\
a355c6010203c2040506a0c969a58f6f9095004c0177a6b47f451530cab38966a25cca5cb58f0555
42124e\
".from_hex().unwrap();
assert!(discovery.on_packet(&packet, from.clone()).is_ok());
let packet = "\
c7c44041b9f7c7e41934417ebac9a8e1a4c6298f74553f2fcfdcae6ed6fe53163eb3d2b52e39fe91\
831b8a927bf4fc222c3902202027e5e9eb812195f95d20061ef5cd31d502e47ecb61183f74a504fe\
04c51e73df81f25c4d506b26db4517490103f84eb840ca634cae0d49acb401d8a4c6b6fe8c55b70d\
115bf400769cc1400f3258cd31387574077f301b421bc84df7266c44e9e6d569fc56be0081290476\
7bf5ccd1fc7f8443b9a35582999983999999280dc62cc8255c73471e0a61da0c89acdc0e035e260a\
dd7fc0c04ad9ebf3919644c91cb247affc82b69bd2ca235c71eab8e49737c937a2c396\
".from_hex().unwrap();
assert!(discovery.on_packet(&packet, from.clone()).is_ok());
let packet = "\
c679fc8fe0b8b12f06577f2e802d34f6fa257e6137a995f6f4cbfc9ee50ed3710faf6e66f932c4c8\
d81d64343f429651328758b47d3dbc02c4042f0fff6946a50f4a49037a72bb550f3a7872363a83e1\
b9ee6469856c24eb4ef80b7535bcf99c0004f9015bf90150f84d846321163782115c82115db84031\
55e1427f85f10a5c9a7755877748041af1bcd8d474ec065eb33df57a97babf54bfd2103575fa8291\
15d224c523596b401065a97f74010610fce76382c0bf32f84984010203040101b840312c55512422\
cf9b8a4097e9a6ad79402e87a15ae909a4bfefa22398f03d20951933beea1e4dfa6f968212385e82\
9f04c2d314fc2d4e255e0d3bc08792b069dbf8599020010db83c4d001500000000abcdef12820d05\
820d05b84038643200b172dcfef857492156971f0e6aa2c538d8b74010f8e140811d53b98c765dd2\
d96126051913f44582e8c199ad7c6d6819e9a56483f637feaac9448aacf8599020010db885a308d3\
13198a2e037073488203e78203e8b8408dcab8618c3253b558d459da53bd8fa68935a719aff8b811\
197101a4b2b47dd2d47295286fc00cc081bb542d760717d1bdd6bec2c37cd72eca367d6dd3b9df73\
8443b9a355010203b525a138aa34383fec3d2719a0\
".from_hex().unwrap();
assert!(discovery.on_packet(&packet, from.clone()).is_ok());
}
2016-02-15 16:01:45 +01:00
}