diff --git a/util/io/src/service.rs b/util/io/src/service.rs index 01f390a2c..da2e653ec 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -399,8 +399,8 @@ impl IoService where Message: Send + Sync + Clone + 'static { let thread = thread::spawn(move || { let p = panic.clone(); panic.catch_panic(move || { - IoManager::::start(p, &mut event_loop, h).unwrap(); - }).unwrap() + IoManager::::start(p, &mut event_loop, h).expect("Error starting IO service"); + }).expect("Error starting panic handler") }); Ok(IoService { panic_handler: panic_handler, @@ -434,7 +434,11 @@ impl Drop for IoService where Message: Send + Sync + Clone { fn drop(&mut self) { trace!(target: "shutdown", "[IoService] Closing..."); self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e)); - self.thread.take().unwrap().join().ok(); + if let Some(thread) = self.thread.take() { + thread.join().unwrap_or_else(|e| { + debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e); + }); + } trace!(target: "shutdown", "[IoService] Closed."); } } diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index fd2040468..a946fff56 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -15,7 +15,6 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::mem; use std::thread::{JoinHandle, self}; use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use crossbeam::sync::chase_lev; @@ -81,7 +80,7 @@ impl Worker { LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE)); panic_handler.catch_panic(move || { Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting) - }).unwrap() + }).expect("Error starting panic handler") }) .expect("Error creating worker thread")); worker @@ -94,7 +93,7 @@ impl Worker { where Message: Send + Sync + Clone + 'static { loop { { - let lock = wait_mutex.lock().unwrap(); + let lock = wait_mutex.lock().expect("Poisoned work_loop mutex"); if deleting.load(AtomicOrdering::Acquire) { return; } @@ -134,11 +133,12 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); - let _ = self.wait_mutex.lock().unwrap(); + let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex"); self.deleting.store(true, AtomicOrdering::Release); self.wait.notify_all(); - let thread = mem::replace(&mut self.thread, None).unwrap(); - thread.join().ok(); + if let Some(thread) = self.thread.take() { + thread.join().ok(); + } trace!(target: "shutdown", "[IoWorker] Closed"); } } diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs index 0d7c14239..05976b5e6 100644 --- a/util/network/src/connection.rs +++ b/util/network/src/connection.rs @@ -125,11 +125,11 @@ impl GenericConnection { /// Writable IO handler. Called when the socket is ready to send. pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone + Sync + 'static { - if self.send_queue.is_empty() { - return Ok(WriteStatus::Complete) - } { - let buf = self.send_queue.front_mut().unwrap(); + let buf = match self.send_queue.front_mut() { + Some(buf) => buf, + None => return Ok(WriteStatus::Complete), + }; let send_size = buf.get_ref().len(); let pos = buf.position() as usize; if (pos as usize) >= send_size { @@ -439,7 +439,7 @@ impl EncryptedConnection { let mut prev = H128::new(); mac.clone().finalize(&mut prev); let mut enc = H128::new(); - mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap(); + mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).expect("Error updating MAC"); mac_encoder.reset(); enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) }; diff --git a/util/network/src/discovery.rs b/util/network/src/discovery.rs index ef5573d4d..18bd858eb 100644 --- a/util/network/src/discovery.rs +++ b/util/network/src/discovery.rs @@ -156,7 +156,7 @@ impl Discovery { trace!(target: "discovery", "Inserting {:?}", &e); let id_hash = e.id.sha3(); let ping = { - let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id_hash) as usize).unwrap(); + let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id_hash) as usize]; let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) { node.address = e.clone(); node.timeout = None; @@ -169,8 +169,9 @@ impl Discovery { if bucket.nodes.len() > BUCKET_SIZE { //ping least active node - bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns()); - Some(bucket.nodes.back().unwrap().address.endpoint.clone()) + let mut last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0"); + last.timeout = Some(time::precise_time_ns()); + Some(last.address.endpoint.clone()) } else { None } }; if let Some(endpoint) = ping { @@ -179,7 +180,7 @@ impl Discovery { } fn clear_ping(&mut self, id: &NodeId) { - let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id.sha3()) as usize).unwrap(); + let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id.sha3()) as usize]; if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) { node.timeout = None; } @@ -294,7 +295,7 @@ impl Discovery { if count == BUCKET_SIZE { // delete the most distant element let remove = { - let (key, last) = found.iter_mut().next_back().unwrap(); + let (key, last) = found.iter_mut().next_back().expect("Last element is always Some when count > 0"); last.pop(); if last.is_empty() { Some(key.clone()) } else { None } }; @@ -316,8 +317,7 @@ impl Discovery { } pub fn writable(&mut self, io: &IoContext) where Message: Send + Sync + Clone { - while !self.send_queue.is_empty() { - let data = self.send_queue.pop_front().unwrap(); + while let Some(data) = self.send_queue.pop_front() { match self.udp_socket.send_to(&data.payload, &data.address) { Ok(Some(size)) if size == data.payload.len() => { }, diff --git a/util/network/src/host.rs b/util/network/src/host.rs index b693d0abd..887b7ade0 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::net::SocketAddr; +use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; @@ -48,6 +48,8 @@ type Slab = ::slab::Slab; const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES; const MAX_HANDSHAKES: usize = 1024; +const DEFAULT_PORT: u16 = 30303; + // Tokens const TCP_ACCEPT: usize = SYS_TIMER + 1; const IDLE: usize = SYS_TIMER + 2; @@ -135,14 +137,14 @@ impl NetworkConfiguration { /// Create new default configuration with sepcified listen port. pub fn new_with_port(port: u16) -> NetworkConfiguration { let mut config = NetworkConfiguration::new(); - config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap()); + config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port))); config } /// Create new default configuration for localhost-only connection with random port (usefull for testing) pub fn new_local() -> NetworkConfiguration { let mut config = NetworkConfiguration::new(); - config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap()); + config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0))); config.nat_enabled = false; config } @@ -259,7 +261,7 @@ impl<'s> NetworkContext<'s> { /// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing. pub fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { assert!(self.session.is_some(), "Respond called without network context"); - self.send(self.session_id.unwrap(), packet_id, data) + self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data)) } /// Get an IoChannel. @@ -382,16 +384,16 @@ impl Host { trace!(target: "host", "Creating new Host object"); let mut listen_address = match config.listen_address { - None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), + None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)), Some(addr) => addr, }; let keys = if let Some(ref secret) = config.use_secret { - KeyPair::from_secret(secret.clone()).unwrap() + try!(KeyPair::from_secret(secret.clone())) } else { config.config_path.clone().and_then(|ref p| load_key(Path::new(&p))) .map_or_else(|| { - let key = Random.generate().unwrap(); + let key = Random.generate().expect("Error generating random key pair"); if let Some(path) = config.config_path.clone() { save_key(Path::new(&path), key.secret()); } @@ -488,7 +490,7 @@ impl Host { let mut s = e.lock(); { let id = s.id(); - if id.is_some() && reserved.contains(id.unwrap()) { + if id.map_or(false, |id| reserved.contains(id)) { continue; } } @@ -814,11 +816,12 @@ impl Host { (info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny) }; + // Check for the session limit. session_counts accounts for the new session. if reserved_only || - (s.info.originated && session_count >= min_peers) || - (!s.info.originated && session_count >= max_peers) { + (s.info.originated && session_count > min_peers) || + (!s.info.originated && session_count > max_peers) { // only proceed if the connecting peer is reserved. - if !self.reserved_nodes.read().contains(s.id().unwrap()) { + if !self.reserved_nodes.read().contains(s.id().expect("Ready session always has id")) { s.disconnect(io, DisconnectReason::TooManyPeers); return; } @@ -827,7 +830,7 @@ impl Host { // Add it to the node table if !s.info.originated { if let Ok(address) = s.remote_addr() { - let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; + let entry = NodeEntry { id: s.id().expect("Ready session always has id").clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); let mut discovery = self.discovery.lock(); if let Some(ref mut discovery) = *discovery { @@ -861,15 +864,17 @@ impl Host { } let handlers = self.handlers.read(); for p in ready_data { - let h = handlers.get(&p).unwrap().clone(); self.stats.inc_sessions(); let reserved = self.reserved_nodes.read(); - h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); + if let Some(h) = handlers.get(&p).clone() { + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); + } } for (p, packet_id, data) in packet_data { - let h = handlers.get(&p).unwrap().clone(); let reserved = self.reserved_nodes.read(); - h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); + if let Some(h) = handlers.get(&p).clone() { + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); + } } } @@ -909,9 +914,10 @@ impl Host { } } for p in to_disconnect { - let h = self.handlers.read().get(&p).unwrap().clone(); let reserved = self.reserved_nodes.read(); - h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); + if let Some(h) = self.handlers.read().get(&p).clone() { + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); + } } if deregister { io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); @@ -975,7 +981,7 @@ impl IoHandler for Host { match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { - let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) }; + let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -992,7 +998,7 @@ impl IoHandler for Host { match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { - self.discovery.lock().as_mut().unwrap().writable(io); + self.discovery.lock().as_mut().map(|d| d.writable(io)); } _ => panic!("Received unknown writable token"), } @@ -1006,11 +1012,11 @@ impl IoHandler for Host { IDLE => self.maintain_network(io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { - self.discovery.lock().as_mut().unwrap().refresh(); + self.discovery.lock().as_mut().map(|d| d.refresh()); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, DISCOVERY_ROUND => { - let node_changes = { self.discovery.lock().as_mut().unwrap().round() }; + let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -1102,7 +1108,7 @@ impl IoHandler for Host { session.lock().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), + DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"), TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } @@ -1130,7 +1136,7 @@ impl IoHandler for Host { connection.lock().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), + DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"), TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } @@ -1200,7 +1206,7 @@ fn key_save_load() { #[test] fn host_client_url() { - let mut config = NetworkConfiguration::new(); + let mut config = NetworkConfiguration::new_local(); let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".into(); config.use_secret = Some(key); let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap(); diff --git a/util/network/src/node_table.rs b/util/network/src/node_table.rs index 97fb29607..be70bd9a1 100644 --- a/util/network/src/node_table.rs +++ b/util/network/src/node_table.rs @@ -280,7 +280,7 @@ impl NodeTable { json.push_str("\"nodes\": [\n"); let node_ids = self.nodes(AllowIP::All); for i in 0 .. node_ids.len() { - let node = self.nodes.get(&node_ids[i]).unwrap(); + let node = self.nodes.get(&node_ids[i]).expect("self.nodes() only returns node IDs from self.nodes"); json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","})) } json.push_str("]\n"); diff --git a/util/network/src/session.rs b/util/network/src/session.rs index b1224f56d..3a4779208 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -388,8 +388,9 @@ impl Session { Ok(SessionData::Continue) }, PACKET_PONG => { - self.pong_time_ns = Some(time::precise_time_ns()); - self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000); + let time = time::precise_time_ns(); + self.pong_time_ns = Some(time); + self.info.ping_ms = Some((time - self.ping_time_ns) / 1000_000); Ok(SessionData::Continue) }, PACKET_GET_PEERS => Ok(SessionData::None), //TODO;