diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 746c745c4..44d429164 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -159,6 +159,11 @@ impl Connection { } } + /// Get socket token + pub fn token(&self) -> StreamToken { + self.token + } + /// Register this connection with the IO event loop. pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", reg); diff --git a/util/src/network/discovery.rs b/util/src/network/discovery.rs index 32370b88d..da81920ff 100644 --- a/util/src/network/discovery.rs +++ b/util/src/network/discovery.rs @@ -14,26 +14,34 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -// This module is a work in progress - -#![allow(dead_code)] //TODO: remove this after everything is done - -use std::collections::{HashSet, BTreeMap}; +use bytes::Bytes; +use std::net::SocketAddr; +use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; use std::cell::{RefCell}; use std::ops::{DerefMut}; +use std::mem; use mio::*; use mio::udp::*; use hash::*; use sha3::Hashable; use crypto::*; +use rlp::*; use network::node::*; +use network::error::NetworkError; +use io::StreamToken; -const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. -const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. -const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us). -const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover) -const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. -const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. +const ADDRESS_BYTES_SIZE: u32 = 32; // Size of address type in bytes. +const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia]. +const NODE_BINS: u32 = ADDRESS_BITS - 1; // Size of m_state (excludes root, which is us). +const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) +const BUCKET_SIZE: u32 = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. +const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. +const MAX_DATAGRAM_SIZE: usize = 1280; + +const PACKET_PING: u8 = 1; +const PACKET_PONG: u8 = 2; +const PACKET_FIND_NODE: u8 = 3; +const PACKET_NEIGHBOURS: u8 = 4; struct NodeBucket { distance: u32, @@ -49,12 +57,25 @@ impl NodeBucket { } } -struct Discovery { +struct Datagramm { + payload: Bytes, + address: SocketAddr, +} + +pub struct Discovery { id: NodeId, + udp_socket: UdpSocket, + token: StreamToken, discovery_round: u16, discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, + send_queue: VecDeque +} + +pub struct TableUpdates { + pub added: HashMap, + pub removed: HashSet, } struct FindNodePacket; @@ -72,13 +93,17 @@ impl FindNodePacket { } impl Discovery { - pub fn new(id: &NodeId) -> Discovery { + pub fn new(id: &NodeId, address: &SocketAddr, token: StreamToken) -> Discovery { + let socket = UdpSocket::bound(address).expect("Error binding UDP socket"); Discovery { id: id.clone(), + token: token, discovery_round: 0, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..NODE_BINS).map(NodeBucket::new).collect(), + udp_socket: socket, + send_queue: VecDeque::new(), } } @@ -151,17 +176,13 @@ impl Discovery { // if d is 0, then we roll look forward, if last, we reverse, else, spread from d if head > 1 && tail != LAST_BIN { - while head != tail && head < NODE_BINS && count < BUCKET_SIZE - { - for n in &buckets[head as usize].nodes - { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { - break; - } + while head != tail && head < NODE_BINS && count < BUCKET_SIZE { + for n in &buckets[head as usize].nodes { + if count < BUCKET_SIZE { + count += 1; + found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); + } + else { break } } if count < BUCKET_SIZE && tail != 0 { for n in &buckets[tail as usize].nodes { @@ -169,9 +190,7 @@ impl Discovery { count += 1; found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); } - else { - break; - } + else { break } } } @@ -184,13 +203,11 @@ impl Discovery { else if head < 2 { while head < NODE_BINS && count < BUCKET_SIZE { for n in &buckets[head as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { - break; - } + if count < BUCKET_SIZE { + count += 1; + found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); + } + else { break } } head += 1; } @@ -198,13 +215,11 @@ impl Discovery { else { while tail > 0 && count < BUCKET_SIZE { for n in &buckets[tail as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { - break; - } + if count < BUCKET_SIZE { + count += 1; + found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); + } + else { break } } tail -= 1; } @@ -220,4 +235,108 @@ impl Discovery { } ret } + + pub fn writable(&mut self) { + if self.send_queue.is_empty() { + return; + } + let data = self.send_queue.pop_front().unwrap(); + match self.udp_socket.send_to(&data.payload, &data.address) { + Ok(Some(size)) if size == data.payload.len() => { + }, + Ok(Some(size)) => { + warn!("UDP sent incomplete datagramm"); + }, + Ok(None) => { + self.send_queue.push_front(data); + } + Err(e) => { + warn!("UDP sent error: {:?}", e); + } + } + } + + fn send_to(&mut self, payload: Bytes, address: SocketAddr) { + self.send_queue.push_back(Datagramm { payload: payload, address: address }); + } + + pub fn readable(&mut self) -> Option { + let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; + match self.udp_socket.recv_from(&mut buf) { + Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| { + debug!("Error processing UDP packet: {:?}", e); + None + }), + Ok(_) => None, + Err(e) => { + warn!("Error reading UPD socket: {:?}", e); + None + } + } + } + + fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result, NetworkError> { + // validate packet + if packet.len() < 32 + 65 + 4 + 1 { + return Err(NetworkError::BadProtocol); + } + + let hash_signed = (&packet[32..]).sha3(); + if hash_signed[..] != packet[0..32] { + return Err(NetworkError::BadProtocol); + } + + let signed = &packet[(32 + 65)..]; + let signature = Signature::from_slice(&packet[32..(32 + 65)]); + let node_id = try!(ec::recover(&signature, &signed.sha3())); + + let packet_id = signed[0]; + let rlp = UntrustedRlp::new(&signed[1..]); + match packet_id { + PACKET_PING => self.on_ping(&rlp, &node_id, &from), + PACKET_PONG => self.on_pong(&rlp, &node_id, &from), + PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from), + PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from), + _ => { + debug!("Unknown UDP packet: {}", packet_id); + Ok(None) + } + } + } + + fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + Ok(None) + } + + fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + Ok(None) + } + + fn on_find_node(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + Ok(None) + } + + fn on_neighbours(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + Ok(None) + } + + pub fn round(&mut self) { + } + + pub fn refresh(&mut self) { + } + + pub fn register_socket(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { + event_loop.register(&self.udp_socket, Token(self.token), EventSet::all(), PollOpt::edge()).expect("Error registering UDP socket"); + Ok(()) + } + + pub fn update_registration(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { + let mut registration = EventSet::readable(); + if !self.send_queue.is_empty() { + registration &= EventSet::writable(); + } + event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); + Ok(()) + } } diff --git a/util/src/network/error.rs b/util/src/network/error.rs index 4d7fb483e..eb97e54b6 100644 --- a/util/src/network/error.rs +++ b/util/src/network/error.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . use io::IoError; +use crypto::CryptoError; use rlp::*; #[derive(Debug, Copy, Clone)] @@ -61,3 +62,9 @@ impl From for NetworkError { } } +impl From for NetworkError { + fn from(_err: CryptoError) -> NetworkError { + NetworkError::Auth + } +} + diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index 4b23c4e16..94650b2a7 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -87,6 +87,16 @@ impl Handshake { }) } + /// Get id of the remote node if known + pub fn id(&self) -> &NodeId { + &self.id + } + + /// Get stream token id + pub fn token(&self) -> StreamToken { + self.connection.token() + } + /// Start a handhsake pub fn start(&mut self, io: &IoContext, host: &HostInfo, originated: bool) -> Result<(), UtilError> where Message: Send + Clone{ self.originated = originated; diff --git a/util/src/network/host.rs b/util/src/network/host.rs index c1423dbb3..2ad949642 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -22,7 +22,6 @@ use std::sync::*; use std::ops::*; use mio::*; use mio::tcp::*; -use mio::udp::*; use target_info::Target; use hash::*; use crypto::*; @@ -37,6 +36,8 @@ use network::node::*; use network::stats::NetworkStats; use network::error::DisconnectReason; use igd::{PortMappingProtocol,search_gateway}; +use network::discovery::{Discovery, TableUpdates}; +use network::node_table::NodeTable; type Slab = ::slab::Slab; @@ -50,6 +51,8 @@ const MAINTENANCE_TIMEOUT: u64 = 1000; #[derive(Debug)] /// Network service configuration pub struct NetworkConfiguration { + /// Directory path to store network configuration. None means nothing will be saved + pub config_path: Option, /// IP address to listen for incoming connections pub listen_address: SocketAddr, /// IP address to advertise @@ -70,6 +73,7 @@ impl NetworkConfiguration { /// Create a new instance of default settings. pub fn new() -> NetworkConfiguration { NetworkConfiguration { + config_path: None, listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), nat_enabled: true, @@ -114,6 +118,7 @@ impl NetworkConfiguration { } NetworkConfiguration { + config_path: self.config_path, listen_address: listen, public_address: public, nat_enabled: false, @@ -126,14 +131,12 @@ impl NetworkConfiguration { } // Tokens -//const TOKEN_BEGIN: usize = USER_TOKEN_START; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12) -const TOKEN_BEGIN: usize = 32; -const TCP_ACCEPT: usize = TOKEN_BEGIN + 1; -const IDLE: usize = TOKEN_BEGIN + 2; -const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 3; -const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 4; -const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 5; -const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16; +const TCP_ACCEPT: usize = MAX_CONNECTIONS + 1; +const IDLE: usize = MAX_CONNECTIONS + 2; +const DISCOVERY: usize = MAX_CONNECTIONS + 3; +const DISCOVERY_REFRESH: usize = MAX_CONNECTIONS + 4; +const DISCOVERY_ROUND: usize = MAX_CONNECTIONS + 5; +const FIRST_CONNECTION: usize = 0; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; /// Protocol handler level packet id @@ -320,10 +323,10 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host where Message: Send + Sync + Clone { pub info: RwLock, - udp_socket: Mutex, tcp_listener: Mutex, connections: Arc>>, - nodes: RwLock>, + discovery: Mutex, + nodes: RwLock, handlers: RwLock>>>, timers: RwLock>, timer_counter: RwLock, @@ -338,10 +341,12 @@ impl Host where Message: Send + Sync + Clone { let addr = config.listen_address; // Setup the server socket let tcp_listener = TcpListener::bind(&addr).unwrap(); - let udp_socket = UdpSocket::bound(&addr).unwrap(); + let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }; + let public = keys.public().clone(); + let path = config.config_path.clone(); let mut host = Host:: { info: RwLock::new(HostInfo { - keys: if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }, + keys: keys, config: config, nonce: H256::random(), protocol_version: 4, @@ -349,10 +354,10 @@ impl Host where Message: Send + Sync + Clone { listen_port: 0, capabilities: Vec::new(), }), - udp_socket: Mutex::new(udp_socket), + discovery: Mutex::new(Discovery::new(&public, &addr, DISCOVERY)), tcp_listener: Mutex::new(tcp_listener), connections: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS))), - nodes: RwLock::new(HashMap::new()), + nodes: RwLock::new(NodeTable::new(path)), handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(LAST_CONNECTION + 1), @@ -361,12 +366,6 @@ impl Host where Message: Send + Sync + Clone { let port = host.info.read().unwrap().config.listen_address.port(); host.info.write().unwrap().deref_mut().listen_port = port; - /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - */ - let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); for n in boot_nodes { host.add_node(&n); @@ -382,7 +381,7 @@ impl Host where Message: Send + Sync + Clone { match Node::from_str(id) { Err(e) => { warn!("Could not add node: {:?}", e); }, Ok(n) => { - self.nodes.write().unwrap().insert(n.id.clone(), n); + self.nodes.write().unwrap().add_node(n); } } } @@ -430,12 +429,9 @@ impl Host where Message: Send + Sync + Clone { } let mut to_connect: Vec = Vec::new(); - let mut req_conn = 0; - //TODO: use nodes from discovery here - //for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) { let pin = self.info.read().unwrap().deref().config.pin; - for n in self.nodes.read().unwrap().values().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { + for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { let connected = self.have_session(&n.id) || self.connecting_to(&n.id); let required = n.peer_type == PeerType::Required; if connected && required { @@ -685,15 +681,39 @@ impl Host where Message: Send + Sync + Clone { h.disconnected(&NetworkContext::new(io, p, Some(token), self.connections.clone()), &token); } } + + fn update_nodes(&self, io: &IoContext>, node_changes: TableUpdates) { + let connections = self.connections.write().unwrap(); + let mut to_remove: Vec = Vec::new(); + for c in connections.iter() { + match *c.lock().unwrap().deref_mut() { + ConnectionEntry::Handshake(ref h) => { + if node_changes.removed.contains(&h.id) { + to_remove.push(h.token()); + } + } + ConnectionEntry::Session(ref s) => { + if node_changes.removed.contains(&s.id()) { + to_remove.push(s.token()); + } + } + } + } + for i in to_remove { + self.kill_connection(i, io); + } + self.nodes.write().unwrap().update(node_changes); + } } impl IoHandler> for Host where Message: Send + Sync + Clone + 'static { /// Initialize networking fn initialize(&self, io: &IoContext>) { io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener"); - io.register_stream(NODETABLE_RECEIVE).expect("Error registering UDP listener"); + io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer"); - //io.register_timer(NODETABLE_MAINTAIN, 7200); + io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); + io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); } fn stream_hup(&self, io: &IoContext>, stream: StreamToken) { @@ -707,7 +727,11 @@ impl IoHandler> for Host where Messa fn stream_readable(&self, io: &IoContext>, stream: StreamToken) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io), - NODETABLE_RECEIVE => {}, + DISCOVERY => { + if let Some(node_changes) = self.discovery.lock().unwrap().readable() { + self.update_nodes(io, node_changes); + } + }, TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), } @@ -716,7 +740,7 @@ impl IoHandler> for Host where Messa fn stream_writable(&self, io: &IoContext>, stream: StreamToken) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), - NODETABLE_RECEIVE => {}, + DISCOVERY => self.discovery.lock().unwrap().writable(), _ => panic!("Received unknown writable token"), } } @@ -725,8 +749,12 @@ impl IoHandler> for Host where Messa match token { IDLE => self.maintain_network(io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), - NODETABLE_DISCOVERY => {}, - NODETABLE_MAINTAIN => {}, + DISCOVERY_REFRESH => { + self.discovery.lock().unwrap().refresh(); + }, + DISCOVERY_ROUND => { + self.discovery.lock().unwrap().round(); + }, _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { None => { warn!(target: "net", "No handler found for protocol: {:?}", timer.protocol) }, @@ -794,7 +822,7 @@ impl IoHandler> for Host where Messa } } else {} // expired } - NODETABLE_RECEIVE => event_loop.register(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + DISCOVERY => self.discovery.lock().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } @@ -812,7 +840,7 @@ impl IoHandler> for Host where Messa connections.remove(stream); } }, - NODETABLE_RECEIVE => event_loop.deregister(self.udp_socket.lock().unwrap().deref()).unwrap(), + DISCOVERY => (), TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(), _ => warn!("Unexpected stream deregistration") } @@ -828,7 +856,7 @@ impl IoHandler> for Host where Messa } } else {} // expired } - NODETABLE_RECEIVE => event_loop.reregister(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + DISCOVERY => self.discovery.lock().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 6b58c87eb..e5465c952 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -72,6 +72,7 @@ mod discovery; mod service; mod error; mod node; +mod node_table; mod stats; #[cfg(test)] diff --git a/util/src/network/node_table.rs b/util/src/network/node_table.rs new file mode 100644 index 000000000..0f1c2c5ad --- /dev/null +++ b/util/src/network/node_table.rs @@ -0,0 +1,52 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::collections::HashMap; +use std::collections::hash_map::Values; +use network::node::*; +use network::discovery::TableUpdates; + +pub struct NodeTable { + nodes: HashMap +} + +impl NodeTable { + pub fn new(_path: Option) -> NodeTable { + NodeTable { + nodes: HashMap::new() + } + } + + pub fn add_node(&mut self, node: Node) { + self.nodes.insert(node.id.clone(), node); + } + + pub fn nodes(&self) -> Values { + self.nodes.values() + } + + pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> { + self.nodes.get_mut(id) + } + + pub fn update(&mut self, mut update: TableUpdates) { + self.nodes.extend(update.added.drain()); + for r in update.removed { + self.nodes.remove(&r); + } + } + +} diff --git a/util/src/network/session.rs b/util/src/network/session.rs index b38807c49..19e2cf08e 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -129,6 +129,11 @@ impl Session { Ok(session) } + /// Get id of the remote peer + pub fn id(&self) -> &NodeId { + &self.info.id + } + /// Check if session is ready to send/receive data pub fn is_ready(&self) -> bool { self.had_hello