[devp2p discovery]: cleanup (#11547)

* [devp2p discovery]: cleanup

* [devp2p]: remove lifetime from Discovery

This commit it removes the lifetime on type `Discovery` by making `request_backoff: &'static [Duration]`
instead.

* [devp2p discovery]: pass SockAddr by value

* [devp2p discovery]: remove needless clones

* [devp2p discovery]: take payload by value
This commit is contained in:
Niklas Adolfsson 2020-03-10 18:35:49 +01:00 committed by GitHub
parent 9e77e7e193
commit b7c97f90b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 50 deletions

View File

@ -121,7 +121,7 @@ enum NodeValidity {
#[derive(Debug)]
enum BucketError {
Ourselves,
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
NotInTheBucket { node_entry: NodeEntry, bucket_distance: usize },
}
struct PingRequest {
@ -137,22 +137,14 @@ struct PingRequest {
reason: PingReason
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct NodeBucket {
nodes: VecDeque<BucketEntry>, //sorted by last active
}
impl Default for NodeBucket {
fn default() -> Self {
NodeBucket::new()
}
}
impl NodeBucket {
fn new() -> Self {
NodeBucket {
nodes: VecDeque::new()
}
Self::default()
}
}
@ -161,7 +153,7 @@ pub struct Datagram {
pub address: SocketAddr,
}
pub struct Discovery<'a> {
pub struct Discovery {
id: NodeId,
id_hash: H256,
secret: Secret,
@ -182,7 +174,7 @@ pub struct Discovery<'a> {
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
request_backoff: &'a [Duration],
request_backoff: &'static [Duration],
}
pub struct TableUpdates {
@ -190,8 +182,8 @@ pub struct TableUpdates {
pub removed: HashSet<NodeId>,
}
impl<'a> Discovery<'a> {
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery<'static> {
impl Discovery {
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
Discovery {
id: *key.public(),
id_hash: keccak(key.public()),
@ -243,7 +235,8 @@ impl<'a> Discovery<'a> {
};
let bucket = &mut self.node_buckets[dist];
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
.ok_or_else(|| BucketError::NotInTheBucket { node_entry: e.clone(), bucket_distance: dist })
.and_then(|entry| {
entry.address = e;
entry.last_seen = Instant::now();
entry.backoff_until = Instant::now();
@ -389,14 +382,14 @@ impl<'a> Discovery<'a> {
node.endpoint.to_rlp_list(&mut rlp);
append_expiration(&mut rlp);
let old_parity_hash = keccak(rlp.as_raw());
let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?;
let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?;
self.in_flight_pings.insert(node.id, PingRequest {
sent_at: Instant::now(),
node: node.clone(),
echo_hash: hash,
deprecated_echo_hash: old_parity_hash,
reason: reason
reason,
});
trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id);
@ -407,7 +400,7 @@ impl<'a> Discovery<'a> {
let mut rlp = RlpStream::new_list(2);
rlp.append(target);
append_expiration(&mut rlp);
self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?;
self.send_packet(PACKET_FIND_NODE, node.endpoint.udp_address(), rlp.drain())?;
self.in_flight_find_nodes.insert(node.id, FindNodeRequest {
sent_at: Instant::now(),
@ -419,10 +412,10 @@ impl<'a> Discovery<'a> {
Ok(())
}
fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) -> Result<H256, Error> {
fn send_packet(&mut self, packet_id: u8, address: SocketAddr, payload: Bytes) -> Result<H256, Error> {
let packet = assemble_packet(packet_id, payload, &self.secret)?;
let hash = H256::from_slice(&packet[0..32]);
self.send_to(packet, address.clone());
self.send_to(packet, address);
Ok(hash)
}
@ -498,10 +491,10 @@ impl<'a> Discovery<'a> {
let packet_id = signed[0];
let rlp = Rlp::new(&signed[1..]);
match packet_id {
PACKET_PING => self.on_ping(&rlp, &node_id, &from, hash_signed.as_bytes()),
PACKET_PONG => self.on_pong(&rlp, &node_id, &from),
PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from),
PACKET_PING => self.on_ping(&rlp, node_id, from, hash_signed.as_bytes()),
PACKET_PONG => self.on_pong(&rlp, node_id, from),
PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from),
_ => {
debug!(target: "discovery", "Unknown UDP packet: {}", packet_id);
Ok(None)
@ -523,12 +516,12 @@ impl<'a> Discovery<'a> {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
}
fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) {
node_endpoint
} else {
let mut address = from.clone();
let mut address = from;
// address here is the node's tcp port. If we are unable to get the `NodeEndpoint` from the `ping_from`
// rlp field then this is most likely a BootNode, set the tcp port to 0 because it can not be used for syncing.
address.set_port(0);
@ -542,7 +535,7 @@ impl<'a> Discovery<'a> {
self.check_timestamp(timestamp)?;
let mut response = RlpStream::new_list(3);
let pong_to = NodeEndpoint {
address: from.clone(),
address: from,
udp_port: ping_from.udp_port
};
// Here the PONG's `To` field should be the node we are
@ -555,27 +548,27 @@ impl<'a> Discovery<'a> {
response.append(&echo_hash);
append_expiration(&mut response);
self.send_packet(PACKET_PONG, from, &response.drain())?;
self.send_packet(PACKET_PONG, from, response.drain())?;
let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() };
let entry = NodeEntry { id: node_id, endpoint: pong_to };
if !entry.endpoint.is_valid_discovery_node() {
debug!(target: "discovery", "Got bad address: {:?}", entry);
} else if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
} else {
self.add_node(entry.clone());
self.add_node(entry);
}
Ok(None)
}
fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_pong(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id);
let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
let echo_hash: H256 = rlp.val_at(1)?;
let timestamp: u64 = rlp.val_at(2)?;
self.check_timestamp(timestamp)?;
let expected_node = match self.in_flight_pings.entry(*node_id) {
let expected_node = match self.in_flight_pings.entry(node_id) {
Entry::Occupied(entry) => {
let expected_node = {
let request = entry.get();
@ -586,7 +579,7 @@ impl<'a> Discovery<'a> {
if request.deprecated_echo_hash == echo_hash {
trace!(target: "discovery", "Got Pong from an old open-ethereum version.");
}
Some((request.node.clone(), request.reason.clone()))
Some((request.node.clone(), request.reason))
}
};
@ -629,16 +622,16 @@ impl<'a> Discovery<'a> {
}
}
fn on_find_node(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_find_node(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = rlp.val_at(0)?;
let timestamp: u64 = rlp.val_at(1)?;
self.check_timestamp(timestamp)?;
let node = NodeEntry {
id: node_id.clone(),
id: node_id,
endpoint: NodeEndpoint {
address: *from,
address: from,
udp_port: from.port()
}
};
@ -688,7 +681,7 @@ impl<'a> Discovery<'a> {
}
let mut packets = Discovery::prepare_neighbours_packets(&nearest);
for p in packets.drain(..) {
self.send_packet(PACKET_NEIGHBOURS, &node.endpoint.address, &p)?;
self.send_packet(PACKET_NEIGHBOURS, node.endpoint.address, p)?;
}
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint);
Ok(())
@ -711,10 +704,10 @@ impl<'a> Discovery<'a> {
packets.collect()
}
fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_neighbours(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
let results_count = rlp.at(0)?.item_count()?;
let is_expected = match self.in_flight_find_nodes.entry(*node_id) {
let is_expected = match self.in_flight_find_nodes.entry(node_id) {
Entry::Occupied(mut entry) => {
let expected = {
let request = entry.get_mut();
@ -862,11 +855,11 @@ fn append_expiration(rlp: &mut RlpStream) {
rlp.append(&timestamp);
}
fn assemble_packet(packet_id: u8, bytes: &[u8], secret: &Secret) -> Result<Bytes, Error> {
let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65 + 1);
fn assemble_packet(packet_id: u8, payload: Bytes, secret: &Secret) -> Result<Bytes, Error> {
let mut packet = Bytes::with_capacity(payload.len() + 32 + 65 + 1);
packet.resize(32 + 65, 0); // Filled in below
packet.push(packet_id);
packet.extend_from_slice(bytes);
packet.extend(payload);
let hash = keccak(&packet[(32 + 65)..]);
let signature = match sign(secret, &hash) {
@ -1043,7 +1036,7 @@ mod tests {
let key = Random.generate();
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..116]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap();
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
}
@ -1055,7 +1048,7 @@ mod tests {
// FIND_NODE does not time out because it receives k results.
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..117]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap();
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
}
@ -1065,8 +1058,8 @@ mod tests {
assert_eq!(removed, 0);
// Test bucket evictions with retries.
let request_backoff = [Duration::new(0, 0); 2];
let mut discovery = Discovery { request_backoff: &request_backoff, ..discovery };
const TEST_REQUEST_BACKOFF: [Duration; 2] = [Duration::from_secs(0); 2];
let mut discovery = Discovery { request_backoff: &TEST_REQUEST_BACKOFF, ..discovery };
for _ in 0..2 {
discovery.ping(&node_entries[101], PingReason::Default).unwrap();
@ -1289,7 +1282,7 @@ mod tests {
incorrect_pong_rlp.append(&H256::zero());
append_expiration(&mut incorrect_pong_rlp);
let incorrect_pong_data = assemble_packet(
PACKET_PONG, &incorrect_pong_rlp.drain(), &discovery2.secret
PACKET_PONG, incorrect_pong_rlp.drain(), &discovery2.secret
).unwrap();
if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address.clone()).unwrap() {
panic!("Expected no changes to discovery1's table because pong hash is incorrect");
@ -1318,7 +1311,7 @@ mod tests {
unexpected_pong_rlp.append(&H256::zero());
append_expiration(&mut unexpected_pong_rlp);
let unexpected_pong = assemble_packet(
PACKET_PONG, &unexpected_pong_rlp.drain(), key3.secret()
PACKET_PONG, unexpected_pong_rlp.drain(), key3.secret()
).unwrap();
if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address.clone()).unwrap() {
panic!("Expected no changes to discovery1's table for unexpected pong");

View File

@ -267,7 +267,7 @@ pub struct Host {
udp_socket: Mutex<Option<UdpSocket>>,
tcp_listener: Mutex<TcpListener>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery<'static>>>,
discovery: Mutex<Option<Discovery>>,
nodes: RwLock<NodeTable>,
handlers: RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,