From 83f706186f198370ed2cc0a353b52a7d8ca34c83 Mon Sep 17 00:00:00 2001 From: Kirill Pimenov Date: Thu, 10 Jan 2019 19:43:16 +0000 Subject: [PATCH] Ping nodes from discovery (#10167) --- util/network-devp2p/src/discovery.rs | 93 ++++++++++++++++++++++------ 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 980531ba2..f6eaf494b 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -53,12 +53,15 @@ const REQUEST_BACKOFF: [Duration; 4] = [ Duration::from_secs(64) ]; +const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60); + #[derive(Clone, Debug)] pub struct NodeEntry { pub id: NodeId, pub endpoint: NodeEndpoint, } +#[derive(Debug)] pub struct BucketEntry { pub address: NodeEntry, pub id_hash: H256, @@ -89,6 +92,12 @@ struct FindNodeRequest { answered: bool, } +#[derive(Clone, Copy)] +enum PingReason { + Default, + FromDiscoveryRequest(NodeId) +} + struct PingRequest { // Time when the request was sent sent_at: Instant, @@ -99,8 +108,10 @@ struct PingRequest { // The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29) #[deprecated()] deprecated_echo_hash: H256, + reason: PingReason } +#[derive(Debug)] pub struct NodeBucket { nodes: VecDeque, //sorted by last active } @@ -178,7 +189,7 @@ impl<'a> Discovery<'a> { if self.node_buckets[dist].nodes.iter().any(|n| n.id_hash == id_hash) { return; } - self.try_ping(e); + self.try_ping(e, PingReason::Default); } } @@ -221,7 +232,7 @@ impl<'a> Discovery<'a> { } else { None } }; if let Some(node) = ping { - self.try_ping(node); + self.try_ping(node, PingReason::Default); } Some(TableUpdates { added: added_map, removed: HashSet::new() }) } @@ -244,7 +255,7 @@ impl<'a> Discovery<'a> { fn update_new_nodes(&mut self) { while self.in_flight_pings.len() < MAX_NODES_PING { match self.adding_nodes.pop() { - Some(next) => self.try_ping(next), + Some(next) => self.try_ping(next, PingReason::Default), None => break, } } @@ -298,7 +309,7 @@ impl<'a> Discovery<'a> { None // a and b are equal, so log distance is -inf } - fn try_ping(&mut self, node: NodeEntry) { + fn try_ping(&mut self, node: NodeEntry, reason: PingReason) { if !self.is_allowed(&node) { trace!(target: "discovery", "Node {:?} not allowed", node); return; @@ -313,7 +324,7 @@ impl<'a> Discovery<'a> { } if self.in_flight_pings.len() < MAX_NODES_PING { - self.ping(&node) + self.ping(&node, reason) .unwrap_or_else(|e| { warn!(target: "discovery", "Error sending Ping packet: {:?}", e); }); @@ -322,7 +333,7 @@ impl<'a> Discovery<'a> { } } - fn ping(&mut self, node: &NodeEntry) -> Result<(), Error> { + fn ping(&mut self, node: &NodeEntry, reason: PingReason) -> Result<(), Error> { let mut rlp = RlpStream::new_list(4); rlp.append(&PROTOCOL_VERSION); self.public_endpoint.to_rlp_list(&mut rlp); @@ -336,6 +347,7 @@ impl<'a> Discovery<'a> { node: node.clone(), echo_hash: hash, deprecated_echo_hash: old_parity_hash, + reason: reason }); trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id); @@ -514,7 +526,7 @@ impl<'a> Discovery<'a> { if request.deprecated_echo_hash == echo_hash { trace!(target: "discovery", "Got Pong from an old parity-ethereum version."); } - Some(request.node.clone()) + Some((request.node.clone(), request.reason.clone())) } }; @@ -528,7 +540,10 @@ impl<'a> Discovery<'a> { }, }; - if let Some(node) = expected_node { + if let Some((node, ping_reason)) = expected_node { + if let PingReason::FromDiscoveryRequest(target) = ping_reason { + self.respond_with_discovery(target, &node)?; + } Ok(self.update_node(node)) } else { debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); @@ -536,21 +551,59 @@ impl<'a> Discovery<'a> { } } - fn on_find_node(&mut self, rlp: &Rlp, _node: &NodeId, from: &SocketAddr) -> Result, Error> { + fn on_find_node(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { trace!(target: "discovery", "Got FindNode from {:?}", &from); let target: NodeId = rlp.val_at(0)?; let timestamp: u64 = rlp.val_at(1)?; self.check_timestamp(timestamp)?; + + let node = NodeEntry { + id: node_id.clone(), + endpoint: NodeEndpoint { + address: *from, + udp_port: from.port() + } + }; + + if self.is_a_valid_known_node(&node) { + self.respond_with_discovery(target, &node)?; + } else { + // Make sure the request source is actually there and responds to pings before actually responding + self.try_ping(node, PingReason::FromDiscoveryRequest(target)); + } + Ok(None) + } + + fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool { + let id_hash = keccak(node.id); + let dist = match Discovery::distance(&self.id_hash, &id_hash) { + Some(dist) => dist, + None => { + debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node); + return false; + } + }; + + let bucket = &self.node_buckets[dist]; + if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) { + debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node); + (known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT) + } else { + false + } + } + + fn respond_with_discovery(&mut self, target: NodeId, node: &NodeEntry) -> Result<(), Error> { let nearest = self.nearest_node_entries(&target); if nearest.is_empty() { - return Ok(None); + return Ok(()); } let mut packets = Discovery::prepare_neighbours_packets(&nearest); for p in packets.drain(..) { - self.send_packet(PACKET_NEIGHBOURS, from, &p)?; + self.send_packet(PACKET_NEIGHBOURS, &node.endpoint.address, &p)?; } - trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &from); - Ok(None) + trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint); + Ok(()) } fn prepare_neighbours_packets(nearest: &[NodeEntry]) -> Vec { @@ -825,7 +878,7 @@ mod tests { } // After 4 discovery rounds, the first one should have learned about the rest. - for _round in 0 .. 4 { + for _round in 0 .. 5 { discovery_handlers[0].round(); let mut continue_loop = true; @@ -833,9 +886,9 @@ mod tests { continue_loop = false; // Process all queued messages. - for i in 0 .. 5 { - let src = discovery_handlers[i].public_endpoint.address.clone(); - while let Some(datagram) = discovery_handlers[i].dequeue_send() { + for i in 0 .. 20 { + let src = discovery_handlers[i%5].public_endpoint.address.clone(); + while let Some(datagram) = discovery_handlers[i%5].dequeue_send() { let dest = discovery_handlers.iter_mut() .find(|disc| datagram.address == disc.public_endpoint.address) .unwrap(); @@ -927,14 +980,14 @@ mod tests { let mut discovery = Discovery { request_backoff: &request_backoff, ..discovery }; for _ in 0..2 { - discovery.ping(&node_entries[101]).unwrap(); + discovery.ping(&node_entries[101], PingReason::Default).unwrap(); let num_nodes = total_bucket_nodes(&discovery.node_buckets); discovery.check_expired(Instant::now() + PING_TIMEOUT); let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 0); } - discovery.ping(&node_entries[101]).unwrap(); + discovery.ping(&node_entries[101], PingReason::Default).unwrap(); let num_nodes = total_bucket_nodes(&discovery.node_buckets); discovery.check_expired(Instant::now() + PING_TIMEOUT); let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); @@ -1121,7 +1174,7 @@ mod tests { let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default()); let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); - discovery1.ping(&NodeEntry { id: discovery2.id, endpoint: ep2.clone() }).unwrap(); + discovery1.ping(&NodeEntry { id: discovery2.id, endpoint: ep2.clone() }, PingReason::Default).unwrap(); let ping_data = discovery1.dequeue_send().unwrap(); assert!(!discovery1.any_sends_queued()); let data = &ping_data.payload[(32 + 65)..];