From 7dd29825c4b35a27448a1a651b9bbe1f8ea72a4a Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 20 Jul 2016 12:41:31 +0200 Subject: [PATCH] Ping discovery nodes gradually (#1671) --- util/src/network/discovery.rs | 33 +++++++++++++++++++++++++-------- util/src/network/host.rs | 10 ++++------ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/util/src/network/discovery.rs b/util/src/network/discovery.rs index b4a4eef92..5670c4cfa 100644 --- a/util/src/network/discovery.rs +++ b/util/src/network/discovery.rs @@ -46,6 +46,7 @@ const PACKET_FIND_NODE: u8 = 3; const PACKET_NEIGHBOURS: u8 = 4; const PING_TIMEOUT_MS: u64 = 300; +const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once #[derive(Clone, Debug)] pub struct NodeEntry { @@ -93,6 +94,7 @@ pub struct Discovery { node_buckets: Vec, send_queue: VecDeque, check_timestamps: bool, + adding_nodes: Vec, } pub struct TableUpdates { @@ -115,16 +117,21 @@ impl Discovery { udp_socket: socket, send_queue: VecDeque::new(), check_timestamps: true, + adding_nodes: Vec::new(), } } /// Add a new node to discovery table. Pings the node. - pub fn add_node(&mut self, e: NodeEntry, new: bool) { + pub fn add_node(&mut self, e: NodeEntry) { let endpoint = e.endpoint.clone(); self.update_node(e); - if new { - self.ping(&endpoint); - } + self.ping(&endpoint); + } + + /// Add a list of nodes. Pings a few nodes each round + pub fn add_node_list(&mut self, nodes: Vec) { + self.adding_nodes = nodes; + self.update_new_nodes(); } /// Add a list of known nodes to the table. @@ -173,7 +180,17 @@ impl Discovery { self.discovery_nodes.clear(); } + fn update_new_nodes(&mut self) { + let mut count = 0usize; + while !self.adding_nodes.is_empty() && count < MAX_NODES_PING { + let node = self.adding_nodes.pop().expect("pop is always Some if not empty; qed"); + self.add_node(node); + count += 1; + } + } + fn discover(&mut self) { + self.update_new_nodes(); if self.discovery_round == DISCOVERY_MAX_STEPS { return; } @@ -551,10 +568,10 @@ mod tests { let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap(); let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap(); - discovery1.add_node(NodeEntry { id: node1.id.clone(), endpoint: node1.endpoint.clone() }, true); - discovery1.add_node(NodeEntry { id: node2.id.clone(), endpoint: node2.endpoint.clone() }, true); + discovery1.add_node(NodeEntry { id: node1.id.clone(), endpoint: node1.endpoint.clone() }); + discovery1.add_node(NodeEntry { id: node2.id.clone(), endpoint: node2.endpoint.clone() }); - discovery2.add_node(NodeEntry { id: key1.public().clone(), endpoint: ep1.clone() }, true); + discovery2.add_node(NodeEntry { id: key1.public().clone(), endpoint: ep1.clone() }); discovery2.refresh(); for _ in 0 .. 10 { @@ -581,7 +598,7 @@ mod tests { let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 }; let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0); for _ in 0..1200 { - discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }, true); + discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); } assert!(Discovery::nearest_node_entries(&NodeId::new(), &discovery.node_buckets).len() <= 16); let removed = discovery.check_expired(true).len(); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 69ac37fa8..e986c6020 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -419,7 +419,7 @@ impl Host { self.nodes.write().add_node(n); if let Some(ref mut discovery) = *self.discovery.lock() { - discovery.add_node(entry, true); + discovery.add_node(entry); } } } @@ -433,7 +433,7 @@ impl Host { self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); if let Some(ref mut discovery) = *self.discovery.lock() { - discovery.add_node(entry, false); + discovery.add_node(entry); } Ok(()) @@ -549,9 +549,7 @@ impl Host { if let Some(mut discovery) = discovery { discovery.init_node_list(self.nodes.read().unordered_entries()); - for n in self.nodes.read().unordered_entries() { - discovery.add_node(n.clone(), false); - } + discovery.add_node_list(self.nodes.read().unordered_entries()); *self.discovery.lock() = Some(discovery); io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); @@ -788,7 +786,7 @@ impl Host { self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); let mut discovery = self.discovery.lock(); if let Some(ref mut discovery) = *discovery.deref_mut() { - discovery.add_node(entry, true); + discovery.add_node(entry); } } }