devp2p: Move UDP socket handling from Discovery to Host. (#8790)
* devp2p: Move UDP socket handling from Discovery to Host. * devp2p: Fix bug with potentially incorrect UDP registration. This works right now because the Host handler happens to be the first one registered on the IoService. * devp2p: Use 0-initialized memory buffer instead of unsafe. * Remove send_queue field from public interface of Discovery. * Rename Datagramm to Datagram. sed -i 's/Datagramm/Datagram/g' util/network-devp2p/src/discovery.rs util/network-devp2p/src/host.rs sed -i 's/datagramm/datagram/g' util/network-devp2p/src/discovery.rs util/network-devp2p/src/host.rs * Include target in log statements.
This commit is contained in:
parent
1f39a1bd76
commit
13bc922e54
@ -17,18 +17,13 @@
|
|||||||
use ethcore_bytes::Bytes;
|
use ethcore_bytes::Bytes;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::collections::{HashSet, HashMap, VecDeque};
|
use std::collections::{HashSet, HashMap, VecDeque};
|
||||||
use std::mem;
|
|
||||||
use std::default::Default;
|
use std::default::Default;
|
||||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
use mio::*;
|
|
||||||
use mio::deprecated::{Handler, EventLoop};
|
|
||||||
use mio::udp::*;
|
|
||||||
use hash::keccak;
|
use hash::keccak;
|
||||||
use ethereum_types::{H256, H520};
|
use ethereum_types::{H256, H520};
|
||||||
use rlp::{Rlp, RlpStream, encode_list};
|
use rlp::{Rlp, RlpStream, encode_list};
|
||||||
use node_table::*;
|
use node_table::*;
|
||||||
use network::{Error, ErrorKind};
|
use network::{Error, ErrorKind};
|
||||||
use io::{StreamToken, IoContext};
|
|
||||||
use ethkey::{Secret, KeyPair, sign, recover};
|
use ethkey::{Secret, KeyPair, sign, recover};
|
||||||
use network::IpFilter;
|
use network::IpFilter;
|
||||||
|
|
||||||
@ -39,7 +34,7 @@ const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademl
|
|||||||
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
|
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
|
||||||
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
|
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
|
||||||
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
|
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
|
||||||
const MAX_DATAGRAM_SIZE: usize = 1280;
|
pub const MAX_DATAGRAM_SIZE: usize = 1280;
|
||||||
|
|
||||||
const PACKET_PING: u8 = 1;
|
const PACKET_PING: u8 = 1;
|
||||||
const PACKET_PONG: u8 = 2;
|
const PACKET_PONG: u8 = 2;
|
||||||
@ -79,9 +74,9 @@ impl NodeBucket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Datagramm {
|
pub struct Datagram {
|
||||||
payload: Bytes,
|
pub payload: Bytes,
|
||||||
address: SocketAddr,
|
pub address: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Discovery {
|
pub struct Discovery {
|
||||||
@ -89,13 +84,11 @@ pub struct Discovery {
|
|||||||
id_hash: H256,
|
id_hash: H256,
|
||||||
secret: Secret,
|
secret: Secret,
|
||||||
public_endpoint: NodeEndpoint,
|
public_endpoint: NodeEndpoint,
|
||||||
udp_socket: UdpSocket,
|
|
||||||
token: StreamToken,
|
|
||||||
discovery_round: u16,
|
discovery_round: u16,
|
||||||
discovery_id: NodeId,
|
discovery_id: NodeId,
|
||||||
discovery_nodes: HashSet<NodeId>,
|
discovery_nodes: HashSet<NodeId>,
|
||||||
node_buckets: Vec<NodeBucket>,
|
node_buckets: Vec<NodeBucket>,
|
||||||
send_queue: VecDeque<Datagramm>,
|
send_queue: VecDeque<Datagram>,
|
||||||
check_timestamps: bool,
|
check_timestamps: bool,
|
||||||
adding_nodes: Vec<NodeEntry>,
|
adding_nodes: Vec<NodeEntry>,
|
||||||
ip_filter: IpFilter,
|
ip_filter: IpFilter,
|
||||||
@ -107,19 +100,16 @@ pub struct TableUpdates {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Discovery {
|
impl Discovery {
|
||||||
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery {
|
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
|
||||||
let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket");
|
|
||||||
Discovery {
|
Discovery {
|
||||||
id: key.public().clone(),
|
id: key.public().clone(),
|
||||||
id_hash: keccak(key.public()),
|
id_hash: keccak(key.public()),
|
||||||
secret: key.secret().clone(),
|
secret: key.secret().clone(),
|
||||||
public_endpoint: public,
|
public_endpoint: public,
|
||||||
token: token,
|
|
||||||
discovery_round: 0,
|
discovery_round: 0,
|
||||||
discovery_id: NodeId::new(),
|
discovery_id: NodeId::new(),
|
||||||
discovery_nodes: HashSet::new(),
|
discovery_nodes: HashSet::new(),
|
||||||
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
|
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
|
||||||
udp_socket: socket,
|
|
||||||
send_queue: VecDeque::new(),
|
send_queue: VecDeque::new(),
|
||||||
check_timestamps: true,
|
check_timestamps: true,
|
||||||
adding_nodes: Vec::new(),
|
adding_nodes: Vec::new(),
|
||||||
@ -352,53 +342,12 @@ impl Discovery {
|
|||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
|
|
||||||
while let Some(data) = self.send_queue.pop_front() {
|
|
||||||
match self.udp_socket.send_to(&data.payload, &data.address) {
|
|
||||||
Ok(Some(size)) if size == data.payload.len() => {
|
|
||||||
},
|
|
||||||
Ok(Some(_)) => {
|
|
||||||
warn!("UDP sent incomplete datagramm");
|
|
||||||
},
|
|
||||||
Ok(None) => {
|
|
||||||
self.send_queue.push_front(data);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
debug!("UDP send error: {:?}, address: {:?}", e, &data.address);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
|
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
|
||||||
self.send_queue.push_back(Datagramm { payload: payload, address: address });
|
self.send_queue.push_back(Datagram { payload: payload, address: address });
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
|
|
||||||
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
|
|
||||||
let writable = !self.send_queue.is_empty();
|
|
||||||
let res = match self.udp_socket.recv_from(&mut buf) {
|
|
||||||
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
|
|
||||||
debug!("Error processing UDP packet: {:?}", e);
|
|
||||||
None
|
|
||||||
}),
|
|
||||||
Ok(_) => None,
|
|
||||||
Err(e) => {
|
|
||||||
debug!("Error reading UPD socket: {:?}", e);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let new_writable = !self.send_queue.is_empty();
|
|
||||||
if writable != new_writable {
|
|
||||||
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
|
||||||
}
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
|
pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
|
||||||
// validate packet
|
// validate packet
|
||||||
if packet.len() < 32 + 65 + 4 + 1 {
|
if packet.len() < 32 + 65 + 4 + 1 {
|
||||||
return Err(ErrorKind::BadProtocol.into());
|
return Err(ErrorKind::BadProtocol.into());
|
||||||
@ -571,19 +520,16 @@ impl Discovery {
|
|||||||
self.start();
|
self.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
|
pub fn any_sends_queued(&self) -> bool {
|
||||||
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
|
!self.send_queue.is_empty()
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
|
pub fn dequeue_send(&mut self) -> Option<Datagram> {
|
||||||
let registration = if !self.send_queue.is_empty() {
|
self.send_queue.pop_front()
|
||||||
Ready::readable() | Ready::writable()
|
}
|
||||||
} else {
|
|
||||||
Ready::readable()
|
pub fn requeue_send(&mut self, datagram: Datagram) {
|
||||||
};
|
self.send_queue.push_front(datagram)
|
||||||
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -620,8 +566,8 @@ mod tests {
|
|||||||
let key2 = Random.generate().unwrap();
|
let key2 = Random.generate().unwrap();
|
||||||
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
|
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
|
||||||
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
|
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
|
||||||
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
|
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
|
||||||
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
|
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());
|
||||||
|
|
||||||
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
|
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
|
||||||
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
|
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
|
||||||
@ -632,16 +578,14 @@ mod tests {
|
|||||||
discovery2.refresh();
|
discovery2.refresh();
|
||||||
|
|
||||||
for _ in 0 .. 10 {
|
for _ in 0 .. 10 {
|
||||||
while !discovery1.send_queue.is_empty() {
|
while let Some(datagram) = discovery1.dequeue_send() {
|
||||||
let datagramm = discovery1.send_queue.pop_front().unwrap();
|
if datagram.address == ep2.address {
|
||||||
if datagramm.address == ep2.address {
|
discovery2.on_packet(&datagram.payload, ep1.address.clone()).ok();
|
||||||
discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while !discovery2.send_queue.is_empty() {
|
while let Some(datagram) = discovery2.dequeue_send() {
|
||||||
let datagramm = discovery2.send_queue.pop_front().unwrap();
|
if datagram.address == ep1.address {
|
||||||
if datagramm.address == ep1.address {
|
discovery1.on_packet(&datagram.payload, ep2.address.clone()).ok();
|
||||||
discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
discovery2.round();
|
discovery2.round();
|
||||||
@ -653,7 +597,7 @@ mod tests {
|
|||||||
fn removes_expired() {
|
fn removes_expired() {
|
||||||
let key = Random.generate().unwrap();
|
let key = Random.generate().unwrap();
|
||||||
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
|
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
|
||||||
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
|
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
|
||||||
for _ in 0..1200 {
|
for _ in 0..1200 {
|
||||||
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
|
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
|
||||||
}
|
}
|
||||||
@ -668,7 +612,7 @@ mod tests {
|
|||||||
|
|
||||||
let key = Random.generate().unwrap();
|
let key = Random.generate().unwrap();
|
||||||
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
|
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
|
||||||
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
|
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
|
||||||
|
|
||||||
for _ in 0..(16 + 10) {
|
for _ in 0..(16 + 10) {
|
||||||
discovery.node_buckets[0].nodes.push_back(BucketEntry {
|
discovery.node_buckets[0].nodes.push_back(BucketEntry {
|
||||||
@ -728,7 +672,7 @@ mod tests {
|
|||||||
let key = Secret::from_str(secret_hex)
|
let key = Secret::from_str(secret_hex)
|
||||||
.and_then(|secret| KeyPair::from_secret(secret))
|
.and_then(|secret| KeyPair::from_secret(secret))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
|
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
|
||||||
|
|
||||||
node_entries.iter().for_each(|entry| discovery.update_node(entry.clone()));
|
node_entries.iter().for_each(|entry| discovery.update_node(entry.clone()));
|
||||||
|
|
||||||
@ -773,7 +717,7 @@ mod tests {
|
|||||||
fn packets() {
|
fn packets() {
|
||||||
let key = Random.generate().unwrap();
|
let key = Random.generate().unwrap();
|
||||||
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 };
|
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 };
|
||||||
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
|
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
|
||||||
discovery.check_timestamps = false;
|
discovery.check_timestamps = false;
|
||||||
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();
|
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();
|
||||||
|
|
||||||
@ -840,13 +784,13 @@ mod tests {
|
|||||||
let key2 = Random.generate().unwrap();
|
let key2 = Random.generate().unwrap();
|
||||||
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 };
|
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 };
|
||||||
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 };
|
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 };
|
||||||
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default());
|
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
|
||||||
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default());
|
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());
|
||||||
|
|
||||||
discovery1.ping(&ep2);
|
discovery1.ping(&ep2);
|
||||||
let ping_data = discovery1.send_queue.pop_front().unwrap();
|
let ping_data = discovery1.dequeue_send().unwrap();
|
||||||
discovery2.on_packet(&ping_data.payload, ep1.address.clone()).ok();
|
discovery2.on_packet(&ping_data.payload, ep1.address.clone()).ok();
|
||||||
let pong_data = discovery2.send_queue.pop_front().unwrap();
|
let pong_data = discovery2.dequeue_send().unwrap();
|
||||||
let data = &pong_data.payload[(32 + 65)..];
|
let data = &pong_data.payload[(32 + 65)..];
|
||||||
let rlp = Rlp::new(&data[1..]);
|
let rlp = Rlp::new(&data[1..]);
|
||||||
assert_eq!(ping_data.payload[0..32], rlp.val_at::<Vec<u8>>(1).unwrap()[..])
|
assert_eq!(ping_data.payload[0..32], rlp.val_at::<Vec<u8>>(1).unwrap()[..])
|
||||||
|
@ -30,6 +30,7 @@ use hash::keccak;
|
|||||||
use mio::*;
|
use mio::*;
|
||||||
use mio::deprecated::{EventLoop};
|
use mio::deprecated::{EventLoop};
|
||||||
use mio::tcp::*;
|
use mio::tcp::*;
|
||||||
|
use mio::udp::*;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use rlp::{RlpStream, Encodable};
|
use rlp::{RlpStream, Encodable};
|
||||||
|
|
||||||
@ -40,7 +41,7 @@ use node_table::*;
|
|||||||
use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId};
|
use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId};
|
||||||
use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait};
|
use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait};
|
||||||
use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler};
|
use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler};
|
||||||
use discovery::{Discovery, TableUpdates, NodeEntry};
|
use discovery::{Discovery, TableUpdates, NodeEntry, MAX_DATAGRAM_SIZE};
|
||||||
use ip_utils::{map_external_address, select_public_address};
|
use ip_utils::{map_external_address, select_public_address};
|
||||||
use path::restrict_permissions_owner;
|
use path::restrict_permissions_owner;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
@ -239,6 +240,7 @@ struct ProtocolTimer {
|
|||||||
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
|
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
|
||||||
pub struct Host {
|
pub struct Host {
|
||||||
pub info: RwLock<HostInfo>,
|
pub info: RwLock<HostInfo>,
|
||||||
|
udp_socket: Mutex<Option<UdpSocket>>,
|
||||||
tcp_listener: Mutex<TcpListener>,
|
tcp_listener: Mutex<TcpListener>,
|
||||||
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
||||||
discovery: Mutex<Option<Discovery>>,
|
discovery: Mutex<Option<Discovery>>,
|
||||||
@ -295,6 +297,7 @@ impl Host {
|
|||||||
local_endpoint: local_endpoint,
|
local_endpoint: local_endpoint,
|
||||||
}),
|
}),
|
||||||
discovery: Mutex::new(None),
|
discovery: Mutex::new(None),
|
||||||
|
udp_socket: Mutex::new(None),
|
||||||
tcp_listener: Mutex::new(tcp_listener),
|
tcp_listener: Mutex::new(tcp_listener),
|
||||||
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
|
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
|
||||||
nodes: RwLock::new(NodeTable::new(path)),
|
nodes: RwLock::new(NodeTable::new(path)),
|
||||||
@ -458,13 +461,16 @@ impl Host {
|
|||||||
let discovery = {
|
let discovery = {
|
||||||
let info = self.info.read();
|
let info = self.info.read();
|
||||||
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
|
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
|
||||||
let mut udp_addr = local_endpoint.address.clone();
|
Some(Discovery::new(&info.keys, public_endpoint, allow_ips))
|
||||||
udp_addr.set_port(local_endpoint.udp_port);
|
|
||||||
Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY, allow_ips))
|
|
||||||
} else { None }
|
} else { None }
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(mut discovery) = discovery {
|
if let Some(mut discovery) = discovery {
|
||||||
|
let mut udp_addr = local_endpoint.address;
|
||||||
|
udp_addr.set_port(local_endpoint.udp_port);
|
||||||
|
let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket");
|
||||||
|
*self.udp_socket.lock() = Some(socket);
|
||||||
|
|
||||||
discovery.init_node_list(self.nodes.read().entries());
|
discovery.init_node_list(self.nodes.read().entries());
|
||||||
discovery.add_node_list(self.nodes.read().entries());
|
discovery.add_node_list(self.nodes.read().entries());
|
||||||
*self.discovery.lock() = Some(discovery);
|
*self.discovery.lock() = Some(discovery);
|
||||||
@ -819,6 +825,67 @@ impl Host {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn discovery_readable(&self, io: &IoContext<NetworkIoMessage>) {
|
||||||
|
let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
|
||||||
|
(Some(udp_socket), Some(discovery)) => {
|
||||||
|
let mut buf = [0u8; MAX_DATAGRAM_SIZE];
|
||||||
|
let writable = discovery.any_sends_queued();
|
||||||
|
let res = match udp_socket.recv_from(&mut buf) {
|
||||||
|
Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
|
||||||
|
debug!(target: "network", "Error processing UDP packet: {:?}", e);
|
||||||
|
None
|
||||||
|
}),
|
||||||
|
Ok(_) => None,
|
||||||
|
Err(e) => {
|
||||||
|
debug!(target: "network", "Error reading UPD socket: {:?}", e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let new_writable = discovery.any_sends_queued();
|
||||||
|
if writable != new_writable {
|
||||||
|
io.update_registration(DISCOVERY)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
debug!(target: "network" ,"Error updating discovery registration: {:?}", e)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
res
|
||||||
|
},
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
if let Some(node_changes) = node_changes {
|
||||||
|
self.update_nodes(io, node_changes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn discovery_writable(&self, io: &IoContext<NetworkIoMessage>) {
|
||||||
|
match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) {
|
||||||
|
(Some(udp_socket), Some(discovery)) => {
|
||||||
|
while let Some(data) = discovery.dequeue_send() {
|
||||||
|
match udp_socket.send_to(&data.payload, &data.address) {
|
||||||
|
Ok(Some(size)) if size == data.payload.len() => {
|
||||||
|
},
|
||||||
|
Ok(Some(_)) => {
|
||||||
|
warn!(target: "network", "UDP sent incomplete datagram");
|
||||||
|
},
|
||||||
|
Ok(None) => {
|
||||||
|
discovery.requeue_send(data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
io.update_registration(DISCOVERY)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
debug!(target: "network", "Error updating discovery registration: {:?}", e)
|
||||||
|
});
|
||||||
|
},
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
|
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
|
||||||
trace!(target: "network", "Connection timeout: {}", token);
|
trace!(target: "network", "Connection timeout: {}", token);
|
||||||
self.kill_connection(token, io, true)
|
self.kill_connection(token, io, true)
|
||||||
@ -920,12 +987,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
}
|
}
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => self.discovery_readable(io),
|
||||||
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) };
|
|
||||||
if let Some(node_changes) = node_changes {
|
|
||||||
self.update_nodes(io, node_changes);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
TCP_ACCEPT => self.accept(io),
|
TCP_ACCEPT => self.accept(io),
|
||||||
_ => panic!("Received unknown readable token"),
|
_ => panic!("Received unknown readable token"),
|
||||||
}
|
}
|
||||||
@ -937,9 +999,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
}
|
}
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => self.discovery_writable(io),
|
||||||
self.discovery.lock().as_mut().map(|d| d.writable(io));
|
|
||||||
}
|
|
||||||
_ => panic!("Received unknown writable token"),
|
_ => panic!("Received unknown writable token"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1055,7 +1115,13 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
|
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"),
|
DISCOVERY => match self.udp_socket.lock().as_ref() {
|
||||||
|
Some(udp_socket) => {
|
||||||
|
event_loop.register(udp_socket, reg, Ready::all(), PollOpt::edge())
|
||||||
|
.expect("Error registering UDP socket");
|
||||||
|
},
|
||||||
|
_ => panic!("Error registering discovery socket"),
|
||||||
|
}
|
||||||
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
|
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
_ => warn!("Unexpected stream registration")
|
_ => warn!("Unexpected stream registration")
|
||||||
}
|
}
|
||||||
@ -1086,7 +1152,18 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
|
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"),
|
DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) {
|
||||||
|
(Some(udp_socket), Some(discovery)) => {
|
||||||
|
let registration = if discovery.any_sends_queued() {
|
||||||
|
Ready::readable() | Ready::writable()
|
||||||
|
} else {
|
||||||
|
Ready::readable()
|
||||||
|
};
|
||||||
|
event_loop.reregister(udp_socket, reg, registration, PollOpt::edge())
|
||||||
|
.expect("Error reregistering UDP socket");
|
||||||
|
},
|
||||||
|
_ => panic!("Error reregistering discovery socket"),
|
||||||
|
}
|
||||||
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
_ => warn!("Unexpected stream update")
|
_ => warn!("Unexpected stream update")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user