Improve P2P discovery (#9526)
* Add `target` to Rust traces * network-devp2p: Don't remove discovery peer in main sync * network-p2p: Refresh discovery more often * Update Peer discovery protocol * Run discovery more often when not enough nodes connected * Start the first discovery early * Update fast discovery rate * Fix tests * Fix `ping` tests * Fixing remote Node address ; adding PingPong round * Fix tests: update new +1 PingPong round * Increase slow Discovery rate Check in flight FindNode before pings * Add `deprecated` to deprecated_echo_hash * Refactor `discovery_round` branching
This commit is contained in:
		
							parent
							
								
									57d2c8c94a
								
							
						
					
					
						commit
						753fd4bda3
					
				| @ -200,7 +200,7 @@ impl SyncPropagator { | ||||
| 								let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); | ||||
| 								if !appended { | ||||
| 									// Maximal packet size reached just proceed with sending
 | ||||
| 									debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); | ||||
| 									debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); | ||||
| 									to_send = to_send.into_iter().take(pushed).collect(); | ||||
| 									break; | ||||
| 								} | ||||
|  | ||||
| @ -42,9 +42,9 @@ const PACKET_PONG: u8 = 2; | ||||
| const PACKET_FIND_NODE: u8 = 3; | ||||
| const PACKET_NEIGHBOURS: u8 = 4; | ||||
| 
 | ||||
| const PING_TIMEOUT: Duration = Duration::from_millis(300); | ||||
| const PING_TIMEOUT: Duration = Duration::from_millis(500); | ||||
| const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2); | ||||
| const EXPIRY_TIME: Duration = Duration::from_secs(60); | ||||
| const EXPIRY_TIME: Duration = Duration::from_secs(20); | ||||
| const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
 | ||||
