Ping nodes from discovery (#10167)
This commit is contained in:
parent
eea5f6f232
commit
83f706186f
@ -53,12 +53,15 @@ const REQUEST_BACKOFF: [Duration; 4] = [
|
||||
Duration::from_secs(64)
|
||||
];
|
||||
|
||||
const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct NodeEntry {
|
||||
pub id: NodeId,
|
||||
pub endpoint: NodeEndpoint,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BucketEntry {
|
||||
pub address: NodeEntry,
|
||||
pub id_hash: H256,
|
||||
@ -89,6 +92,12 @@ struct FindNodeRequest {
|
||||
answered: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum PingReason {
|
||||
Default,
|
||||
FromDiscoveryRequest(NodeId)
|
||||
}
|
||||
|
||||
struct PingRequest {
|
||||
// Time when the request was sent
|
||||
sent_at: Instant,
|
||||
@ -99,8 +108,10 @@ struct PingRequest {
|
||||
// The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29)
|
||||
#[deprecated()]
|
||||
deprecated_echo_hash: H256,
|
||||
reason: PingReason
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NodeBucket {
|
||||
nodes: VecDeque<BucketEntry>, //sorted by last active
|
||||
}
|
||||
@ -178,7 +189,7 @@ impl<'a> Discovery<'a> {
|
||||
if self.node_buckets[dist].nodes.iter().any(|n| n.id_hash == id_hash) {
|
||||
return;
|
||||
}
|
||||
self.try_ping(e);
|
||||
self.try_ping(e, PingReason::Default);
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,7 +232,7 @@ impl<'a> Discovery<'a> {
|
||||
} else { None }
|
||||
};
|
||||
if let Some(node) = ping {
|
||||
self.try_ping(node);
|
||||
self.try_ping(node, PingReason::Default);
|
||||
}
|
||||
Some(TableUpdates { added: added_map, removed: HashSet::new() })
|
||||
}
|
||||
@ -244,7 +255,7 @@ impl<'a> Discovery<'a> {
|
||||
fn update_new_nodes(&mut self) {
|
||||
while self.in_flight_pings.len() < MAX_NODES_PING {
|
||||
match self.adding_nodes.pop() {
|
||||
Some(next) => self.try_ping(next),
|
||||
Some(next) => self.try_ping(next, PingReason::Default),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
@ -298,7 +309,7 @@ impl<'a> Discovery<'a> {
|
||||
None // a and b are equal, so log distance is -inf
|
||||
}
|
||||
|
||||
fn try_ping(&mut self, node: NodeEntry) {
|
||||
fn try_ping(&mut self, node: NodeEntry, reason: PingReason) {
|
||||
if !self.is_allowed(&node) {
|
||||
trace!(target: "discovery", "Node {:?} not allowed", node);
|
||||
return;
|
||||
@ -313,7 +324,7 @@ impl<'a> Discovery<'a> {
|
||||
}
|
||||
|
||||
if self.in_flight_pings.len() < MAX_NODES_PING {
|
||||
self.ping(&node)
|
||||
self.ping(&node, reason)
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(target: "discovery", "Error sending Ping packet: {:?}", e);
|
||||
});
|
||||
@ -322,7 +333,7 @@ impl<'a> Discovery<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn ping(&mut self, node: &NodeEntry) -> Result<(), Error> {
|
||||
fn ping(&mut self, node: &NodeEntry, reason: PingReason) -> Result<(), Error> {
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(&PROTOCOL_VERSION);
|
||||
self.public_endpoint.to_rlp_list(&mut rlp);
|
||||
@ -336,6 +347,7 @@ impl<'a> Discovery<'a> {
|
||||
node: node.clone(),
|
||||
echo_hash: hash,
|
||||
deprecated_echo_hash: old_parity_hash,
|
||||
reason: reason
|
||||
});
|
||||
|
||||
trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id);
|
||||
@ -514,7 +526,7 @@ impl<'a> Discovery<'a> {
|
||||
if request.deprecated_echo_hash == echo_hash {
|
||||
trace!(target: "discovery", "Got Pong from an old parity-ethereum version.");
|
||||
}
|
||||
Some(request.node.clone())
|
||||
Some((request.node.clone(), request.reason.clone()))
|
||||
}
|
||||
};
|
||||
|
||||
@ -528,7 +540,10 @@ impl<'a> Discovery<'a> {
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(node) = expected_node {
|
||||
if let Some((node, ping_reason)) = expected_node {
|
||||
if let PingReason::FromDiscoveryRequest(target) = ping_reason {
|
||||
self.respond_with_discovery(target, &node)?;
|
||||
}
|
||||
Ok(self.update_node(node))
|
||||
} else {
|
||||
debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
|
||||
@ -536,21 +551,59 @@ impl<'a> Discovery<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_find_node(&mut self, rlp: &Rlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
|
||||
fn on_find_node(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
|
||||
trace!(target: "discovery", "Got FindNode from {:?}", &from);
|
||||
let target: NodeId = rlp.val_at(0)?;
|
||||
let timestamp: u64 = rlp.val_at(1)?;
|
||||
self.check_timestamp(timestamp)?;
|
||||
|
||||
let node = NodeEntry {
|
||||
id: node_id.clone(),
|
||||
endpoint: NodeEndpoint {
|
||||
address: *from,
|
||||
udp_port: from.port()
|
||||
}
|
||||
};
|
||||
|
||||
if self.is_a_valid_known_node(&node) {
|
||||
self.respond_with_discovery(target, &node)?;
|
||||
} else {
|
||||
// Make sure the request source is actually there and responds to pings before actually responding
|
||||
self.try_ping(node, PingReason::FromDiscoveryRequest(target));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool {
|
||||
let id_hash = keccak(node.id);
|
||||
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
|
||||
Some(dist) => dist,
|
||||
None => {
|
||||
debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let bucket = &self.node_buckets[dist];
|
||||
if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) {
|
||||
debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node);
|
||||
(known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn respond_with_discovery(&mut self, target: NodeId, node: &NodeEntry) -> Result<(), Error> {
|
||||
let nearest = self.nearest_node_entries(&target);
|
||||
if nearest.is_empty() {
|
||||
return Ok(None);
|
||||
return Ok(());
|
||||
}
|
||||
let mut packets = Discovery::prepare_neighbours_packets(&nearest);
|
||||
for p in packets.drain(..) {
|
||||
self.send_packet(PACKET_NEIGHBOURS, from, &p)?;
|
||||
self.send_packet(PACKET_NEIGHBOURS, &node.endpoint.address, &p)?;
|
||||
}
|
||||
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &from);
|
||||
Ok(None)
|
||||
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_neighbours_packets(nearest: &[NodeEntry]) -> Vec<Bytes> {
|
||||
@ -825,7 +878,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// After 4 discovery rounds, the first one should have learned about the rest.
|
||||
for _round in 0 .. 4 {
|
||||
for _round in 0 .. 5 {
|
||||
discovery_handlers[0].round();
|
||||
|
||||
let mut continue_loop = true;
|
||||
@ -833,9 +886,9 @@ mod tests {
|
||||
continue_loop = false;
|
||||
|
||||
// Process all queued messages.
|
||||
for i in 0 .. 5 {
|
||||
let src = discovery_handlers[i].public_endpoint.address.clone();
|
||||
while let Some(datagram) = discovery_handlers[i].dequeue_send() {
|
||||
for i in 0 .. 20 {
|
||||
let src = discovery_handlers[i%5].public_endpoint.address.clone();
|
||||
while let Some(datagram) = discovery_handlers[i%5].dequeue_send() {
|
||||
let dest = discovery_handlers.iter_mut()
|
||||
.find(|disc| datagram.address == disc.public_endpoint.address)
|
||||
.unwrap();
|
||||
@ -927,14 +980,14 @@ mod tests {
|
||||
let mut discovery = Discovery { request_backoff: &request_backoff, ..discovery };
|
||||
|
||||
for _ in 0..2 {
|
||||
discovery.ping(&node_entries[101]).unwrap();
|
||||
discovery.ping(&node_entries[101], PingReason::Default).unwrap();
|
||||
let num_nodes = total_bucket_nodes(&discovery.node_buckets);
|
||||
discovery.check_expired(Instant::now() + PING_TIMEOUT);
|
||||
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
|
||||
assert_eq!(removed, 0);
|
||||
}
|
||||
|
||||
discovery.ping(&node_entries[101]).unwrap();
|
||||
discovery.ping(&node_entries[101], PingReason::Default).unwrap();
|
||||
let num_nodes = total_bucket_nodes(&discovery.node_buckets);
|
||||
discovery.check_expired(Instant::now() + PING_TIMEOUT);
|
||||
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
|
||||
@ -1121,7 +1174,7 @@ mod tests {
|
||||
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
|
||||
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());
|
||||
|
||||
discovery1.ping(&NodeEntry { id: discovery2.id, endpoint: ep2.clone() }).unwrap();
|
||||
discovery1.ping(&NodeEntry { id: discovery2.id, endpoint: ep2.clone() }, PingReason::Default).unwrap();
|
||||
let ping_data = discovery1.dequeue_send().unwrap();
|
||||
assert!(!discovery1.any_sends_queued());
|
||||
let data = &ping_data.payload[(32 + 65)..];
|
||||
|
Loading…
Reference in New Issue
Block a user