Sweep panickers from IO and network (#3018)
* Sweep panickers from IO and network * Typo and logging
This commit is contained in:
parent
60df9857ce
commit
2e70abdc40
@ -399,8 +399,8 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
|||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
let p = panic.clone();
|
let p = panic.clone();
|
||||||
panic.catch_panic(move || {
|
panic.catch_panic(move || {
|
||||||
IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
|
IoManager::<Message>::start(p, &mut event_loop, h).expect("Error starting IO service");
|
||||||
}).unwrap()
|
}).expect("Error starting panic handler")
|
||||||
});
|
});
|
||||||
Ok(IoService {
|
Ok(IoService {
|
||||||
panic_handler: panic_handler,
|
panic_handler: panic_handler,
|
||||||
@ -434,7 +434,11 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!(target: "shutdown", "[IoService] Closing...");
|
trace!(target: "shutdown", "[IoService] Closing...");
|
||||||
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
|
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
|
||||||
self.thread.take().unwrap().join().ok();
|
if let Some(thread) = self.thread.take() {
|
||||||
|
thread.join().unwrap_or_else(|e| {
|
||||||
|
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
trace!(target: "shutdown", "[IoService] Closed.");
|
trace!(target: "shutdown", "[IoService] Closed.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::mem;
|
|
||||||
use std::thread::{JoinHandle, self};
|
use std::thread::{JoinHandle, self};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||||
use crossbeam::sync::chase_lev;
|
use crossbeam::sync::chase_lev;
|
||||||
@ -81,7 +80,7 @@ impl Worker {
|
|||||||
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
|
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
|
||||||
panic_handler.catch_panic(move || {
|
panic_handler.catch_panic(move || {
|
||||||
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
|
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
|
||||||
}).unwrap()
|
}).expect("Error starting panic handler")
|
||||||
})
|
})
|
||||||
.expect("Error creating worker thread"));
|
.expect("Error creating worker thread"));
|
||||||
worker
|
worker
|
||||||
@ -94,7 +93,7 @@ impl Worker {
|
|||||||
where Message: Send + Sync + Clone + 'static {
|
where Message: Send + Sync + Clone + 'static {
|
||||||
loop {
|
loop {
|
||||||
{
|
{
|
||||||
let lock = wait_mutex.lock().unwrap();
|
let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
|
||||||
if deleting.load(AtomicOrdering::Acquire) {
|
if deleting.load(AtomicOrdering::Acquire) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -134,11 +133,12 @@ impl Worker {
|
|||||||
impl Drop for Worker {
|
impl Drop for Worker {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!(target: "shutdown", "[IoWorker] Closing...");
|
trace!(target: "shutdown", "[IoWorker] Closing...");
|
||||||
let _ = self.wait_mutex.lock().unwrap();
|
let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex");
|
||||||
self.deleting.store(true, AtomicOrdering::Release);
|
self.deleting.store(true, AtomicOrdering::Release);
|
||||||
self.wait.notify_all();
|
self.wait.notify_all();
|
||||||
let thread = mem::replace(&mut self.thread, None).unwrap();
|
if let Some(thread) = self.thread.take() {
|
||||||
thread.join().ok();
|
thread.join().ok();
|
||||||
|
}
|
||||||
trace!(target: "shutdown", "[IoWorker] Closed");
|
trace!(target: "shutdown", "[IoWorker] Closed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -125,11 +125,11 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
|||||||
|
|
||||||
/// Writable IO handler. Called when the socket is ready to send.
|
/// Writable IO handler. Called when the socket is ready to send.
|
||||||
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
|
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
|
||||||
if self.send_queue.is_empty() {
|
|
||||||
return Ok(WriteStatus::Complete)
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
let buf = self.send_queue.front_mut().unwrap();
|
let buf = match self.send_queue.front_mut() {
|
||||||
|
Some(buf) => buf,
|
||||||
|
None => return Ok(WriteStatus::Complete),
|
||||||
|
};
|
||||||
let send_size = buf.get_ref().len();
|
let send_size = buf.get_ref().len();
|
||||||
let pos = buf.position() as usize;
|
let pos = buf.position() as usize;
|
||||||
if (pos as usize) >= send_size {
|
if (pos as usize) >= send_size {
|
||||||
@ -439,7 +439,7 @@ impl EncryptedConnection {
|
|||||||
let mut prev = H128::new();
|
let mut prev = H128::new();
|
||||||
mac.clone().finalize(&mut prev);
|
mac.clone().finalize(&mut prev);
|
||||||
let mut enc = H128::new();
|
let mut enc = H128::new();
|
||||||
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap();
|
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).expect("Error updating MAC");
|
||||||
mac_encoder.reset();
|
mac_encoder.reset();
|
||||||
|
|
||||||
enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) };
|
enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) };
|
||||||
|
@ -156,7 +156,7 @@ impl Discovery {
|
|||||||
trace!(target: "discovery", "Inserting {:?}", &e);
|
trace!(target: "discovery", "Inserting {:?}", &e);
|
||||||
let id_hash = e.id.sha3();
|
let id_hash = e.id.sha3();
|
||||||
let ping = {
|
let ping = {
|
||||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id_hash) as usize).unwrap();
|
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id_hash) as usize];
|
||||||
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
|
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
|
||||||
node.address = e.clone();
|
node.address = e.clone();
|
||||||
node.timeout = None;
|
node.timeout = None;
|
||||||
@ -169,8 +169,9 @@ impl Discovery {
|
|||||||
|
|
||||||
if bucket.nodes.len() > BUCKET_SIZE {
|
if bucket.nodes.len() > BUCKET_SIZE {
|
||||||
//ping least active node
|
//ping least active node
|
||||||
bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns());
|
let mut last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
|
||||||
Some(bucket.nodes.back().unwrap().address.endpoint.clone())
|
last.timeout = Some(time::precise_time_ns());
|
||||||
|
Some(last.address.endpoint.clone())
|
||||||
} else { None }
|
} else { None }
|
||||||
};
|
};
|
||||||
if let Some(endpoint) = ping {
|
if let Some(endpoint) = ping {
|
||||||
@ -179,7 +180,7 @@ impl Discovery {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn clear_ping(&mut self, id: &NodeId) {
|
fn clear_ping(&mut self, id: &NodeId) {
|
||||||
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id.sha3()) as usize).unwrap();
|
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id.sha3()) as usize];
|
||||||
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
|
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
|
||||||
node.timeout = None;
|
node.timeout = None;
|
||||||
}
|
}
|
||||||
@ -294,7 +295,7 @@ impl Discovery {
|
|||||||
if count == BUCKET_SIZE {
|
if count == BUCKET_SIZE {
|
||||||
// delete the most distant element
|
// delete the most distant element
|
||||||
let remove = {
|
let remove = {
|
||||||
let (key, last) = found.iter_mut().next_back().unwrap();
|
let (key, last) = found.iter_mut().next_back().expect("Last element is always Some when count > 0");
|
||||||
last.pop();
|
last.pop();
|
||||||
if last.is_empty() { Some(key.clone()) } else { None }
|
if last.is_empty() { Some(key.clone()) } else { None }
|
||||||
};
|
};
|
||||||
@ -316,8 +317,7 @@ impl Discovery {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
|
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
|
||||||
while !self.send_queue.is_empty() {
|
while let Some(data) = self.send_queue.pop_front() {
|
||||||
let data = self.send_queue.pop_front().unwrap();
|
|
||||||
match self.udp_socket.send_to(&data.payload, &data.address) {
|
match self.udp_socket.send_to(&data.payload, &data.address) {
|
||||||
Ok(Some(size)) if size == data.payload.len() => {
|
Ok(Some(size)) if size == data.payload.len() => {
|
||||||
},
|
},
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -48,6 +48,8 @@ type Slab<T> = ::slab::Slab<T, usize>;
|
|||||||
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
|
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
|
||||||
const MAX_HANDSHAKES: usize = 1024;
|
const MAX_HANDSHAKES: usize = 1024;
|
||||||
|
|
||||||
|
const DEFAULT_PORT: u16 = 30303;
|
||||||
|
|
||||||
// Tokens
|
// Tokens
|
||||||
const TCP_ACCEPT: usize = SYS_TIMER + 1;
|
const TCP_ACCEPT: usize = SYS_TIMER + 1;
|
||||||
const IDLE: usize = SYS_TIMER + 2;
|
const IDLE: usize = SYS_TIMER + 2;
|
||||||
@ -135,14 +137,14 @@ impl NetworkConfiguration {
|
|||||||
/// Create new default configuration with sepcified listen port.
|
/// Create new default configuration with sepcified listen port.
|
||||||
pub fn new_with_port(port: u16) -> NetworkConfiguration {
|
pub fn new_with_port(port: u16) -> NetworkConfiguration {
|
||||||
let mut config = NetworkConfiguration::new();
|
let mut config = NetworkConfiguration::new();
|
||||||
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
|
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
|
||||||
config
|
config
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
|
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
|
||||||
pub fn new_local() -> NetworkConfiguration {
|
pub fn new_local() -> NetworkConfiguration {
|
||||||
let mut config = NetworkConfiguration::new();
|
let mut config = NetworkConfiguration::new();
|
||||||
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
|
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
|
||||||
config.nat_enabled = false;
|
config.nat_enabled = false;
|
||||||
config
|
config
|
||||||
}
|
}
|
||||||
@ -259,7 +261,7 @@ impl<'s> NetworkContext<'s> {
|
|||||||
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
|
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
|
||||||
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
|
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
|
||||||
assert!(self.session.is_some(), "Respond called without network context");
|
assert!(self.session.is_some(), "Respond called without network context");
|
||||||
self.send(self.session_id.unwrap(), packet_id, data)
|
self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an IoChannel.
|
/// Get an IoChannel.
|
||||||
@ -382,16 +384,16 @@ impl Host {
|
|||||||
trace!(target: "host", "Creating new Host object");
|
trace!(target: "host", "Creating new Host object");
|
||||||
|
|
||||||
let mut listen_address = match config.listen_address {
|
let mut listen_address = match config.listen_address {
|
||||||
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
|
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
|
||||||
Some(addr) => addr,
|
Some(addr) => addr,
|
||||||
};
|
};
|
||||||
|
|
||||||
let keys = if let Some(ref secret) = config.use_secret {
|
let keys = if let Some(ref secret) = config.use_secret {
|
||||||
KeyPair::from_secret(secret.clone()).unwrap()
|
try!(KeyPair::from_secret(secret.clone()))
|
||||||
} else {
|
} else {
|
||||||
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
|
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
|
||||||
.map_or_else(|| {
|
.map_or_else(|| {
|
||||||
let key = Random.generate().unwrap();
|
let key = Random.generate().expect("Error generating random key pair");
|
||||||
if let Some(path) = config.config_path.clone() {
|
if let Some(path) = config.config_path.clone() {
|
||||||
save_key(Path::new(&path), key.secret());
|
save_key(Path::new(&path), key.secret());
|
||||||
}
|
}
|
||||||
@ -488,7 +490,7 @@ impl Host {
|
|||||||
let mut s = e.lock();
|
let mut s = e.lock();
|
||||||
{
|
{
|
||||||
let id = s.id();
|
let id = s.id();
|
||||||
if id.is_some() && reserved.contains(id.unwrap()) {
|
if id.map_or(false, |id| reserved.contains(id)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -814,11 +816,12 @@ impl Host {
|
|||||||
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
|
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Check for the session limit. session_counts accounts for the new session.
|
||||||
if reserved_only ||
|
if reserved_only ||
|
||||||
(s.info.originated && session_count >= min_peers) ||
|
(s.info.originated && session_count > min_peers) ||
|
||||||
(!s.info.originated && session_count >= max_peers) {
|
(!s.info.originated && session_count > max_peers) {
|
||||||
// only proceed if the connecting peer is reserved.
|
// only proceed if the connecting peer is reserved.
|
||||||
if !self.reserved_nodes.read().contains(s.id().unwrap()) {
|
if !self.reserved_nodes.read().contains(s.id().expect("Ready session always has id")) {
|
||||||
s.disconnect(io, DisconnectReason::TooManyPeers);
|
s.disconnect(io, DisconnectReason::TooManyPeers);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -827,7 +830,7 @@ impl Host {
|
|||||||
// Add it to the node table
|
// Add it to the node table
|
||||||
if !s.info.originated {
|
if !s.info.originated {
|
||||||
if let Ok(address) = s.remote_addr() {
|
if let Ok(address) = s.remote_addr() {
|
||||||
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
let entry = NodeEntry { id: s.id().expect("Ready session always has id").clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
||||||
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
||||||
let mut discovery = self.discovery.lock();
|
let mut discovery = self.discovery.lock();
|
||||||
if let Some(ref mut discovery) = *discovery {
|
if let Some(ref mut discovery) = *discovery {
|
||||||
@ -861,17 +864,19 @@ impl Host {
|
|||||||
}
|
}
|
||||||
let handlers = self.handlers.read();
|
let handlers = self.handlers.read();
|
||||||
for p in ready_data {
|
for p in ready_data {
|
||||||
let h = handlers.get(&p).unwrap().clone();
|
|
||||||
self.stats.inc_sessions();
|
self.stats.inc_sessions();
|
||||||
let reserved = self.reserved_nodes.read();
|
let reserved = self.reserved_nodes.read();
|
||||||
|
if let Some(h) = handlers.get(&p).clone() {
|
||||||
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
|
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for (p, packet_id, data) in packet_data {
|
for (p, packet_id, data) in packet_data {
|
||||||
let h = handlers.get(&p).unwrap().clone();
|
|
||||||
let reserved = self.reserved_nodes.read();
|
let reserved = self.reserved_nodes.read();
|
||||||
|
if let Some(h) = handlers.get(&p).clone() {
|
||||||
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
|
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
|
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
|
||||||
trace!(target: "network", "Connection timeout: {}", token);
|
trace!(target: "network", "Connection timeout: {}", token);
|
||||||
@ -909,10 +914,11 @@ impl Host {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for p in to_disconnect {
|
for p in to_disconnect {
|
||||||
let h = self.handlers.read().get(&p).unwrap().clone();
|
|
||||||
let reserved = self.reserved_nodes.read();
|
let reserved = self.reserved_nodes.read();
|
||||||
|
if let Some(h) = self.handlers.read().get(&p).clone() {
|
||||||
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
|
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if deregister {
|
if deregister {
|
||||||
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
|
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
|
||||||
}
|
}
|
||||||
@ -975,7 +981,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) };
|
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
@ -992,7 +998,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
self.discovery.lock().as_mut().unwrap().writable(io);
|
self.discovery.lock().as_mut().map(|d| d.writable(io));
|
||||||
}
|
}
|
||||||
_ => panic!("Received unknown writable token"),
|
_ => panic!("Received unknown writable token"),
|
||||||
}
|
}
|
||||||
@ -1006,11 +1012,11 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
IDLE => self.maintain_network(io),
|
IDLE => self.maintain_network(io),
|
||||||
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
||||||
DISCOVERY_REFRESH => {
|
DISCOVERY_REFRESH => {
|
||||||
self.discovery.lock().as_mut().unwrap().refresh();
|
self.discovery.lock().as_mut().map(|d| d.refresh());
|
||||||
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
||||||
},
|
},
|
||||||
DISCOVERY_ROUND => {
|
DISCOVERY_ROUND => {
|
||||||
let node_changes = { self.discovery.lock().as_mut().unwrap().round() };
|
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
@ -1102,7 +1108,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
|
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
|
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
_ => warn!("Unexpected stream registration")
|
_ => warn!("Unexpected stream registration")
|
||||||
}
|
}
|
||||||
@ -1130,7 +1136,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
|
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
_ => warn!("Unexpected stream update")
|
_ => warn!("Unexpected stream update")
|
||||||
}
|
}
|
||||||
@ -1200,7 +1206,7 @@ fn key_save_load() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn host_client_url() {
|
fn host_client_url() {
|
||||||
let mut config = NetworkConfiguration::new();
|
let mut config = NetworkConfiguration::new_local();
|
||||||
let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".into();
|
let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".into();
|
||||||
config.use_secret = Some(key);
|
config.use_secret = Some(key);
|
||||||
let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
|
let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
|
||||||
|
@ -280,7 +280,7 @@ impl NodeTable {
|
|||||||
json.push_str("\"nodes\": [\n");
|
json.push_str("\"nodes\": [\n");
|
||||||
let node_ids = self.nodes(AllowIP::All);
|
let node_ids = self.nodes(AllowIP::All);
|
||||||
for i in 0 .. node_ids.len() {
|
for i in 0 .. node_ids.len() {
|
||||||
let node = self.nodes.get(&node_ids[i]).unwrap();
|
let node = self.nodes.get(&node_ids[i]).expect("self.nodes() only returns node IDs from self.nodes");
|
||||||
json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","}))
|
json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","}))
|
||||||
}
|
}
|
||||||
json.push_str("]\n");
|
json.push_str("]\n");
|
||||||
|
@ -388,8 +388,9 @@ impl Session {
|
|||||||
Ok(SessionData::Continue)
|
Ok(SessionData::Continue)
|
||||||
},
|
},
|
||||||
PACKET_PONG => {
|
PACKET_PONG => {
|
||||||
self.pong_time_ns = Some(time::precise_time_ns());
|
let time = time::precise_time_ns();
|
||||||
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
|
self.pong_time_ns = Some(time);
|
||||||
|
self.info.ping_ms = Some((time - self.ping_time_ns) / 1000_000);
|
||||||
Ok(SessionData::Continue)
|
Ok(SessionData::Continue)
|
||||||
},
|
},
|
||||||
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
||||||
|
Loading…
Reference in New Issue
Block a user