Ping discovery nodes gradually (#1671)

This commit is contained in:
Arkadiy Paronyan 2016-07-20 12:41:31 +02:00 committed by Gav Wood
parent 8a8cfb133f
commit 7dd29825c4
2 changed files with 29 additions and 14 deletions

View File

@ -46,6 +46,7 @@ const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4;
const PING_TIMEOUT_MS: u64 = 300;
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
#[derive(Clone, Debug)]
pub struct NodeEntry {
@ -93,6 +94,7 @@ pub struct Discovery {
node_buckets: Vec<NodeBucket>,
send_queue: VecDeque<Datagramm>,
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
}
pub struct TableUpdates {
@ -115,16 +117,21 @@ impl Discovery {
udp_socket: socket,
send_queue: VecDeque::new(),
check_timestamps: true,
adding_nodes: Vec::new(),
}
}
/// Add a new node to discovery table. Pings the node.
pub fn add_node(&mut self, e: NodeEntry, new: bool) {
pub fn add_node(&mut self, e: NodeEntry) {
let endpoint = e.endpoint.clone();
self.update_node(e);
if new {
self.ping(&endpoint);
}
self.ping(&endpoint);
}
/// Add a list of nodes. Pings a few nodes each round
pub fn add_node_list(&mut self, nodes: Vec<NodeEntry>) {
self.adding_nodes = nodes;
self.update_new_nodes();
}
/// Add a list of known nodes to the table.
@ -173,7 +180,17 @@ impl Discovery {
self.discovery_nodes.clear();
}
fn update_new_nodes(&mut self) {
let mut count = 0usize;
while !self.adding_nodes.is_empty() && count < MAX_NODES_PING {
let node = self.adding_nodes.pop().expect("pop is always Some if not empty; qed");
self.add_node(node);
count += 1;
}
}
fn discover(&mut self) {
self.update_new_nodes();
if self.discovery_round == DISCOVERY_MAX_STEPS {
return;
}
@ -551,10 +568,10 @@ mod tests {
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
discovery1.add_node(NodeEntry { id: node1.id.clone(), endpoint: node1.endpoint.clone() }, true);
discovery1.add_node(NodeEntry { id: node2.id.clone(), endpoint: node2.endpoint.clone() }, true);
discovery1.add_node(NodeEntry { id: node1.id.clone(), endpoint: node1.endpoint.clone() });
discovery1.add_node(NodeEntry { id: node2.id.clone(), endpoint: node2.endpoint.clone() });
discovery2.add_node(NodeEntry { id: key1.public().clone(), endpoint: ep1.clone() }, true);
discovery2.add_node(NodeEntry { id: key1.public().clone(), endpoint: ep1.clone() });
discovery2.refresh();
for _ in 0 .. 10 {
@ -581,7 +598,7 @@ mod tests {
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0);
for _ in 0..1200 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }, true);
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
}
assert!(Discovery::nearest_node_entries(&NodeId::new(), &discovery.node_buckets).len() <= 16);
let removed = discovery.check_expired(true).len();

View File

@ -419,7 +419,7 @@ impl Host {
self.nodes.write().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry, true);
discovery.add_node(entry);
}
}
}
@ -433,7 +433,7 @@ impl Host {
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
if let Some(ref mut discovery) = *self.discovery.lock() {
discovery.add_node(entry, false);
discovery.add_node(entry);
}
Ok(())
@ -549,9 +549,7 @@ impl Host {
if let Some(mut discovery) = discovery {
discovery.init_node_list(self.nodes.read().unordered_entries());
for n in self.nodes.read().unordered_entries() {
discovery.add_node(n.clone(), false);
}
discovery.add_node_list(self.nodes.read().unordered_entries());
*self.discovery.lock() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
@ -788,7 +786,7 @@ impl Host {
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry, true);
discovery.add_node(entry);
}
}
}