Don't add discovery initiators to the node table (#10305)

* Don't add discovery initiators to the node table

* Use enums for tracking state of the nodes in discovery

* Dont try to ping ourselves

* Fix minor nits

* Update timeouts when observing an outdated node

* Extracted update_bucket_record from update_node

* Fixed typo

* Fix two final nits from @todr
This commit is contained in:
Kirill Pimenov 2019-02-12 15:57:53 +00:00 committed by Afri Schoedon
parent 55454b2f2d
commit 5be0163cde
4 changed files with 107 additions and 34 deletions

1
Cargo.lock generated
View File

@ -971,6 +971,7 @@ dependencies = [
"keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -34,6 +34,7 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
error-chain = { version = "0.12", default-features = false } error-chain = { version = "0.12", default-features = false }
lru-cache = "0.1"
[dev-dependencies] [dev-dependencies]
env_logger = "0.5" env_logger = "0.5"

View File

@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque};
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::default::Default; use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use lru_cache::LruCache;
use hash::keccak; use hash::keccak;
use ethereum_types::{H256, H520}; use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream}; use rlp::{Rlp, RlpStream};
@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [
const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60); const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);
const OBSERVED_NODES_MAX_SIZE: usize = 10_000;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct NodeEntry { pub struct NodeEntry {
pub id: NodeId, pub id: NodeId,
@ -95,7 +98,27 @@ struct FindNodeRequest {
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum PingReason { enum PingReason {
Default, Default,
FromDiscoveryRequest(NodeId) FromDiscoveryRequest(NodeId, NodeValidity),
}
#[derive(Clone, Copy, PartialEq)]
enum NodeCategory {
Bucket,
Observed
}
#[derive(Clone, Copy, PartialEq)]
enum NodeValidity {
Ourselves,
ValidNode(NodeCategory),
ExpiredNode(NodeCategory),
UnknownNode
}
#[derive(Debug)]
enum BucketError {
Ourselves,
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
} }
struct PingRequest { struct PingRequest {
@ -145,6 +168,12 @@ pub struct Discovery<'a> {
discovery_id: NodeId, discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>, discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>, node_buckets: Vec<NodeBucket>,
// Sometimes we don't want to add nodes to the NodeTable, but still want to
// keep track of them to avoid excessive pinging (happens when an unknown node sends
// a discovery request to us -- the node might be on a different net).
other_observed_nodes: LruCache<NodeId, (NodeEndpoint, Instant)>,
in_flight_pings: HashMap<NodeId, PingRequest>, in_flight_pings: HashMap<NodeId, PingRequest>,
in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>, in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>,
send_queue: VecDeque<Datagram>, send_queue: VecDeque<Datagram>,
@ -171,6 +200,7 @@ impl<'a> Discovery<'a> {
discovery_id: NodeId::new(), discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(), discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE),
in_flight_pings: HashMap::new(), in_flight_pings: HashMap::new(),
in_flight_find_nodes: HashMap::new(), in_flight_find_nodes: HashMap::new(),
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
@ -200,41 +230,53 @@ impl<'a> Discovery<'a> {
} }
} }
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> { fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> {
trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = keccak(e.id); let id_hash = keccak(e.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) { let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist, Some(dist) => dist,
None => { None => {
debug!(target: "discovery", "Attempted to update own entry: {:?}", e); debug!(target: "discovery", "Attempted to update own entry: {:?}", e);
return None; return Err(BucketError::Ourselves);
} }
}; };
let bucket = &mut self.node_buckets[dist];
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
entry.address = e;
entry.last_seen = Instant::now();
entry.backoff_until = Instant::now();
entry.fail_count = 0;
Ok(())
})
}
let mut added_map = HashMap::new(); fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
let ping = { trace!(target: "discovery", "Inserting {:?}", &e);
let bucket = &mut self.node_buckets[dist];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.last_seen = Instant::now();
node.backoff_until = Instant::now();
node.fail_count = 0;
true
} else { false };
if !updated { match self.update_bucket_record(e) {
added_map.insert(e.id, e.clone()); Ok(()) => None,
bucket.nodes.push_front(BucketEntry::new(e)); Err(BucketError::Ourselves) => None,
Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance))
}.map(|(node_entry, bucket_distance)| {
trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance);
let mut added = HashMap::with_capacity(1);
added.insert(node_entry.id, node_entry.clone());
let node_to_ping = {
let bucket = &mut self.node_buckets[bucket_distance];
bucket.nodes.push_front(BucketEntry::new(node_entry));
if bucket.nodes.len() > BUCKET_SIZE { if bucket.nodes.len() > BUCKET_SIZE {
select_bucket_ping(bucket.nodes.iter()) select_bucket_ping(bucket.nodes.iter())
} else { None } } else {
} else { None } None
}; }
if let Some(node) = ping { };
self.try_ping(node, PingReason::Default); if let Some(node) = node_to_ping {
} self.try_ping(node, PingReason::Default);
Some(TableUpdates { added: added_map, removed: HashSet::new() }) };
TableUpdates{added, removed: HashSet::new()}
})
} }
/// Starts the discovery process at round 0 /// Starts the discovery process at round 0
@ -541,10 +583,28 @@ impl<'a> Discovery<'a> {
}; };
if let Some((node, ping_reason)) = expected_node { if let Some((node, ping_reason)) = expected_node {
if let PingReason::FromDiscoveryRequest(target) = ping_reason { if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason {
self.respond_with_discovery(target, &node)?; self.respond_with_discovery(target, &node)?;
// kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net --
// but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea.
// So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging
match validity {
NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => {
trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node);
self.update_bucket_record(node).unwrap_or_else(|error| {
debug!(target: "discovery", "Error occured when processing ping from a bucket node: {:?}", &error);
});
},
NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> {
trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node);
self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now()));
},
NodeValidity::Ourselves => (),
}
Ok(None)
} else {
Ok(self.update_node(node))
} }
Ok(self.update_node(node))
} else { } else {
debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
Ok(None) Ok(None)
@ -565,31 +625,41 @@ impl<'a> Discovery<'a> {
} }
}; };
if self.is_a_valid_known_node(&node) { match self.check_validity(&node) {
self.respond_with_discovery(target, &node)?; NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves
} else { NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?,
// Make sure the request source is actually there and responds to pings before actually responding // Make sure the request source is actually there and responds to pings before actually responding
self.try_ping(node, PingReason::FromDiscoveryRequest(target)); invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason))
} }
Ok(None) Ok(None)
} }
fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool { fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity {
let id_hash = keccak(node.id); let id_hash = keccak(node.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) { let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist, Some(dist) => dist,
None => { None => {
debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node); debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node);
return false; return NodeValidity::Ourselves;
} }
}; };
let bucket = &self.node_buckets[dist]; let bucket = &self.node_buckets[dist];
if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) { if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) {
debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node); debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node);
(known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT) match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Bucket),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket),
_ => NodeValidity::UnknownNode
}
} else { } else {
false self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| {
match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Observed),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed),
_ => NodeValidity::UnknownNode
}
})
} }
} }

View File

@ -84,6 +84,7 @@ extern crate keccak_hash as hash;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate parity_snappy as snappy; extern crate parity_snappy as snappy;
extern crate lru_cache;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;