From 12cbe93fbe20b06a78a5b3d3fed9a8cb6a58bfac Mon Sep 17 00:00:00 2001 From: Valentin Shergin Date: Tue, 7 Apr 2020 12:50:45 -0700 Subject: [PATCH] Deduplicating crate dependencies (part 2 of n, `slab`) (#11613) The change includes only `slab` module. --- Cargo.lock | 8 +---- util/io/Cargo.toml | 2 +- util/network-devp2p/Cargo.toml | 2 +- util/network-devp2p/src/host.rs | 52 +++++++++++++++------------------ 4 files changed, 27 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a8a21a71..8c76c6c3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1474,7 +1474,7 @@ dependencies = [ "secp256k1", "serde", "serde_json", - "slab 0.2.0", + "slab 0.4.2", "tempfile", "tiny-keccak 1.5.0", ] @@ -4591,12 +4591,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e88f89a550c01e4cd809f3df4f52dc9e939f3273a2017eabd5c6d12fd98bb23" -[[package]] -name = "slab" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4" - [[package]] name = "slab" version = "0.3.0" diff --git a/util/io/Cargo.toml b/util/io/Cargo.toml index cf9cba493..39bcc262e 100644 --- a/util/io/Cargo.toml +++ b/util/io/Cargo.toml @@ -13,7 +13,7 @@ mio = { version = "0.6.19", optional = true } crossbeam-deque = "0.7.3" parking_lot = "0.10.0" log = "0.4" -slab = "0.4" +slab = "0.4.2" num_cpus = "1.8" timer = "0.2" time = "0.1" diff --git a/util/network-devp2p/Cargo.toml b/util/network-devp2p/Cargo.toml index 9417ea0d2..b07f7ba3a 100644 --- a/util/network-devp2p/Cargo.toml +++ b/util/network-devp2p/Cargo.toml @@ -33,7 +33,7 @@ rlp = "0.4.0" secp256k1 = "0.17" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -slab = "0.2" +slab = "0.4.2" tiny-keccak = "1.4" [dev-dependencies] diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 9571ddba1..1d843e83a 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -24,6 +24,7 @@ use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::time::Duration; +use slab::Slab; use ethereum_types::H256; use keccak_hash::keccak; @@ -55,8 +56,6 @@ use crate::{ session::{Session, SessionData} }; -type Slab = ::slab::Slab; - const MAX_SESSIONS: usize = 2048 + MAX_HANDSHAKES; const MAX_HANDSHAKES: usize = 1024; @@ -153,7 +152,7 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> { let session = self.resolve_session(peer); if let Some(session) = session { session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?; - } else { + } else { trace!(target: "network", "Send: Peer no longer exist") } Ok(()) @@ -339,7 +338,7 @@ impl Host { discovery: Mutex::new(None), udp_socket: Mutex::new(None), tcp_listener: Mutex::new(tcp_listener), - sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), + sessions: Arc::new(RwLock::new(Slab::with_capacity(MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), @@ -399,7 +398,7 @@ impl Host { // disconnect all non-reserved peers here. let reserved: HashSet = self.reserved_nodes.read().clone(); let mut to_kill = Vec::new(); - for e in self.sessions.read().iter() { + for (_, e) in self.sessions.read().iter() { let mut s = e.lock(); { let id = s.id(); @@ -439,7 +438,7 @@ impl Host { pub fn stop(&self, io: &IoContext) { self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); - for e in self.sessions.read().iter() { + for (_, e) in self.sessions.read().iter() { let mut s = e.lock(); s.disconnect(io, DisconnectReason::ClientQuit); to_kill.push(s.token()); @@ -456,7 +455,7 @@ impl Host { let sessions = self.sessions.read(); let sessions = &*sessions; - let mut peers = Vec::with_capacity(sessions.count()); + let mut peers = Vec::with_capacity(sessions.len()); for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) { if sessions.get(i).is_some() { peers.push(i); @@ -533,7 +532,7 @@ impl Host { } fn have_session(&self, id: &NodeId) -> bool { - self.sessions.read().iter().any(|e| e.lock().info.id == Some(*id)) + self.sessions.read().iter().any(|(_, e)| e.lock().info.id == Some(*id)) } // returns (handshakes, egress, ingress) @@ -541,7 +540,7 @@ impl Host { let mut handshakes = 0; let mut egress = 0; let mut ingress = 0; - for s in self.sessions.read().iter() { + for (_, s) in self.sessions.read().iter() { match s.try_lock() { Some(ref s) if s.is_ready() && s.info.originated => egress += 1, Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1, @@ -552,12 +551,12 @@ impl Host { } fn connecting_to(&self, id: &NodeId) -> bool { - self.sessions.read().iter().any(|e| e.lock().id() == Some(id)) + self.sessions.read().iter().any(|(_, e)| e.lock().id() == Some(id)) } fn keep_alive(&self, io: &IoContext) { let mut to_kill = Vec::new(); - for e in self.sessions.read().iter() { + for (_, e) in self.sessions.read().iter() { let mut s = e.lock(); if !s.keep_alive(io) { s.disconnect(io, DisconnectReason::PingTimeout); @@ -674,21 +673,18 @@ impl Host { let nonce = self.info.write().next_nonce(); let mut sessions = self.sessions.write(); - let token = sessions.insert_with_opt(|token| { - trace!(target: "network", "{}: Initiating session {:?}", token, id); - match Session::new(io, socket, token, id, &nonce, &self.info.read()) { - Ok(s) => Some(Arc::new(Mutex::new(s))), - Err(e) => { - debug!(target: "network", "Session create error: {:?}", e); - None - } - } - }); + let entry = sessions.vacant_entry(); + let key = entry.key(); - match token { - Some(t) => io.register_stream(t).map(|_| ()).map_err(Into::into), - None => { - debug!(target: "network", "Max sessions reached"); + trace!(target: "network", "{}: Initiating session {:?}", key, id); + + match Session::new(io, socket, key, id, &nonce, &self.info.read()) { + Ok(session) => { + entry.insert(Arc::new(Mutex::new(session))); + io.register_stream(key).map(|_| ()).map_err(Into::into) + }, + Err(e) => { + debug!(target: "network", "Session create error: {:?}", e); Ok(()) } } @@ -854,7 +850,7 @@ impl Host { let handlers = self.handlers.read(); if !ready_data.is_empty() { - let duplicate = self.sessions.read().iter().any(|e| { + let duplicate = self.sessions.read().iter().any(|(_, e)| { let session = e.lock(); session.token() != token && session.info.id == ready_id }); @@ -961,7 +957,7 @@ impl Host { if !s.expired() { if s.is_ready() { for (p, _) in self.handlers.read().iter() { - if s.have_capability(*p) { + if s.have_capability(*p) { to_disconnect.push(*p); } } @@ -992,7 +988,7 @@ impl Host { let mut to_remove: Vec = Vec::new(); { let sessions = self.sessions.read(); - for c in sessions.iter() { + for (_, c) in sessions.iter() { let s = c.lock(); if let Some(id) = s.id() { if node_changes.removed.contains(id) {