diff --git a/util/src/bytes.rs b/util/src/bytes.rs index 5ad2660e8..4923e6eb4 100644 --- a/util/src/bytes.rs +++ b/util/src/bytes.rs @@ -170,28 +170,8 @@ pub trait BytesConvertable { fn to_bytes(&self) -> Bytes { self.as_slice().to_vec() } } -impl<'a> BytesConvertable for &'a [u8] { - fn bytes(&self) -> &[u8] { self } -} - -impl BytesConvertable for Vec { - fn bytes(&self) -> &[u8] { self } -} - -macro_rules! impl_bytes_convertable_for_array { - ($zero: expr) => (); - ($len: expr, $($idx: expr),*) => { - impl BytesConvertable for [u8; $len] { - fn bytes(&self) -> &[u8] { self } - } - impl_bytes_convertable_for_array! { $($idx),* } - } -} - -// -1 at the end is not expanded -impl_bytes_convertable_for_array! { - 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, - 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1 +impl BytesConvertable for T where T: Deref { + fn bytes(&self) -> &[u8] { self.deref() } } #[test] diff --git a/util/src/hash.rs b/util/src/hash.rs index 75c39720e..c678d13a7 100644 --- a/util/src/hash.rs +++ b/util/src/hash.rs @@ -77,12 +77,6 @@ macro_rules! impl_hash { /// Unformatted binary data of fixed length. pub struct $from (pub [u8; $size]); - impl BytesConvertable for $from { - fn bytes(&self) -> &[u8] { - &self.0 - } - } - impl Deref for $from { type Target = [u8]; diff --git a/util/src/lib.rs b/util/src/lib.rs index bdd595014..05162bca7 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -19,6 +19,7 @@ #![feature(augmented_assignments)] #![feature(associated_consts)] #![feature(plugin)] +#![feature(ip)] #![plugin(clippy)] #![allow(needless_range_loop, match_bool)] #![feature(catch_panic)] diff --git a/util/src/network/discovery.rs b/util/src/network/discovery.rs index da81920ff..a214f5278 100644 --- a/util/src/network/discovery.rs +++ b/util/src/network/discovery.rs @@ -17,24 +17,26 @@ use bytes::Bytes; use std::net::SocketAddr; use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; -use std::cell::{RefCell}; -use std::ops::{DerefMut}; use std::mem; +use std::cmp; use mio::*; use mio::udp::*; +use sha3::*; +use time; use hash::*; -use sha3::Hashable; use crypto::*; use rlp::*; use network::node::*; use network::error::NetworkError; use io::StreamToken; +use network::PROTOCOL_VERSION; + const ADDRESS_BYTES_SIZE: u32 = 32; // Size of address type in bytes. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia]. const NODE_BINS: u32 = ADDRESS_BITS - 1; // Size of m_state (excludes root, which is us). const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) -const BUCKET_SIZE: u32 = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. +const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. const MAX_DATAGRAM_SIZE: usize = 1280; @@ -43,16 +45,27 @@ const PACKET_PONG: u8 = 2; const PACKET_FIND_NODE: u8 = 3; const PACKET_NEIGHBOURS: u8 = 4; +const PING_TIMEOUT_MS: u64 = 300; + +#[derive(Clone, Debug)] +pub struct NodeEntry { + pub id: NodeId, + pub endpoint: NodeEndpoint, +} + +pub struct BucketEntry { + pub address: NodeEntry, + pub timeout: Option, +} + struct NodeBucket { - distance: u32, - nodes: Vec + nodes: VecDeque, //sorted by last active } impl NodeBucket { - fn new(distance: u32) -> NodeBucket { + fn new() -> NodeBucket { NodeBucket { - distance: distance, - nodes: Vec::new() + nodes: VecDeque::new() } } } @@ -64,6 +77,8 @@ struct Datagramm { pub struct Discovery { id: NodeId, + secret: Secret, + address: NodeEndpoint, udp_socket: UdpSocket, token: StreamToken, discovery_round: u16, @@ -74,80 +89,90 @@ pub struct Discovery { } pub struct TableUpdates { - pub added: HashMap, + pub added: HashMap, pub removed: HashSet, } -struct FindNodePacket; - -impl FindNodePacket { - fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket { - FindNodePacket - } - - fn sign(&mut self, _secret: &Secret) { - } - - fn send(& self, _socket: &mut UdpSocket) { - } -} - impl Discovery { - pub fn new(id: &NodeId, address: &SocketAddr, token: StreamToken) -> Discovery { - let socket = UdpSocket::bound(address).expect("Error binding UDP socket"); + pub fn new(key: &KeyPair, address: NodeEndpoint, token: StreamToken) -> Discovery { + let socket = UdpSocket::bound(&address.udp_address()).expect("Error binding UDP socket"); Discovery { - id: id.clone(), + id: key.public().clone(), + secret: key.secret().clone(), + address: address, token: token, discovery_round: 0, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), - node_buckets: (0..NODE_BINS).map(NodeBucket::new).collect(), + node_buckets: (0..NODE_BINS).map(|_| NodeBucket::new()).collect(), udp_socket: socket, send_queue: VecDeque::new(), } } - pub fn add_node(&mut self, id: &NodeId) { - self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); + pub fn add_node(&mut self, e: NodeEntry) { + let endpoint = e.endpoint.clone(); + self.update_node(e); + self.ping(&endpoint); } - fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { + fn update_node(&mut self, e: NodeEntry) { + trace!(target: "discovery", "Inserting {:?}", &e); + let ping = { + let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id, &e.id) as usize).unwrap(); + let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) { + node.address = e.clone(); + node.timeout = None; + true + } else { false }; + + if !updated { + bucket.nodes.push_front(BucketEntry { address: e, timeout: None }); + } + + if bucket.nodes.len() > BUCKET_SIZE { + //ping least active node + bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns()); + Some(bucket.nodes.back().unwrap().address.endpoint.clone()) + } else { None } + }; + if let Some(endpoint) = ping { + self.ping(&endpoint); + } + } + + fn start(&mut self) { + trace!(target: "discovery", "Starting discovery"); self.discovery_round = 0; - self.discovery_id.randomize(); + self.discovery_id.randomize(); //TODO: use cryptographic nonce self.discovery_nodes.clear(); - self.discover(event_loop); } - fn discover(&mut self, event_loop: &mut EventLoop) { - if self.discovery_round == DISCOVERY_MAX_STEPS - { - debug!("Restarting discovery"); - self.start_node_discovery(event_loop); + fn discover(&mut self) { + if self.discovery_round == DISCOVERY_MAX_STEPS { return; } + trace!(target: "discovery", "Starting round {:?}", self.discovery_round); let mut tried_count = 0; { - let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter(); - let nodes = RefCell::new(&mut self.discovery_nodes); - let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA); + let nearest = Discovery::nearest_node_entries(&self.discovery_id, &self.node_buckets).into_iter(); + let nearest = nearest.filter(|x| !self.discovery_nodes.contains(&x.id)).take(ALPHA).collect::>(); for r in nearest { - //let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id); - //p.sign(&self.secret); - //p.send(&mut self.udp_socket); - let mut borrowed = nodes.borrow_mut(); - borrowed.deref_mut().insert(r.clone()); + let rlp = encode(&(&[self.discovery_id.clone()][..])); + self.send_packet(PACKET_FIND_NODE, &r.endpoint.udp_address(), &rlp); + self.discovery_nodes.insert(r.id.clone()); tried_count += 1; + trace!(target: "discovery", "Sent FindNode to {:?}", &r.endpoint); } } - if tried_count == 0 - { - debug!("Restarting discovery"); - self.start_node_discovery(event_loop); + if tried_count == 0 { + trace!(target: "discovery", "Completing discovery"); + self.discovery_round = DISCOVERY_MAX_STEPS; + self.discovery_nodes.clear(); return; } self.discovery_round += 1; - //event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap(); } fn distance(a: &NodeId, b: &NodeId) -> u32 { @@ -163,75 +188,75 @@ impl Discovery { ret } - #[allow(cyclomatic_complexity)] - fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b [NodeBucket]) -> Vec<&'b NodeId> - { - // send ALPHA FindNode packets to nodes we know, closest to target - const LAST_BIN: u32 = NODE_BINS - 1; - let mut head = Discovery::distance(source, target); - let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; + fn ping(&mut self, node: &NodeEndpoint) { + let mut rlp = RlpStream::new_list(3); + rlp.append(&PROTOCOL_VERSION); + self.address.to_rlp_list(&mut rlp); + node.to_rlp_list(&mut rlp); + trace!(target: "discovery", "Sent Ping to {:?}", &node); + self.send_packet(PACKET_PING, &node.udp_address(), &rlp.drain()); + } - let mut found: BTreeMap> = BTreeMap::new(); + fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) { + let mut rlp = RlpStream::new(); + rlp.append_raw(&[packet_id], 1); + let source = Rlp::new(payload); + rlp.begin_list(source.item_count() + 1); + for i in 0 .. source.item_count() { + rlp.append_raw(source.at(i).as_raw(), 1); + } + let timestamp = time::get_time().sec as u32 + 60; + rlp.append(×tamp); + + let bytes = rlp.drain(); + let hash = bytes.sha3(); + let signature = match ec::sign(&self.secret, &hash) { + Ok(s) => s, + Err(_) => { + warn!("Error signing UDP packet"); + return; + } + }; + let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65); + packet.extend(hash.iter()); + packet.extend(signature.iter()); + packet.extend(bytes.iter()); + let signed_hash = (&packet[32..]).sha3(); + packet[0..32].clone_from_slice(&signed_hash); + self.send_to(packet, address.clone()); + } + + #[allow(map_clone)] + fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec + { + let mut found: BTreeMap> = BTreeMap::new(); let mut count = 0; - // if d is 0, then we roll look forward, if last, we reverse, else, spread from d - if head > 1 && tail != LAST_BIN { - while head != tail && head < NODE_BINS && count < BUCKET_SIZE { - for n in &buckets[head as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { break } - } - if count < BUCKET_SIZE && tail != 0 { - for n in &buckets[tail as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { break } + // Sort nodes by distance to target + for bucket in buckets { + for node in &bucket.nodes { + let distance = Discovery::distance(target, &node.address.id); + found.entry(distance).or_insert_with(Vec::new).push(&node.address); + if count == BUCKET_SIZE { + // delete the most distant element + let remove = { + let (_, last) = found.iter_mut().next_back().unwrap(); + last.pop(); + last.is_empty() + }; + if remove { + found.remove(&distance); } } - - head += 1; - if tail > 0 { - tail -= 1; + else { + count += 1; } } } - else if head < 2 { - while head < NODE_BINS && count < BUCKET_SIZE { - for n in &buckets[head as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { break } - } - head += 1; - } - } - else { - while tail > 0 && count < BUCKET_SIZE { - for n in &buckets[tail as usize].nodes { - if count < BUCKET_SIZE { - count += 1; - found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); - } - else { break } - } - tail -= 1; - } - } - let mut ret:Vec<&NodeId> = Vec::new(); + let mut ret:Vec = Vec::new(); for (_, nodes) in found { - for n in nodes { - if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ { - ret.push(n); - } - } + ret.extend(nodes.iter().map(|&n| n.clone())); } ret } @@ -240,18 +265,22 @@ impl Discovery { if self.send_queue.is_empty() { return; } - let data = self.send_queue.pop_front().unwrap(); - match self.udp_socket.send_to(&data.payload, &data.address) { - Ok(Some(size)) if size == data.payload.len() => { - }, - Ok(Some(size)) => { - warn!("UDP sent incomplete datagramm"); - }, - Ok(None) => { - self.send_queue.push_front(data); - } - Err(e) => { - warn!("UDP sent error: {:?}", e); + while !self.send_queue.is_empty() { + let data = self.send_queue.pop_front().unwrap(); + match self.udp_socket.send_to(&data.payload, &data.address) { + Ok(Some(size)) if size == data.payload.len() => { + }, + Ok(Some(_)) => { + warn!("UDP sent incomplete datagramm"); + }, + Ok(None) => { + self.send_queue.push_front(data); + return; + } + Err(e) => { + warn!("UDP send error: {:?}, address: {:?}", e, &data.address); + return; + } } } } @@ -305,25 +334,132 @@ impl Discovery { } fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { - Ok(None) + trace!(target: "discovery", "Got Ping from {:?}", &from); + let version: u32 = try!(rlp.val_at(0)); + if version != PROTOCOL_VERSION { + debug!(target: "discovery", "Unexpected protocol version: {}", version); + return Err(NetworkError::BadProtocol); + } + let source = try!(NodeEndpoint::from_rlp(&try!(rlp.at(1)))); + let dest = try!(NodeEndpoint::from_rlp(&try!(rlp.at(2)))); + let timestamp: u64 = try!(rlp.val_at(3)); + if timestamp < time::get_time().sec as u64{ + debug!(target: "discovery", "Expired ping"); + return Err(NetworkError::Expired); + } + let mut entry = NodeEntry { id: node.clone(), endpoint: source.clone() }; + if !entry.endpoint.is_valid() { + debug!(target: "discovery", "Bad address: {:?}", entry); + entry.endpoint.address = from.clone(); + } + self.update_node(entry.clone()); + let hash = rlp.as_raw().sha3(); + let mut response = RlpStream::new_list(2); + dest.to_rlp_list(&mut response); + response.append(&hash); + self.send_packet(PACKET_PONG, &entry.endpoint.udp_address(), &response.drain()); + + let mut added_map = HashMap::new(); + added_map.insert(node.clone(), entry); + Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() })) } fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + trace!(target: "discovery", "Got Pong from {:?}", &from); + // TODO: validate pong packet + let dest = try!(NodeEndpoint::from_rlp(&try!(rlp.at(0)))); + let timestamp: u64 = try!(rlp.val_at(2)); + if timestamp > time::get_time().sec as u64 { + return Err(NetworkError::Expired); + } + let mut entry = NodeEntry { id: node.clone(), endpoint: dest }; + if !entry.endpoint.is_valid() { + debug!(target: "discovery", "Bad address: {:?}", entry); + entry.endpoint.address = from.clone(); + } + self.update_node(entry.clone()); + let mut added_map = HashMap::new(); + added_map.insert(node.clone(), entry); + Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() })) + } + + fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + trace!(target: "discovery", "Got FindNode from {:?}", &from); + let target: NodeId = try!(rlp.val_at(0)); + let timestamp: u64 = try!(rlp.val_at(1)); + if timestamp > time::get_time().sec as u64 { + return Err(NetworkError::Expired); + } + + let limit = (MAX_DATAGRAM_SIZE - 109) / 90; + let nearest = Discovery::nearest_node_entries(&target, &self.node_buckets); + if nearest.is_empty() { + return Ok(None); + } + let mut rlp = RlpStream::new_list(cmp::min(limit, nearest.len())); + rlp.begin_list(1); + for n in 0 .. nearest.len() { + rlp.begin_list(4); + nearest[n].endpoint.to_rlp(&mut rlp); + rlp.append(&nearest[n].id); + if (n + 1) % limit == 0 || n == nearest.len() - 1 { + self.send_packet(PACKET_NEIGHBOURS, &from, &rlp.drain()); + trace!(target: "discovery", "Sent {} Neighbours to {:?}", n, &from); + rlp = RlpStream::new_list(cmp::min(limit, nearest.len() - n)); + rlp.begin_list(1); + } + } Ok(None) } - fn on_find_node(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { - Ok(None) + fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { + // TODO: validate packet + let mut added = HashMap::new(); + trace!(target: "discovery", "Got {} Neighbours from {:?}", try!(rlp.at(0)).item_count(), &from); + for r in try!(rlp.at(0)).iter() { + let endpoint = try!(NodeEndpoint::from_rlp(&r)); + if !endpoint.is_valid() { + debug!(target: "discovery", "Bad address: {:?}", endpoint); + continue; + } + let node_id: NodeId = try!(r.val_at(3)); + let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint }; + added.insert(node_id, entry.clone()); + self.update_node(entry); + } + Ok(Some(TableUpdates { added: added, removed: HashSet::new() })) } - fn on_neighbours(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result, NetworkError> { - Ok(None) + fn check_expired(&mut self) -> HashSet { + let now = time::precise_time_ns(); + let mut removed: HashSet = HashSet::new(); + for bucket in &mut self.node_buckets { + bucket.nodes.retain(|node| { + if let Some(timeout) = node.timeout { + if now - timeout < PING_TIMEOUT_MS * 1000_0000 { + true + } + else { + trace!(target: "discovery", "Removed expired node {:?}", &node.address); + removed.insert(node.address.id.clone()); + false + } + } else { true } + }); + } + removed } - pub fn round(&mut self) { + pub fn round(&mut self) -> Option { + let removed = self.check_expired(); + self.discover(); + if !removed.is_empty() { + Some(TableUpdates { added: HashMap::new(), removed: removed }) + } else { None } } pub fn refresh(&mut self) { + self.start(); } pub fn register_socket(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { @@ -334,7 +470,7 @@ impl Discovery { pub fn update_registration(&self, event_loop: &mut EventLoop) -> Result<(), NetworkError> { let mut registration = EventSet::readable(); if !self.send_queue.is_empty() { - registration &= EventSet::writable(); + registration = registration | EventSet::writable(); } event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); Ok(()) diff --git a/util/src/network/error.rs b/util/src/network/error.rs index eb97e54b6..74babb110 100644 --- a/util/src/network/error.rs +++ b/util/src/network/error.rs @@ -42,6 +42,8 @@ pub enum NetworkError { Auth, /// Unrecognised protocol. BadProtocol, + /// Message expired. + Expired, /// Peer not found. PeerNotFound, /// Peer is diconnected. diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 05462be37..47a3d9986 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -31,21 +31,18 @@ use network::handshake::Handshake; use network::session::{Session, SessionData}; use error::*; use io::*; -use network::NetworkProtocolHandler; +use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; use network::node::*; use network::stats::NetworkStats; use network::error::DisconnectReason; use igd::{PortMappingProtocol,search_gateway}; -use network::discovery::{Discovery, TableUpdates}; +use network::discovery::{Discovery, TableUpdates, NodeEntry}; use network::node_table::NodeTable; type Slab = ::slab::Slab; const _DEFAULT_PORT: u16 = 30304; - const MAX_CONNECTIONS: usize = 1024; -const IDEAL_PEERS: u32 = 10; - const MAINTENANCE_TIMEOUT: u64 = 1000; #[derive(Debug)] @@ -67,6 +64,8 @@ pub struct NetworkConfiguration { pub boot_nodes: Vec, /// Use provided node key instead of default pub use_secret: Option, + /// Number of connected peers to maintain + pub ideal_peers: u32, } impl NetworkConfiguration { @@ -81,6 +80,7 @@ impl NetworkConfiguration { pin: false, boot_nodes: Vec::new(), use_secret: None, + ideal_peers: 10, } } @@ -126,6 +126,7 @@ impl NetworkConfiguration { pin: self.pin, boot_nodes: self.boot_nodes, use_secret: self.use_secret, + ideal_peers: self.ideal_peers, } } } @@ -343,19 +344,20 @@ impl Host where Message: Send + Sync + Clone { // Setup the server socket let tcp_listener = TcpListener::bind(&addr).unwrap(); let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }; - let public = keys.public().clone(); + let endpoint = NodeEndpoint { address: addr.clone(), udp_port: addr.port() }; + let discovery = Discovery::new(&keys, endpoint, DISCOVERY); let path = config.config_path.clone(); let mut host = Host:: { info: RwLock::new(HostInfo { keys: keys, config: config, nonce: H256::random(), - protocol_version: 4, + protocol_version: PROTOCOL_VERSION, client_version: format!("Parity/{}/{}-{}-{}", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()), listen_port: 0, capabilities: Vec::new(), }), - discovery: Mutex::new(Discovery::new(&public, &addr, DISCOVERY)), + discovery: Mutex::new(discovery), tcp_listener: Mutex::new(tcp_listener), connections: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -382,7 +384,9 @@ impl Host where Message: Send + Sync + Clone { match Node::from_str(id) { Err(e) => { warn!("Could not add node: {:?}", e); }, Ok(n) => { + let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; self.nodes.write().unwrap().add_node(n); + self.discovery.lock().unwrap().add_node(entry); } } } @@ -432,6 +436,7 @@ impl Host where Message: Send + Sync + Clone { let mut to_connect: Vec = Vec::new(); let mut req_conn = 0; let pin = self.info.read().unwrap().deref().config.pin; + let ideal_peers = self.info.read().unwrap().deref().config.ideal_peers; for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { let connected = self.have_session(&n.id) || self.connecting_to(&n.id); let required = n.peer_type == PeerType::Required; @@ -445,7 +450,7 @@ impl Host where Message: Send + Sync + Clone { for n in &to_connect { if n.peer_type == PeerType::Required { - if req_conn < IDEAL_PEERS { + if req_conn < ideal_peers { self.connect_peer(&n.id, io); } req_conn += 1; @@ -455,7 +460,7 @@ impl Host where Message: Send + Sync + Clone { if !pin { let pending_count = 0; //TODO: let peer_count = 0; - let mut open_slots = IDEAL_PEERS - peer_count - pending_count + req_conn; + let mut open_slots = ideal_peers - peer_count - pending_count + req_conn; if open_slots > 0 { for n in &to_connect { if n.peer_type == PeerType::Optional && open_slots > 0 { @@ -471,11 +476,11 @@ impl Host where Message: Send + Sync + Clone { fn connect_peer(&self, id: &NodeId, io: &IoContext>) { if self.have_session(id) { - warn!("Aborted connect. Node already connected."); + debug!("Aborted connect. Node already connected."); return; } if self.connecting_to(id) { - warn!("Aborted connect. Node already connecting."); + debug!("Aborted connect. Node already connecting."); return; } @@ -689,7 +694,7 @@ impl Host where Message: Send + Sync + Clone { for c in connections.iter() { match *c.lock().unwrap().deref_mut() { ConnectionEntry::Handshake(ref h) => { - if node_changes.removed.contains(&h.id) { + if node_changes.removed.contains(&h.id()) { to_remove.push(h.token()); } } @@ -732,6 +737,7 @@ impl IoHandler> for Host where Messa if let Some(node_changes) = self.discovery.lock().unwrap().readable() { self.update_nodes(io, node_changes); } + io.update_registration(DISCOVERY).expect("Error updating disicovery registration"); }, TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), @@ -741,7 +747,10 @@ impl IoHandler> for Host where Messa fn stream_writable(&self, io: &IoContext>, stream: StreamToken) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), - DISCOVERY => self.discovery.lock().unwrap().writable(), + DISCOVERY => { + self.discovery.lock().unwrap().writable(); + io.update_registration(DISCOVERY).expect("Error updating disicovery registration"); + } _ => panic!("Received unknown writable token"), } } @@ -752,9 +761,13 @@ impl IoHandler> for Host where Messa FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { self.discovery.lock().unwrap().refresh(); + io.update_registration(DISCOVERY).expect("Error updating disicovery registration"); }, DISCOVERY_ROUND => { - self.discovery.lock().unwrap().round(); + if let Some(node_changes) = self.discovery.lock().unwrap().round() { + self.update_nodes(io, node_changes); + } + io.update_registration(DISCOVERY).expect("Error updating disicovery registration"); }, _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index e5465c952..466ef4e6a 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -90,6 +90,8 @@ pub use network::stats::NetworkStats; use io::TimerToken; +const PROTOCOL_VERSION: u32 = 4; + /// Network IO protocol handler. This needs to be implemented for each new subprotocol. /// All the handler function are called from within IO event loop. /// `Message` is the type for message data. diff --git a/util/src/network/node.rs b/util/src/network/node.rs index e23dee9f5..d8370bc79 100644 --- a/util/src/network/node.rs +++ b/util/src/network/node.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::net::{SocketAddr, ToSocketAddrs}; +use std::mem; +use std::slice::from_raw_parts; +use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::hash::{Hash, Hasher}; use std::str::{FromStr}; use hash::*; @@ -25,17 +27,69 @@ use error::*; /// Node public key pub type NodeId = H512; -#[derive(Debug)] -/// Noe address info +#[derive(Debug, Clone)] +/// Node address info pub struct NodeEndpoint { /// IP(V4 or V6) address pub address: SocketAddr, - /// Address as string (can be host name). - pub address_str: String, /// Conneciton port. pub udp_port: u16 } +impl NodeEndpoint { + pub fn udp_address(&self) -> SocketAddr { + match self.address { + SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(a.ip().clone(), self.udp_port)), + SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(a.ip().clone(), self.udp_port, a.flowinfo(), a.scope_id())), + } + } +} + +impl NodeEndpoint { + pub fn from_rlp(rlp: &UntrustedRlp) -> Result { + let tcp_port = try!(rlp.val_at::(2)); + let udp_port = try!(rlp.val_at::(1)); + let addr_bytes = try!(try!(rlp.at(0)).data()); + let address = try!(match addr_bytes.len() { + 4 => Ok(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(addr_bytes[0], addr_bytes[1], addr_bytes[2], addr_bytes[3]), tcp_port))), + 16 => unsafe { + let o: *const u16 = mem::transmute(addr_bytes.as_ptr()); + let o = from_raw_parts(o, 8); + Ok(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7]), tcp_port, 0, 0))) + }, + _ => Err(DecoderError::RlpInconsistentLengthAndData) + }); + Ok(NodeEndpoint { address: address, udp_port: udp_port }) + } + + pub fn to_rlp(&self, rlp: &mut RlpStream) { + match self.address { + SocketAddr::V4(a) => { + rlp.append(&(&a.ip().octets()[..])); + } + SocketAddr::V6(a) => unsafe { + let o: *const u8 = mem::transmute(a.ip().segments().as_ptr()); + rlp.append(&from_raw_parts(o, 16)); + } + }; + rlp.append(&self.udp_port); + rlp.append(&self.address.port()); + } + + pub fn to_rlp_list(&self, rlp: &mut RlpStream) { + rlp.begin_list(3); + self.to_rlp(rlp); + } + + pub fn is_valid(&self) -> bool { + self.udp_port != 0 && self.address.port() != 0 && + match self.address { + SocketAddr::V4(a) => !a.ip().is_unspecified(), + SocketAddr::V6(a) => !a.ip().is_unspecified() + } + } +} + impl FromStr for NodeEndpoint { type Err = UtilError; @@ -45,7 +99,6 @@ impl FromStr for NodeEndpoint { match address { Ok(Some(a)) => Ok(NodeEndpoint { address: a, - address_str: s.to_owned(), udp_port: a.port() }), Ok(_) => Err(UtilError::AddressResolve(None)), @@ -67,6 +120,17 @@ pub struct Node { pub last_attempted: Option, } +impl Node { + pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node { + Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + last_attempted: None, + } + } +} + impl FromStr for Node { type Err = UtilError; fn from_str(s: &str) -> Result { @@ -91,7 +155,7 @@ impl PartialEq for Node { self.id == other.id } } -impl Eq for Node { } +impl Eq for Node {} impl Hash for Node { fn hash(&self, state: &mut H) where H: Hasher { diff --git a/util/src/network/node_table.rs b/util/src/network/node_table.rs index 0f1c2c5ad..d93057eb3 100644 --- a/util/src/network/node_table.rs +++ b/util/src/network/node_table.rs @@ -43,7 +43,10 @@ impl NodeTable { } pub fn update(&mut self, mut update: TableUpdates) { - self.nodes.extend(update.added.drain()); + for (_, node) in update.added.drain() { + let mut entry = self.nodes.entry(node.id.clone()).or_insert_with(|| Node::new(node.id.clone(), node.endpoint.clone())); + entry.endpoint = node.endpoint; + } for r in update.removed { self.nodes.remove(&r); }