| const REQUEST_BACKOFF: [Duration; 4] = [ | ||||
| 	Duration::from_secs(1), | ||||
| @ -80,15 +80,29 @@ impl BucketEntry { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| pub struct NodeBucket { | ||||
| 	nodes: VecDeque<BucketEntry>, //sorted by last active
 | ||||
| struct FindNodeRequest { | ||||
| 	// Time when the request was sent
 | ||||
| 	sent_at: Instant, | ||||
| 	// Number of items sent by the node
 | ||||
| 	response_count: usize, | ||||
| 	// Whether the request have been answered yet
 | ||||
| 	answered: bool, | ||||
| } | ||||
| 
 | ||||
| struct PendingRequest { | ||||
| 	packet_id: u8, | ||||
| struct PingRequest { | ||||
| 	// Time when the request was sent
 | ||||
| 	sent_at: Instant, | ||||
| 	packet_hash: H256, | ||||
| 	response_count: usize, // Some requests (eg. FIND_NODE) have multi-packet responses
 | ||||
| 	// The node to which the request was sent
 | ||||
| 	node: NodeEntry, | ||||
| 	// The hash sent in the Ping request
 | ||||
| 	echo_hash: H256, | ||||
| 	// The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29)
 | ||||
| 	#[deprecated()] | ||||
| 	deprecated_echo_hash: H256, | ||||
| } | ||||
| 
 | ||||
| pub struct NodeBucket { | ||||
| 	nodes: VecDeque<BucketEntry>, //sorted by last active
 | ||||
| } | ||||
| 
 | ||||
| impl Default for NodeBucket { | ||||
| @ -115,13 +129,13 @@ pub struct Discovery<'a> { | ||||
| 	id_hash: H256, | ||||
| 	secret: Secret, | ||||
| 	public_endpoint: NodeEndpoint, | ||||
| 	discovery_round: u16, | ||||
| 	discovery_initiated: bool, | ||||
| 	discovery_round: Option<u16>, | ||||
| 	discovery_id: NodeId, | ||||
| 	discovery_nodes: HashSet<NodeId>, | ||||
| 	node_buckets: Vec<NodeBucket>, | ||||
| 	in_flight_requests: HashMap<NodeId, PendingRequest>, | ||||
| 	expiring_pings: VecDeque<(NodeId, Instant)>, | ||||
| 	expiring_finds: VecDeque<(NodeId, Instant)>, | ||||
| 	in_flight_pings: HashMap<NodeId, PingRequest>, | ||||
| 	in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>, | ||||
| 	send_queue: VecDeque<Datagram>, | ||||
| 	check_timestamps: bool, | ||||
| 	adding_nodes: Vec<NodeEntry>, | ||||
| @ -141,13 +155,13 @@ impl<'a> Discovery<'a> { | ||||
| 			id_hash: keccak(key.public()), | ||||
| 			secret: key.secret().clone(), | ||||
| 			public_endpoint: public, | ||||
| 			discovery_round: 0, | ||||
| 			discovery_initiated: false, | ||||
| 			discovery_round: None, | ||||
| 			discovery_id: NodeId::new(), | ||||
| 			discovery_nodes: HashSet::new(), | ||||
| 			node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), | ||||
| 			in_flight_requests: HashMap::new(), | ||||
| 			expiring_pings: VecDeque::new(), | ||||
| 			expiring_finds: VecDeque::new(), | ||||
| 			in_flight_pings: HashMap::new(), | ||||
| 			in_flight_find_nodes: HashMap::new(), | ||||
| 			send_queue: VecDeque::new(), | ||||
| 			check_timestamps: true, | ||||
| 			adding_nodes: Vec::new(), | ||||
| @ -175,15 +189,6 @@ impl<'a> Discovery<'a> { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Add a list of known nodes to the table.
 | ||||
| 	pub fn init_node_list(&mut self, nodes: Vec<NodeEntry>) { | ||||
| 		for n in nodes { | ||||
| 			if self.is_allowed(&n) { | ||||
| 				self.update_node(n); | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> { | ||||
| 		trace!(target: "discovery", "Inserting {:?}", &e); | ||||
| 		let id_hash = keccak(e.id); | ||||
| @ -224,13 +229,20 @@ impl<'a> Discovery<'a> { | ||||
| 	/// Starts the discovery process at round 0
 | ||||
| 	fn start(&mut self) { | ||||
| 		trace!(target: "discovery", "Starting discovery"); | ||||
| 		self.discovery_round = 0; | ||||
| 		self.discovery_round = Some(0); | ||||
| 		self.discovery_id.randomize(); //TODO: use cryptographic nonce
 | ||||
| 		self.discovery_nodes.clear(); | ||||
| 	} | ||||
| 
 | ||||
| 	/// Complete the discovery process
 | ||||
| 	fn stop(&mut self) { | ||||
| 		trace!(target: "discovery", "Completing discovery"); | ||||
| 		self.discovery_round = None; | ||||
| 		self.discovery_nodes.clear(); | ||||
| 	} | ||||
| 
 | ||||
| 	fn update_new_nodes(&mut self) { | ||||
| 		while self.in_flight_requests.len() < MAX_NODES_PING { | ||||
| 		while self.in_flight_pings.len() < MAX_NODES_PING { | ||||
| 			match self.adding_nodes.pop() { | ||||
| 				Some(next) => self.try_ping(next), | ||||
| 				None => break, | ||||
| @ -239,8 +251,12 @@ impl<'a> Discovery<'a> { | ||||
| 	} | ||||
| 
 | ||||
| 	fn discover(&mut self) { | ||||
| 		self.update_new_nodes(); | ||||
| 		if self.discovery_round == DISCOVERY_MAX_STEPS { | ||||
| 		let discovery_round = match self.discovery_round { | ||||
| 			Some(r) => r, | ||||
| 			None => return, | ||||
| 		}; | ||||
| 		if discovery_round == DISCOVERY_MAX_STEPS { | ||||
| 			self.stop(); | ||||
| 			return; | ||||
| 		} | ||||
| 		trace!(target: "discovery", "Starting round {:?}", self.discovery_round); | ||||
| @ -263,12 +279,10 @@ impl<'a> Discovery<'a> { | ||||
| 		} | ||||
| 
 | ||||
| 		if tried_count == 0 { | ||||
| 			trace!(target: "discovery", "Completing discovery"); | ||||
| 			self.discovery_round = DISCOVERY_MAX_STEPS; | ||||
| 			self.discovery_nodes.clear(); | ||||
| 			self.stop(); | ||||
| 			return; | ||||
| 		} | ||||
| 		self.discovery_round += 1; | ||||
| 		self.discovery_round = Some(discovery_round + 1); | ||||
| 	} | ||||
| 
 | ||||
| 	/// The base 2 log of the distance between a and b using the XOR metric.
 | ||||
| @ -285,14 +299,20 @@ impl<'a> Discovery<'a> { | ||||
| 	} | ||||
| 
 | ||||
| 	fn try_ping(&mut self, node: NodeEntry) { | ||||
| 		if !self.is_allowed(&node) || | ||||
| 			self.in_flight_requests.contains_key(&node.id) || | ||||
| 			self.adding_nodes.iter().any(|n| n.id == node.id) | ||||
| 		{ | ||||
| 		if !self.is_allowed(&node) { | ||||
| 			trace!(target: "discovery", "Node {:?} not allowed", node); | ||||
| 			return; | ||||
| 		} | ||||
| 		if self.in_flight_pings.contains_key(&node.id) || self.in_flight_find_nodes.contains_key(&node.id) { | ||||
| 			trace!(target: "discovery", "Node {:?} in flight requests", node); | ||||
| 			return; | ||||
| 		} | ||||
| 		if self.adding_nodes.iter().any(|n| n.id == node.id) { | ||||
| 			trace!(target: "discovery", "Node {:?} in adding nodes", node); | ||||
| 			return; | ||||
| 		} | ||||
| 
 | ||||
| 		if self.in_flight_requests.len() < MAX_NODES_PING { | ||||
| 		if self.in_flight_pings.len() < MAX_NODES_PING { | ||||
| 			self.ping(&node) | ||||
| 				.unwrap_or_else(|e| { | ||||
| 					warn!(target: "discovery", "Error sending Ping packet: {:?}", e); | ||||
| @ -308,18 +328,17 @@ impl<'a> Discovery<'a> { | ||||
| 		self.public_endpoint.to_rlp_list(&mut rlp); | ||||
| 		node.endpoint.to_rlp_list(&mut rlp); | ||||
| 		append_expiration(&mut rlp); | ||||
| 		let old_parity_hash = keccak(rlp.as_raw()); | ||||
| 		let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?; | ||||
| 
 | ||||
| 		let request_info = PendingRequest { | ||||
| 			packet_id: PACKET_PING, | ||||
| 		self.in_flight_pings.insert(node.id, PingRequest { | ||||
| 			sent_at: Instant::now(), | ||||
| 			packet_hash: hash, | ||||
| 			response_count: 0, | ||||
| 		}; | ||||
| 		self.expiring_pings.push_back((node.id, request_info.sent_at)); | ||||
| 		self.in_flight_requests.insert(node.id, request_info); | ||||
| 			node: node.clone(), | ||||
| 			echo_hash: hash, | ||||
| 			deprecated_echo_hash: old_parity_hash, | ||||
| 		}); | ||||
| 
 | ||||
| 		trace!(target: "discovery", "Sent Ping to {:?}", &node.endpoint); | ||||
| 		trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id); | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| @ -327,16 +346,13 @@ impl<'a> Discovery<'a> { | ||||
| 		let mut rlp = RlpStream::new_list(2); | ||||
| 		rlp.append(target); | ||||
| 		append_expiration(&mut rlp); | ||||
| 		let hash = self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; | ||||
| 		self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; | ||||
| 
 | ||||
| 		let request_info = PendingRequest { | ||||
| 			packet_id: PACKET_FIND_NODE, | ||||
| 		self.in_flight_find_nodes.insert(node.id, FindNodeRequest { | ||||
| 			sent_at: Instant::now(), | ||||
| 			packet_hash: hash, | ||||
| 			response_count: 0, | ||||
| 		}; | ||||
| 		self.expiring_finds.push_back((node.id, request_info.sent_at)); | ||||
| 		self.in_flight_requests.insert(node.id, request_info); | ||||
| 			answered: false, | ||||
| 		}); | ||||
| 
 | ||||
| 		trace!(target: "discovery", "Sent FindNode to {:?}", &node.endpoint); | ||||
| 		Ok(()) | ||||
| @ -448,20 +464,31 @@ impl<'a> Discovery<'a> { | ||||
| 		entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id | ||||
| 	} | ||||
| 
 | ||||
| 	fn on_ping(&mut self, rlp: &Rlp, node: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> { | ||||
| 	fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> { | ||||
| 		trace!(target: "discovery", "Got Ping from {:?}", &from); | ||||
| 		let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?; | ||||
| 		let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?; | ||||
| 		let ping_from = NodeEndpoint::from_rlp(&rlp.at(1)?)?; | ||||
| 		let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?; | ||||
| 		let timestamp: u64 = rlp.val_at(3)?; | ||||
| 		self.check_timestamp(timestamp)?; | ||||
| 
 | ||||
| 		let mut response = RlpStream::new_list(3); | ||||
| 		dest.to_rlp_list(&mut response); | ||||
| 		let pong_to = NodeEndpoint { | ||||
| 			address: from.clone(), | ||||
| 			udp_port: ping_from.udp_port | ||||
| 		}; | ||||
| 		// Here the PONG's `To` field should be the node we are
 | ||||
| 		// sending the request to
 | ||||
| 		// WARNING: this field _should not be used_, but old Parity versions
 | ||||
| 		// use it in order to get the node's address.
 | ||||
| 		// So this is a temporary fix so that older Parity versions don't brake completely.
 | ||||
| 		ping_to.to_rlp_list(&mut response); | ||||
| 		// pong_to.to_rlp_list(&mut response);
 | ||||
| 
 | ||||
| 		response.append(&echo_hash); | ||||
| 		append_expiration(&mut response); | ||||
| 		self.send_packet(PACKET_PONG, from, &response.drain())?; | ||||
| 
 | ||||
| 		let entry = NodeEntry { id: *node, endpoint: source.clone() }; | ||||
| 		let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() }; | ||||
| 		if !entry.endpoint.is_valid() { | ||||
| 			debug!(target: "discovery", "Got bad address: {:?}", entry); | ||||
| 		} else if !self.is_allowed(&entry) { | ||||
| @ -469,40 +496,45 @@ impl<'a> Discovery<'a> { | ||||
| 		} else { | ||||
| 			self.add_node(entry.clone()); | ||||
| 		} | ||||
| 
 | ||||
| 		Ok(None) | ||||
| 	} | ||||
| 
 | ||||
| 	fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> { | ||||
| 		trace!(target: "discovery", "Got Pong from {:?}", &from); | ||||
| 		let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?; | ||||
| 		trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id); | ||||
| 		let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?; | ||||
| 		let echo_hash: H256 = rlp.val_at(1)?; | ||||
| 		let timestamp: u64 = rlp.val_at(2)?; | ||||
| 		self.check_timestamp(timestamp)?; | ||||
| 		let mut node = NodeEntry { id: *node_id, endpoint: dest }; | ||||
| 		if !node.endpoint.is_valid() { | ||||
| 			debug!(target: "discovery", "Bad address: {:?}", node); | ||||
| 			node.endpoint.address = *from; | ||||
| 		} | ||||
| 
 | ||||
| 		let is_expected = match self.in_flight_requests.entry(*node_id) { | ||||
| 		let expected_node = match self.in_flight_pings.entry(*node_id) { | ||||
| 			Entry::Occupied(entry) => { | ||||
| 				let is_expected = { | ||||
| 				let expected_node = { | ||||
| 					let request = entry.get(); | ||||
| 					request.packet_id == PACKET_PING && request.packet_hash == echo_hash | ||||
| 					if request.echo_hash != echo_hash && request.deprecated_echo_hash != echo_hash { | ||||
| 						debug!(target: "discovery", "Got unexpected Pong from {:?} ; packet_hash={:#x} ; expected_hash={:#x}", &from, request.echo_hash, echo_hash); | ||||
| 						None | ||||
| 					} else { | ||||
| 						if request.deprecated_echo_hash == echo_hash { | ||||
| 							trace!(target: "discovery", "Got Pong from an old parity-ethereum version."); | ||||
| 						} | ||||
| 						Some(request.node.clone()) | ||||
| 					} | ||||
| 				}; | ||||
| 				if is_expected { | ||||
| 
 | ||||
| 				if expected_node.is_some() { | ||||
| 					entry.remove(); | ||||
| 				} | ||||
| 				is_expected | ||||
| 				expected_node | ||||
| 			}, | ||||
| 			Entry::Vacant(_) => { | ||||
| 				None | ||||
| 			}, | ||||
| 			Entry::Vacant(_) => false | ||||
| 		}; | ||||
| 
 | ||||
| 		if is_expected { | ||||
| 		if let Some(node) = expected_node { | ||||
| 			Ok(self.update_node(node)) | ||||
| 		} else { | ||||
| 			debug!(target: "discovery", "Got unexpected Pong from {:?}", &from); | ||||
| 			debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); | ||||
| 			Ok(None) | ||||
| 		} | ||||
| 	} | ||||
| @ -544,29 +576,32 @@ impl<'a> Discovery<'a> { | ||||
| 	fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> { | ||||
| 		let results_count = rlp.at(0)?.item_count()?; | ||||
| 
 | ||||
| 		let is_expected = match self.in_flight_requests.entry(*node_id) { | ||||
| 		let is_expected = match self.in_flight_find_nodes.entry(*node_id) { | ||||
| 			Entry::Occupied(mut entry) => { | ||||
| 				let result = { | ||||
| 				let expected = { | ||||
| 					let request = entry.get_mut(); | ||||
| 					if request.packet_id == PACKET_FIND_NODE && | ||||
| 						request.response_count + results_count <= BUCKET_SIZE | ||||
| 					{ | ||||
| 					// Mark the request as answered
 | ||||
| 					request.answered = true; | ||||
| 					if request.response_count + results_count <= BUCKET_SIZE { | ||||
| 						request.response_count += results_count; | ||||
| 						true | ||||
| 					} else { | ||||
| 						debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; oversized packet ({} + {}) node_id={:#x}", &from, request.response_count, results_count, node_id); | ||||
| 						false | ||||
| 					} | ||||
| 				}; | ||||
| 				if entry.get().response_count == BUCKET_SIZE { | ||||
| 					entry.remove(); | ||||
| 				} | ||||
| 				result | ||||
| 				expected | ||||
| 			} | ||||
| 			Entry::Vacant(_) => false, | ||||
| 			Entry::Vacant(_) => { | ||||
| 				debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; couldn't find node_id={:#x}", &from, node_id); | ||||
| 				false | ||||
| 			}, | ||||
| 		}; | ||||
| 
 | ||||
| 		if !is_expected { | ||||
| 			debug!(target: "discovery", "Got unexpected Neighbors from {:?}", &from); | ||||
| 			return Ok(None); | ||||
| 		} | ||||
| 
 | ||||
| @ -591,65 +626,74 @@ impl<'a> Discovery<'a> { | ||||
| 		Ok(None) | ||||
| 	} | ||||
| 
 | ||||
| 	fn check_expired(&mut self, time: Instant) -> HashSet<NodeId> { | ||||
| 		let mut removed: HashSet<NodeId> = HashSet::new(); | ||||
| 		while let Some((node_id, sent_at)) = self.expiring_pings.pop_front() { | ||||
| 			if time.duration_since(sent_at) <= PING_TIMEOUT { | ||||
| 				self.expiring_pings.push_front((node_id, sent_at)); | ||||
| 				break; | ||||
| 	fn check_expired(&mut self, time: Instant) { | ||||
| 		let mut nodes_to_expire = Vec::new(); | ||||
| 		self.in_flight_pings.retain(|node_id, ping_request| { | ||||
| 			if time.duration_since(ping_request.sent_at) > PING_TIMEOUT { | ||||
| 				debug!(target: "discovery", "Removing expired PING request for node_id={:#x}", node_id); | ||||
| 				nodes_to_expire.push(*node_id); | ||||
| 				false | ||||
| 			} else { | ||||
| 				true | ||||
| 			} | ||||
| 			self.expire_in_flight_request(node_id, sent_at, &mut removed); | ||||
| 		} | ||||
| 		while let Some((node_id, sent_at)) = self.expiring_finds.pop_front() { | ||||
| 			if time.duration_since(sent_at) <= FIND_NODE_TIMEOUT { | ||||
| 				self.expiring_finds.push_front((node_id, sent_at)); | ||||
| 				break; | ||||
| 			} | ||||
| 			self.expire_in_flight_request(node_id, sent_at, &mut removed); | ||||
| 		} | ||||
| 		removed | ||||
| 	} | ||||
| 
 | ||||
| 	fn expire_in_flight_request(&mut self, node_id: NodeId, sent_at: Instant, removed: &mut HashSet<NodeId>) { | ||||
| 		if let Entry::Occupied(entry) = self.in_flight_requests.entry(node_id) { | ||||
| 			if entry.get().sent_at == sent_at { | ||||
| 				entry.remove(); | ||||
| 
 | ||||
| 				// Attempt to remove from bucket if in one.
 | ||||
| 				let id_hash = keccak(&node_id); | ||||
| 				let dist = Discovery::distance(&self.id_hash, &id_hash) | ||||
| 					.expect("distance is None only if id hashes are equal; will never send request to self; qed"); | ||||
| 				let bucket = &mut self.node_buckets[dist]; | ||||
| 				if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) { | ||||
| 					if bucket.nodes[index].fail_count < self.request_backoff.len() { | ||||
| 						let node = &mut bucket.nodes[index]; | ||||
| 						node.backoff_until = Instant::now() + self.request_backoff[node.fail_count]; | ||||
| 						node.fail_count += 1; | ||||
| 						trace!( | ||||
| 							target: "discovery", | ||||
| 							"Requests to node {:?} timed out {} consecutive time(s)", | ||||
| 							&node.address, node.fail_count | ||||
| 						); | ||||
| 					} else { | ||||
| 						removed.insert(node_id); | ||||
| 						let node = bucket.nodes.remove(index).expect("index was located in if condition"); | ||||
| 						debug!(target: "discovery", "Removed expired node {:?}", &node.address); | ||||
| 					} | ||||
| 		}); | ||||
| 		self.in_flight_find_nodes.retain(|node_id, find_node_request| { | ||||
| 			if time.duration_since(find_node_request.sent_at) > FIND_NODE_TIMEOUT { | ||||
| 				if !find_node_request.answered { | ||||
| 					debug!(target: "discovery", "Removing expired FIND NODE request for node_id={:#x}", node_id); | ||||
| 					nodes_to_expire.push(*node_id); | ||||
| 				} | ||||
| 				false | ||||
| 			} else { | ||||
| 				true | ||||
| 			} | ||||
| 		}); | ||||
| 		for node_id in nodes_to_expire { | ||||
| 			self.expire_node_request(node_id); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn expire_node_request(&mut self, node_id: NodeId) { | ||||
| 		// Attempt to remove from bucket if in one.
 | ||||
| 		let id_hash = keccak(&node_id); | ||||
| 		let dist = Discovery::distance(&self.id_hash, &id_hash) | ||||
| 			.expect("distance is None only if id hashes are equal; will never send request to self; qed"); | ||||
| 		let bucket = &mut self.node_buckets[dist]; | ||||
| 		if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) { | ||||
| 			if bucket.nodes[index].fail_count < self.request_backoff.len() { | ||||
| 				let node = &mut bucket.nodes[index]; | ||||
| 				node.backoff_until = Instant::now() + self.request_backoff[node.fail_count]; | ||||
| 				node.fail_count += 1; | ||||
| 				trace!( | ||||
| 					target: "discovery", | ||||
| 					"Requests to node {:?} timed out {} consecutive time(s)", | ||||
| 					&node.address, node.fail_count | ||||
| 				); | ||||
| 			} else { | ||||
| 				let node = bucket.nodes.remove(index).expect("index was located in if condition"); | ||||
| 				debug!(target: "discovery", "Removed expired node {:?}", &node.address); | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn round(&mut self) -> Option<TableUpdates> { | ||||
| 		let removed = self.check_expired(Instant::now()); | ||||
| 		self.discover(); | ||||
| 		if !removed.is_empty() { | ||||
| 			Some(TableUpdates { added: HashMap::new(), removed }) | ||||
| 		} else { None } | ||||
| 
 | ||||
| 	pub fn round(&mut self) { | ||||
| 		self.check_expired(Instant::now()); | ||||
| 		self.update_new_nodes(); | ||||
| 
 | ||||
| 		if self.discovery_round.is_some() { | ||||
| 			self.discover(); | ||||
| 		// Start discovering if the first pings have been sent (or timed out)
 | ||||
| 		} else if self.in_flight_pings.len() == 0 && !self.discovery_initiated { | ||||
| 			self.discovery_initiated = true; | ||||
| 			self.refresh(); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn refresh(&mut self) { | ||||
| 		self.start(); | ||||
| 		if self.discovery_round.is_none() { | ||||
| 			self.start(); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn any_sends_queued(&self) -> bool { | ||||
| @ -663,6 +707,16 @@ impl<'a> Discovery<'a> { | ||||
| 	pub fn requeue_send(&mut self, datagram: Datagram) { | ||||
| 		self.send_queue.push_front(datagram) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Add a list of known nodes to the table.
 | ||||
| 	#[cfg(test)] | ||||
| 	pub fn init_node_list(&mut self, nodes: Vec<NodeEntry>) { | ||||
| 		for n in nodes { | ||||
| 			if self.is_allowed(&n) { | ||||
| 				self.update_node(n); | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| fn append_expiration(rlp: &mut RlpStream) { | ||||
| @ -738,13 +792,13 @@ mod tests { | ||||
| 
 | ||||
| 		for i in 1..(MAX_NODES_PING+1) { | ||||
| 			discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); | ||||
| 			assert_eq!(discovery.in_flight_requests.len(), i); | ||||
| 			assert_eq!(discovery.in_flight_pings.len(), i); | ||||
| 			assert_eq!(discovery.send_queue.len(), i); | ||||
| 			assert_eq!(discovery.adding_nodes.len(), 0); | ||||
| 		} | ||||
| 		for i in 1..20 { | ||||
| 			discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); | ||||
| 			assert_eq!(discovery.in_flight_requests.len(), MAX_NODES_PING); | ||||
| 			assert_eq!(discovery.in_flight_pings.len(), MAX_NODES_PING); | ||||
| 			assert_eq!(discovery.send_queue.len(), MAX_NODES_PING); | ||||
| 			assert_eq!(discovery.adding_nodes.len(), i); | ||||
| 		} | ||||
| @ -821,23 +875,29 @@ mod tests { | ||||
| 		assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200); | ||||
| 
 | ||||
| 		// Requests have not expired yet.
 | ||||
| 		let removed = discovery.check_expired(Instant::now()).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now()); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert_eq!(removed, 0); | ||||
| 
 | ||||
| 		// Expiring pings to bucket nodes removes them from bucket.
 | ||||
| 		let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now() + PING_TIMEOUT); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert!(removed > 0); | ||||
| 		assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200 - removed); | ||||
| 
 | ||||
| 		for _ in 0..100 { | ||||
| 			discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); | ||||
| 		} | ||||
| 		assert!(discovery.in_flight_requests.len() > 0); | ||||
| 		assert!(discovery.in_flight_pings.len() > 0); | ||||
| 
 | ||||
| 		// Expire pings to nodes that are not in buckets.
 | ||||
| 		let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now() + PING_TIMEOUT); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert_eq!(removed, 0); | ||||
| 		assert_eq!(discovery.in_flight_requests.len(), 0); | ||||
| 		assert_eq!(discovery.in_flight_pings.len(), 0); | ||||
| 
 | ||||
| 		let from = SocketAddr::from_str("99.99.99.99:40445").unwrap(); | ||||
| 
 | ||||
| @ -849,7 +909,9 @@ mod tests { | ||||
| 			discovery.on_packet(&packet, from.clone()).unwrap(); | ||||
| 		} | ||||
| 
 | ||||
| 		let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert!(removed > 0); | ||||
| 
 | ||||
| 		// FIND_NODE does not time out because it receives k results.
 | ||||
| @ -859,7 +921,9 @@ mod tests { | ||||
| 			discovery.on_packet(&packet, from.clone()).unwrap(); | ||||
| 		} | ||||
| 
 | ||||
| 		let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert_eq!(removed, 0); | ||||
| 
 | ||||
| 		// Test bucket evictions with retries.
 | ||||
| @ -868,12 +932,16 @@ mod tests { | ||||
| 
 | ||||
| 		for _ in 0..2 { | ||||
| 			discovery.ping(&node_entries[101]).unwrap(); | ||||
| 			let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); | ||||
| 			let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 			discovery.check_expired(Instant::now() + PING_TIMEOUT); | ||||
| 			let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 			assert_eq!(removed, 0); | ||||
| 		} | ||||
| 
 | ||||
| 		discovery.ping(&node_entries[101]).unwrap(); | ||||
| 		let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); | ||||
| 		let num_nodes = total_bucket_nodes(&discovery.node_buckets); | ||||
| 		discovery.check_expired(Instant::now() + PING_TIMEOUT); | ||||
| 		let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); | ||||
| 		assert_eq!(removed, 1); | ||||
| 	} | ||||
| 
 | ||||
| @ -1066,9 +1134,11 @@ mod tests { | ||||
| 		assert_eq!(ep1, NodeEndpoint::from_rlp(&rlp.at(1).unwrap()).unwrap()); | ||||
| 		assert_eq!(ep2, NodeEndpoint::from_rlp(&rlp.at(2).unwrap()).unwrap()); | ||||
| 
 | ||||
| 		// `discovery1` should be added to node table on ping received
 | ||||
| 		if let Some(_) = discovery2.on_packet(&ping_data.payload, ep1.address.clone()).unwrap() { | ||||
| 			panic!("Expected no changes to discovery2's table"); | ||||
| 		} | ||||
| 
 | ||||
| 		let pong_data = discovery2.dequeue_send().unwrap(); | ||||
| 		let data = &pong_data.payload[(32 + 65)..]; | ||||
| 		assert_eq!(data[0], PACKET_PONG); | ||||
|  | ||||
| @ -59,8 +59,9 @@ const TCP_ACCEPT: StreamToken = SYS_TIMER + 1; | ||||
| const IDLE: TimerToken = SYS_TIMER + 2; | ||||
| const DISCOVERY: StreamToken = SYS_TIMER + 3; | ||||
| const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4; | ||||
| const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 5; | ||||
| const NODE_TABLE: TimerToken = SYS_TIMER + 6; | ||||
| const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5; | ||||
| const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6; | ||||
| const NODE_TABLE: TimerToken = SYS_TIMER + 7; | ||||
| const FIRST_SESSION: StreamToken = 0; | ||||
| const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1; | ||||
| const USER_TIMER: TimerToken = LAST_SESSION + 256; | ||||
| @ -71,6 +72,8 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1; | ||||
| const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1); | ||||
| // for DISCOVERY_REFRESH TimerToken
 | ||||
| const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60); | ||||
| // for FAST_DISCOVERY_REFRESH TimerToken
 | ||||
| const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10); | ||||
| // for DISCOVERY_ROUND TimerToken
 | ||||
| const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300); | ||||
| // for NODE_TABLE TimerToken
 | ||||
| @ -478,10 +481,10 @@ impl Host { | ||||
| 			let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket"); | ||||
| 			*self.udp_socket.lock() = Some(socket); | ||||
| 
 | ||||
| 			discovery.init_node_list(self.nodes.read().entries()); | ||||
| 			discovery.add_node_list(self.nodes.read().entries()); | ||||
| 			*self.discovery.lock() = Some(discovery); | ||||
| 			io.register_stream(DISCOVERY)?; | ||||
| 			io.register_timer(FAST_DISCOVERY_REFRESH, FAST_DISCOVERY_REFRESH_TIMEOUT)?; | ||||
| 			io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT)?; | ||||
| 			io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT)?; | ||||
| 		} | ||||
| @ -533,6 +536,18 @@ impl Host { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn has_enough_peers(&self) -> bool { | ||||
| 		let min_peers = { | ||||
| 			let info = self.info.read(); | ||||
| 			let config = &info.config; | ||||
| 
 | ||||
| 			config.min_peers | ||||
| 		}; | ||||
| 		let (_, egress_count, ingress_count) = self.session_count(); | ||||
| 
 | ||||
| 		return egress_count + ingress_count >= min_peers as usize; | ||||
| 	} | ||||
| 
 | ||||
| 	fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) { | ||||
| 		let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = { | ||||
| 			let info = self.info.read(); | ||||
| @ -1014,16 +1029,23 @@ impl IoHandler<NetworkIoMessage> for Host { | ||||
| 			IDLE => self.maintain_network(io), | ||||
| 			FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), | ||||
| 			DISCOVERY_REFRESH => { | ||||
| 				if let Some(d) = self.discovery.lock().as_mut() { | ||||
| 					d.refresh(); | ||||
|                                 } | ||||
| 				// Run the _slow_ discovery if enough peers are connected
 | ||||
| 				if !self.has_enough_peers() { | ||||
| 					return; | ||||
| 				} | ||||
| 				self.discovery.lock().as_mut().map(|d| d.refresh()); | ||||
| 				io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); | ||||
| 			}, | ||||
| 			FAST_DISCOVERY_REFRESH => { | ||||
| 				// Run the fast discovery if not enough peers are connected
 | ||||
| 				if self.has_enough_peers() { | ||||
| 					return; | ||||
| 				} | ||||
| 				self.discovery.lock().as_mut().map(|d| d.refresh()); | ||||
| 				io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); | ||||
| 			}, | ||||
| 			DISCOVERY_ROUND => { | ||||
| 				let node_changes = { self.discovery.lock().as_mut().and_then(|d| d.round()) }; | ||||
| 				if let Some(node_changes) = node_changes { | ||||
| 					self.update_nodes(io, node_changes); | ||||
| 				} | ||||
| 				self.discovery.lock().as_mut().map(|d| d.round()); | ||||
| 				io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); | ||||
| 			}, | ||||
| 			NODE_TABLE => { | ||||
|  | ||||
| @ -385,7 +385,7 @@ impl NodeTable { | ||||
| 			None => return, | ||||
| 		}; | ||||
| 		if let Err(e) = fs::create_dir_all(&path) { | ||||
| 			warn!("Error creating node table directory: {:?}", e); | ||||
| 			warn!(target: "network", "Error creating node table directory: {:?}", e); | ||||
| 			return; | ||||
| 		} | ||||
| 		path.push(NODES_FILE); | ||||
| @ -400,11 +400,11 @@ impl NodeTable { | ||||
| 		match fs::File::create(&path) { | ||||
| 			Ok(file) => { | ||||
| 				if let Err(e) = serde_json::to_writer_pretty(file, &table) { | ||||
| 					warn!("Error writing node table file: {:?}", e); | ||||
| 					warn!(target: "network", "Error writing node table file: {:?}", e); | ||||
| 				} | ||||
| 			}, | ||||
| 			Err(e) => { | ||||
| 				warn!("Error creating node table file: {:?}", e); | ||||
| 				warn!(target: "network", "Error creating node table file: {:?}", e); | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| @ -418,7 +418,7 @@ impl NodeTable { | ||||
| 		let file = match fs::File::open(&path) { | ||||
| 			Ok(file) => file, | ||||
| 			Err(e) => { | ||||
| 				debug!("Error opening node table file: {:?}", e); | ||||
| 				debug!(target: "network", "Error opening node table file: {:?}", e); | ||||
| 				return Default::default(); | ||||
| 			}, | ||||
| 		}; | ||||
| @ -431,7 +431,7 @@ impl NodeTable { | ||||
| 					.collect() | ||||
| 			}, | ||||
| 			Err(e) => { | ||||
| 				warn!("Error reading node table file: {:?}", e); | ||||
| 				warn!(target: "network", "Error reading node table file: {:?}", e); | ||||
| 				Default::default() | ||||
| 			}, | ||||
| 		} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user