UDP discovery working

This commit is contained in:
arkpar 2016-02-13 22:57:39 +01:00
parent 2af379d4b1
commit 62b9f4b91d
9 changed files with 375 additions and 180 deletions

View File

@ -170,28 +170,8 @@ pub trait BytesConvertable {
fn to_bytes(&self) -> Bytes { self.as_slice().to_vec() } fn to_bytes(&self) -> Bytes { self.as_slice().to_vec() }
} }
impl<'a> BytesConvertable for &'a [u8] { impl<T> BytesConvertable for T where T: Deref<Target = [u8]> {
fn bytes(&self) -> &[u8] { self } fn bytes(&self) -> &[u8] { self.deref() }
}
impl BytesConvertable for Vec<u8> {
fn bytes(&self) -> &[u8] { self }
}
macro_rules! impl_bytes_convertable_for_array {
($zero: expr) => ();
($len: expr, $($idx: expr),*) => {
impl BytesConvertable for [u8; $len] {
fn bytes(&self) -> &[u8] { self }
}
impl_bytes_convertable_for_array! { $($idx),* }
}
}
// -1 at the end is not expanded
impl_bytes_convertable_for_array! {
32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16,
15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1
} }
#[test] #[test]

View File

@ -77,12 +77,6 @@ macro_rules! impl_hash {
/// Unformatted binary data of fixed length. /// Unformatted binary data of fixed length.
pub struct $from (pub [u8; $size]); pub struct $from (pub [u8; $size]);
impl BytesConvertable for $from {
fn bytes(&self) -> &[u8] {
&self.0
}
}
impl Deref for $from { impl Deref for $from {
type Target = [u8]; type Target = [u8];

View File

@ -19,6 +19,7 @@
#![feature(augmented_assignments)] #![feature(augmented_assignments)]
#![feature(associated_consts)] #![feature(associated_consts)]
#![feature(plugin)] #![feature(plugin)]
#![feature(ip)]
#![plugin(clippy)] #![plugin(clippy)]
#![allow(needless_range_loop, match_bool)] #![allow(needless_range_loop, match_bool)]
#![feature(catch_panic)] #![feature(catch_panic)]

View File

@ -17,24 +17,26 @@
use bytes::Bytes; use bytes::Bytes;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::cell::{RefCell};
use std::ops::{DerefMut};
use std::mem; use std::mem;
use std::cmp;
use mio::*; use mio::*;
use mio::udp::*; use mio::udp::*;
use sha3::*;
use time;
use hash::*; use hash::*;
use sha3::Hashable;
use crypto::*; use crypto::*;
use rlp::*; use rlp::*;
use network::node::*; use network::node::*;
use network::error::NetworkError; use network::error::NetworkError;
use io::StreamToken; use io::StreamToken;
use network::PROTOCOL_VERSION;
const ADDRESS_BYTES_SIZE: u32 = 32; // Size of address type in bytes. const ADDRESS_BYTES_SIZE: u32 = 32; // Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia]. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; // Size of m_state (excludes root, which is us). const NODE_BINS: u32 = ADDRESS_BITS - 1; // Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
const BUCKET_SIZE: u32 = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
const MAX_DATAGRAM_SIZE: usize = 1280; const MAX_DATAGRAM_SIZE: usize = 1280;
@ -43,16 +45,27 @@ const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3; const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4; const PACKET_NEIGHBOURS: u8 = 4;
const PING_TIMEOUT_MS: u64 = 300;
#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
pub endpoint: NodeEndpoint,
}
pub struct BucketEntry {
pub address: NodeEntry,
pub timeout: Option<u64>,
}
struct NodeBucket { struct NodeBucket {
distance: u32, nodes: VecDeque<BucketEntry>, //sorted by last active
nodes: Vec<NodeId>
} }
impl NodeBucket { impl NodeBucket {
fn new(distance: u32) -> NodeBucket { fn new() -> NodeBucket {
NodeBucket { NodeBucket {
distance: distance, nodes: VecDeque::new()
nodes: Vec::new()
} }
} }
} }
@ -64,6 +77,8 @@ struct Datagramm {
pub struct Discovery { pub struct Discovery {
id: NodeId, id: NodeId,
secret: Secret,
address: NodeEndpoint,
udp_socket: UdpSocket, udp_socket: UdpSocket,
token: StreamToken, token: StreamToken,
discovery_round: u16, discovery_round: u16,
@ -74,80 +89,90 @@ pub struct Discovery {
} }
pub struct TableUpdates { pub struct TableUpdates {
pub added: HashMap<NodeId, Node>, pub added: HashMap<NodeId, NodeEntry>,
pub removed: HashSet<NodeId>, pub removed: HashSet<NodeId>,
} }
struct FindNodePacket;
impl FindNodePacket {
fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket {
FindNodePacket
}
fn sign(&mut self, _secret: &Secret) {
}
fn send(& self, _socket: &mut UdpSocket) {
}
}
impl Discovery { impl Discovery {
pub fn new(id: &NodeId, address: &SocketAddr, token: StreamToken) -> Discovery { pub fn new(key: &KeyPair, address: NodeEndpoint, token: StreamToken) -> Discovery {
let socket = UdpSocket::bound(address).expect("Error binding UDP socket"); let socket = UdpSocket::bound(&address.udp_address()).expect("Error binding UDP socket");
Discovery { Discovery {
id: id.clone(), id: key.public().clone(),
secret: key.secret().clone(),
address: address,
token: token, token: token,
discovery_round: 0, discovery_round: 0,
discovery_id: NodeId::new(), discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(), discovery_nodes: HashSet::new(),
node_buckets: (0..NODE_BINS).map(NodeBucket::new).collect(), node_buckets: (0..NODE_BINS).map(|_| NodeBucket::new()).collect(),
udp_socket: socket, udp_socket: socket,
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
} }
} }
pub fn add_node(&mut self, id: &NodeId) { pub fn add_node(&mut self, e: NodeEntry) {
self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
} }
fn start_node_discovery<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) { fn update_node(&mut self, e: NodeEntry) {
trace!(target: "discovery", "Inserting {:?}", &e);
let ping = {
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id, &e.id) as usize).unwrap();
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.timeout = None;
true
} else { false };
if !updated {
bucket.nodes.push_front(BucketEntry { address: e, timeout: None });
}
if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns());
Some(bucket.nodes.back().unwrap().address.endpoint.clone())
} else { None }
};
if let Some(endpoint) = ping {
self.ping(&endpoint);
}
}
fn start(&mut self) {
trace!(target: "discovery", "Starting discovery");
self.discovery_round = 0; self.discovery_round = 0;
self.discovery_id.randomize(); self.discovery_id.randomize(); //TODO: use cryptographic nonce
self.discovery_nodes.clear(); self.discovery_nodes.clear();
self.discover(event_loop);
} }
fn discover<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) { fn discover(&mut self) {
if self.discovery_round == DISCOVERY_MAX_STEPS if self.discovery_round == DISCOVERY_MAX_STEPS {
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return; return;
} }
trace!(target: "discovery", "Starting round {:?}", self.discovery_round);
let mut tried_count = 0; let mut tried_count = 0;
{ {
let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter(); let nearest = Discovery::nearest_node_entries(&self.discovery_id, &self.node_buckets).into_iter();
let nodes = RefCell::new(&mut self.discovery_nodes); let nearest = nearest.filter(|x| !self.discovery_nodes.contains(&x.id)).take(ALPHA).collect::<Vec<_>>();
let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA);
for r in nearest { for r in nearest {
//let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id); let rlp = encode(&(&[self.discovery_id.clone()][..]));
//p.sign(&self.secret); self.send_packet(PACKET_FIND_NODE, &r.endpoint.udp_address(), &rlp);
//p.send(&mut self.udp_socket); self.discovery_nodes.insert(r.id.clone());
let mut borrowed = nodes.borrow_mut();
borrowed.deref_mut().insert(r.clone());
tried_count += 1; tried_count += 1;
trace!(target: "discovery", "Sent FindNode to {:?}", &r.endpoint);
} }
} }
if tried_count == 0 if tried_count == 0 {
{ trace!(target: "discovery", "Completing discovery");
debug!("Restarting discovery"); self.discovery_round = DISCOVERY_MAX_STEPS;
self.start_node_discovery(event_loop); self.discovery_nodes.clear();
return; return;
} }
self.discovery_round += 1; self.discovery_round += 1;
//event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap();
} }
fn distance(a: &NodeId, b: &NodeId) -> u32 { fn distance(a: &NodeId, b: &NodeId) -> u32 {
@ -163,75 +188,75 @@ impl Discovery {
ret ret
} }
#[allow(cyclomatic_complexity)] fn ping(&mut self, node: &NodeEndpoint) {
fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b [NodeBucket]) -> Vec<&'b NodeId> let mut rlp = RlpStream::new_list(3);
{ rlp.append(&PROTOCOL_VERSION);
// send ALPHA FindNode packets to nodes we know, closest to target self.address.to_rlp_list(&mut rlp);
const LAST_BIN: u32 = NODE_BINS - 1; node.to_rlp_list(&mut rlp);
let mut head = Discovery::distance(source, target); trace!(target: "discovery", "Sent Ping to {:?}", &node);
let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; self.send_packet(PACKET_PING, &node.udp_address(), &rlp.drain());
}
let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new(); fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) {
let mut rlp = RlpStream::new();
rlp.append_raw(&[packet_id], 1);
let source = Rlp::new(payload);
rlp.begin_list(source.item_count() + 1);
for i in 0 .. source.item_count() {
rlp.append_raw(source.at(i).as_raw(), 1);
}
let timestamp = time::get_time().sec as u32 + 60;
rlp.append(&timestamp);
let bytes = rlp.drain();
let hash = bytes.sha3();
let signature = match ec::sign(&self.secret, &hash) {
Ok(s) => s,
Err(_) => {
warn!("Error signing UDP packet");
return;
}
};
let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65);
packet.extend(hash.iter());
packet.extend(signature.iter());
packet.extend(bytes.iter());
let signed_hash = (&packet[32..]).sha3();
packet[0..32].clone_from_slice(&signed_hash);
self.send_to(packet, address.clone());
}
#[allow(map_clone)]
fn nearest_node_entries(target: &NodeId, buckets: &[NodeBucket]) -> Vec<NodeEntry>
{
let mut found: BTreeMap<u32, Vec<&NodeEntry>> = BTreeMap::new();
let mut count = 0; let mut count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d // Sort nodes by distance to target
if head > 1 && tail != LAST_BIN { for bucket in buckets {
while head != tail && head < NODE_BINS && count < BUCKET_SIZE { for node in &bucket.nodes {
for n in &buckets[head as usize].nodes { let distance = Discovery::distance(target, &node.address.id);
if count < BUCKET_SIZE { found.entry(distance).or_insert_with(Vec::new).push(&node.address);
count += 1; if count == BUCKET_SIZE {
found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n); // delete the most distant element
} let remove = {
else { break } let (_, last) = found.iter_mut().next_back().unwrap();
} last.pop();
if count < BUCKET_SIZE && tail != 0 { last.is_empty()
for n in &buckets[tail as usize].nodes { };
if count < BUCKET_SIZE { if remove {
count += 1; found.remove(&distance);
found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n);
}
else { break }
}
}
head += 1;
if tail > 0 {
tail -= 1;
}
}
}
else if head < 2 {
while head < NODE_BINS && count < BUCKET_SIZE {
for n in &buckets[head as usize].nodes {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n);
}
else { break }
}
head += 1;
} }
} }
else { else {
while tail > 0 && count < BUCKET_SIZE {
for n in &buckets[tail as usize].nodes {
if count < BUCKET_SIZE {
count += 1; count += 1;
found.entry(Discovery::distance(target, &n)).or_insert_with(Vec::new).push(n);
} }
else { break }
}
tail -= 1;
} }
} }
let mut ret:Vec<&NodeId> = Vec::new(); let mut ret:Vec<NodeEntry> = Vec::new();
for (_, nodes) in found { for (_, nodes) in found {
for n in nodes { ret.extend(nodes.iter().map(|&n| n.clone()));
if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ {
ret.push(n);
}
}
} }
ret ret
} }
@ -240,18 +265,22 @@ impl Discovery {
if self.send_queue.is_empty() { if self.send_queue.is_empty() {
return; return;
} }
while !self.send_queue.is_empty() {
let data = self.send_queue.pop_front().unwrap(); let data = self.send_queue.pop_front().unwrap();
match self.udp_socket.send_to(&data.payload, &data.address) { match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => { Ok(Some(size)) if size == data.payload.len() => {
}, },
Ok(Some(size)) => { Ok(Some(_)) => {
warn!("UDP sent incomplete datagramm"); warn!("UDP sent incomplete datagramm");
}, },
Ok(None) => { Ok(None) => {
self.send_queue.push_front(data); self.send_queue.push_front(data);
return;
} }
Err(e) => { Err(e) => {
warn!("UDP sent error: {:?}", e); warn!("UDP send error: {:?}, address: {:?}", e, &data.address);
return;
}
} }
} }
} }
@ -305,25 +334,132 @@ impl Discovery {
} }
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
Ok(None) trace!(target: "discovery", "Got Ping from {:?}", &from);
let version: u32 = try!(rlp.val_at(0));
if version != PROTOCOL_VERSION {
debug!(target: "discovery", "Unexpected protocol version: {}", version);
return Err(NetworkError::BadProtocol);
}
let source = try!(NodeEndpoint::from_rlp(&try!(rlp.at(1))));
let dest = try!(NodeEndpoint::from_rlp(&try!(rlp.at(2))));
let timestamp: u64 = try!(rlp.val_at(3));
if timestamp < time::get_time().sec as u64{
debug!(target: "discovery", "Expired ping");
return Err(NetworkError::Expired);
}
let mut entry = NodeEntry { id: node.clone(), endpoint: source.clone() };
if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", entry);
entry.endpoint.address = from.clone();
}
self.update_node(entry.clone());
let hash = rlp.as_raw().sha3();
let mut response = RlpStream::new_list(2);
dest.to_rlp_list(&mut response);
response.append(&hash);
self.send_packet(PACKET_PONG, &entry.endpoint.udp_address(), &response.drain());
let mut added_map = HashMap::new();
added_map.insert(node.clone(), entry);
Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() }))
} }
fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
trace!(target: "discovery", "Got Pong from {:?}", &from);
// TODO: validate pong packet
let dest = try!(NodeEndpoint::from_rlp(&try!(rlp.at(0))));
let timestamp: u64 = try!(rlp.val_at(2));
if timestamp > time::get_time().sec as u64 {
return Err(NetworkError::Expired);
}
let mut entry = NodeEntry { id: node.clone(), endpoint: dest };
if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", entry);
entry.endpoint.address = from.clone();
}
self.update_node(entry.clone());
let mut added_map = HashMap::new();
added_map.insert(node.clone(), entry);
Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() }))
}
fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = try!(rlp.val_at(0));
let timestamp: u64 = try!(rlp.val_at(1));
if timestamp > time::get_time().sec as u64 {
return Err(NetworkError::Expired);
}
let limit = (MAX_DATAGRAM_SIZE - 109) / 90;
let nearest = Discovery::nearest_node_entries(&target, &self.node_buckets);
if nearest.is_empty() {
return Ok(None);
}
let mut rlp = RlpStream::new_list(cmp::min(limit, nearest.len()));
rlp.begin_list(1);
for n in 0 .. nearest.len() {
rlp.begin_list(4);
nearest[n].endpoint.to_rlp(&mut rlp);
rlp.append(&nearest[n].id);
if (n + 1) % limit == 0 || n == nearest.len() - 1 {
self.send_packet(PACKET_NEIGHBOURS, &from, &rlp.drain());
trace!(target: "discovery", "Sent {} Neighbours to {:?}", n, &from);
rlp = RlpStream::new_list(cmp::min(limit, nearest.len() - n));
rlp.begin_list(1);
}
}
Ok(None) Ok(None)
} }
fn on_find_node(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
Ok(None) // TODO: validate packet
let mut added = HashMap::new();
trace!(target: "discovery", "Got {} Neighbours from {:?}", try!(rlp.at(0)).item_count(), &from);
for r in try!(rlp.at(0)).iter() {
let endpoint = try!(NodeEndpoint::from_rlp(&r));
if !endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", endpoint);
continue;
}
let node_id: NodeId = try!(r.val_at(3));
let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint };
added.insert(node_id, entry.clone());
self.update_node(entry);
}
Ok(Some(TableUpdates { added: added, removed: HashSet::new() }))
} }
fn on_neighbours(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn check_expired(&mut self) -> HashSet<NodeId> {
Ok(None) let now = time::precise_time_ns();
let mut removed: HashSet<NodeId> = HashSet::new();
for bucket in &mut self.node_buckets {
bucket.nodes.retain(|node| {
if let Some(timeout) = node.timeout {
if now - timeout < PING_TIMEOUT_MS * 1000_0000 {
true
}
else {
trace!(target: "discovery", "Removed expired node {:?}", &node.address);
removed.insert(node.address.id.clone());
false
}
} else { true }
});
}
removed
} }
pub fn round(&mut self) { pub fn round(&mut self) -> Option<TableUpdates> {
let removed = self.check_expired();
self.discover();
if !removed.is_empty() {
Some(TableUpdates { added: HashMap::new(), removed: removed })
} else { None }
} }
pub fn refresh(&mut self) { pub fn refresh(&mut self) {
self.start();
} }
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
@ -334,7 +470,7 @@ impl Discovery {
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
let mut registration = EventSet::readable(); let mut registration = EventSet::readable();
if !self.send_queue.is_empty() { if !self.send_queue.is_empty() {
registration &= EventSet::writable(); registration = registration | EventSet::writable();
} }
event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket");
Ok(()) Ok(())

View File

@ -42,6 +42,8 @@ pub enum NetworkError {
Auth, Auth,
/// Unrecognised protocol. /// Unrecognised protocol.
BadProtocol, BadProtocol,
/// Message expired.
Expired,
/// Peer not found. /// Peer not found.
PeerNotFound, PeerNotFound,
/// Peer is diconnected. /// Peer is diconnected.

View File

@ -31,21 +31,18 @@ use network::handshake::Handshake;
use network::session::{Session, SessionData}; use network::session::{Session, SessionData};
use error::*; use error::*;
use io::*; use io::*;
use network::NetworkProtocolHandler; use network::{NetworkProtocolHandler, PROTOCOL_VERSION};
use network::node::*; use network::node::*;
use network::stats::NetworkStats; use network::stats::NetworkStats;
use network::error::DisconnectReason; use network::error::DisconnectReason;
use igd::{PortMappingProtocol,search_gateway}; use igd::{PortMappingProtocol,search_gateway};
use network::discovery::{Discovery, TableUpdates}; use network::discovery::{Discovery, TableUpdates, NodeEntry};
use network::node_table::NodeTable; use network::node_table::NodeTable;
type Slab<T> = ::slab::Slab<T, usize>; type Slab<T> = ::slab::Slab<T, usize>;
const _DEFAULT_PORT: u16 = 30304; const _DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024; const MAX_CONNECTIONS: usize = 1024;
const IDEAL_PEERS: u32 = 10;
const MAINTENANCE_TIMEOUT: u64 = 1000; const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)] #[derive(Debug)]
@ -67,6 +64,8 @@ pub struct NetworkConfiguration {
pub boot_nodes: Vec<String>, pub boot_nodes: Vec<String>,
/// Use provided node key instead of default /// Use provided node key instead of default
pub use_secret: Option<Secret>, pub use_secret: Option<Secret>,
/// Number of connected peers to maintain
pub ideal_peers: u32,
} }
impl NetworkConfiguration { impl NetworkConfiguration {
@ -81,6 +80,7 @@ impl NetworkConfiguration {
pin: false, pin: false,
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
use_secret: None, use_secret: None,
ideal_peers: 10,
} }
} }
@ -126,6 +126,7 @@ impl NetworkConfiguration {
pin: self.pin, pin: self.pin,
boot_nodes: self.boot_nodes, boot_nodes: self.boot_nodes,
use_secret: self.use_secret, use_secret: self.use_secret,
ideal_peers: self.ideal_peers,
} }
} }
} }
@ -343,19 +344,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Setup the server socket // Setup the server socket
let tcp_listener = TcpListener::bind(&addr).unwrap(); let tcp_listener = TcpListener::bind(&addr).unwrap();
let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }; let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() };
let public = keys.public().clone(); let endpoint = NodeEndpoint { address: addr.clone(), udp_port: addr.port() };
let discovery = Discovery::new(&keys, endpoint, DISCOVERY);
let path = config.config_path.clone(); let path = config.config_path.clone();
let mut host = Host::<Message> { let mut host = Host::<Message> {
info: RwLock::new(HostInfo { info: RwLock::new(HostInfo {
keys: keys, keys: keys,
config: config, config: config,
nonce: H256::random(), nonce: H256::random(),
protocol_version: 4, protocol_version: PROTOCOL_VERSION,
client_version: format!("Parity/{}/{}-{}-{}", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()), client_version: format!("Parity/{}/{}-{}-{}", env!("CARGO_PKG_VERSION"), Target::arch(), Target::env(), Target::os()),
listen_port: 0, listen_port: 0,
capabilities: Vec::new(), capabilities: Vec::new(),
}), }),
discovery: Mutex::new(Discovery::new(&public, &addr, DISCOVERY)), discovery: Mutex::new(discovery),
tcp_listener: Mutex::new(tcp_listener), tcp_listener: Mutex::new(tcp_listener),
connections: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS))), connections: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS))),
nodes: RwLock::new(NodeTable::new(path)), nodes: RwLock::new(NodeTable::new(path)),
@ -382,7 +384,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { warn!("Could not add node: {:?}", e); }, Err(e) => { warn!("Could not add node: {:?}", e); },
Ok(n) => { Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.nodes.write().unwrap().add_node(n); self.nodes.write().unwrap().add_node(n);
self.discovery.lock().unwrap().add_node(entry);
} }
} }
} }
@ -432,6 +436,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut to_connect: Vec<NodeInfo> = Vec::new(); let mut to_connect: Vec<NodeInfo> = Vec::new();
let mut req_conn = 0; let mut req_conn = 0;
let pin = self.info.read().unwrap().deref().config.pin; let pin = self.info.read().unwrap().deref().config.pin;
let ideal_peers = self.info.read().unwrap().deref().config.ideal_peers;
for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) {
let connected = self.have_session(&n.id) || self.connecting_to(&n.id); let connected = self.have_session(&n.id) || self.connecting_to(&n.id);
let required = n.peer_type == PeerType::Required; let required = n.peer_type == PeerType::Required;
@ -445,7 +450,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
for n in &to_connect { for n in &to_connect {
if n.peer_type == PeerType::Required { if n.peer_type == PeerType::Required {
if req_conn < IDEAL_PEERS { if req_conn < ideal_peers {
self.connect_peer(&n.id, io); self.connect_peer(&n.id, io);
} }
req_conn += 1; req_conn += 1;
@ -455,7 +460,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if !pin { if !pin {
let pending_count = 0; //TODO: let pending_count = 0; //TODO:
let peer_count = 0; let peer_count = 0;
let mut open_slots = IDEAL_PEERS - peer_count - pending_count + req_conn; let mut open_slots = ideal_peers - peer_count - pending_count + req_conn;
if open_slots > 0 { if open_slots > 0 {
for n in &to_connect { for n in &to_connect {
if n.peer_type == PeerType::Optional && open_slots > 0 { if n.peer_type == PeerType::Optional && open_slots > 0 {
@ -471,11 +476,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) { fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
if self.have_session(id) if self.have_session(id)
{ {
warn!("Aborted connect. Node already connected."); debug!("Aborted connect. Node already connected.");
return; return;
} }
if self.connecting_to(id) { if self.connecting_to(id) {
warn!("Aborted connect. Node already connecting."); debug!("Aborted connect. Node already connecting.");
return; return;
} }
@ -689,7 +694,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
for c in connections.iter() { for c in connections.iter() {
match *c.lock().unwrap().deref_mut() { match *c.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref h) => { ConnectionEntry::Handshake(ref h) => {
if node_changes.removed.contains(&h.id) { if node_changes.removed.contains(&h.id()) {
to_remove.push(h.token()); to_remove.push(h.token());
} }
} }
@ -732,6 +737,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
if let Some(node_changes) = self.discovery.lock().unwrap().readable() { if let Some(node_changes) = self.discovery.lock().unwrap().readable() {
self.update_nodes(io, node_changes); self.update_nodes(io, node_changes);
} }
io.update_registration(DISCOVERY).expect("Error updating disicovery registration");
}, },
TCP_ACCEPT => self.accept(io), TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"), _ => panic!("Received unknown readable token"),
@ -741,7 +747,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) { fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
match stream { match stream {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io),
DISCOVERY => self.discovery.lock().unwrap().writable(), DISCOVERY => {
self.discovery.lock().unwrap().writable();
io.update_registration(DISCOVERY).expect("Error updating disicovery registration");
}
_ => panic!("Received unknown writable token"), _ => panic!("Received unknown writable token"),
} }
} }
@ -752,9 +761,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io), FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => { DISCOVERY_REFRESH => {
self.discovery.lock().unwrap().refresh(); self.discovery.lock().unwrap().refresh();
io.update_registration(DISCOVERY).expect("Error updating disicovery registration");
}, },
DISCOVERY_ROUND => { DISCOVERY_ROUND => {
self.discovery.lock().unwrap().round(); if let Some(node_changes) = self.discovery.lock().unwrap().round() {
self.update_nodes(io, node_changes);
}
io.update_registration(DISCOVERY).expect("Error updating disicovery registration");
}, },
_ => match self.timers.read().unwrap().get(&token).cloned() { _ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {

View File

@ -90,6 +90,8 @@ pub use network::stats::NetworkStats;
use io::TimerToken; use io::TimerToken;
const PROTOCOL_VERSION: u32 = 4;
/// Network IO protocol handler. This needs to be implemented for each new subprotocol. /// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop. /// All the handler function are called from within IO event loop.
/// `Message` is the type for message data. /// `Message` is the type for message data.

View File

@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::net::{SocketAddr, ToSocketAddrs}; use std::mem;
use std::slice::from_raw_parts;
use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::str::{FromStr}; use std::str::{FromStr};
use hash::*; use hash::*;
@ -25,17 +27,69 @@ use error::*;
/// Node public key /// Node public key
pub type NodeId = H512; pub type NodeId = H512;
#[derive(Debug)] #[derive(Debug, Clone)]
/// Noe address info /// Node address info
pub struct NodeEndpoint { pub struct NodeEndpoint {
/// IP(V4 or V6) address /// IP(V4 or V6) address
pub address: SocketAddr, pub address: SocketAddr,
/// Address as string (can be host name).
pub address_str: String,
/// Conneciton port. /// Conneciton port.
pub udp_port: u16 pub udp_port: u16
} }
impl NodeEndpoint {
pub fn udp_address(&self) -> SocketAddr {
match self.address {
SocketAddr::V4(a) => SocketAddr::V4(SocketAddrV4::new(a.ip().clone(), self.udp_port)),
SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(a.ip().clone(), self.udp_port, a.flowinfo(), a.scope_id())),
}
}
}
impl NodeEndpoint {
pub fn from_rlp(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let tcp_port = try!(rlp.val_at::<u16>(2));
let udp_port = try!(rlp.val_at::<u16>(1));
let addr_bytes = try!(try!(rlp.at(0)).data());
let address = try!(match addr_bytes.len() {
4 => Ok(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(addr_bytes[0], addr_bytes[1], addr_bytes[2], addr_bytes[3]), tcp_port))),
16 => unsafe {
let o: *const u16 = mem::transmute(addr_bytes.as_ptr());
let o = from_raw_parts(o, 8);
Ok(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(o[0], o[1], o[2], o[3], o[4], o[5], o[6], o[7]), tcp_port, 0, 0)))
},
_ => Err(DecoderError::RlpInconsistentLengthAndData)
});
Ok(NodeEndpoint { address: address, udp_port: udp_port })
}
pub fn to_rlp(&self, rlp: &mut RlpStream) {
match self.address {
SocketAddr::V4(a) => {
rlp.append(&(&a.ip().octets()[..]));
}
SocketAddr::V6(a) => unsafe {
let o: *const u8 = mem::transmute(a.ip().segments().as_ptr());
rlp.append(&from_raw_parts(o, 16));
}
};
rlp.append(&self.udp_port);
rlp.append(&self.address.port());
}
pub fn to_rlp_list(&self, rlp: &mut RlpStream) {
rlp.begin_list(3);
self.to_rlp(rlp);
}
pub fn is_valid(&self) -> bool {
self.udp_port != 0 && self.address.port() != 0 &&
match self.address {
SocketAddr::V4(a) => !a.ip().is_unspecified(),
SocketAddr::V6(a) => !a.ip().is_unspecified()
}
}
}
impl FromStr for NodeEndpoint { impl FromStr for NodeEndpoint {
type Err = UtilError; type Err = UtilError;
@ -45,7 +99,6 @@ impl FromStr for NodeEndpoint {
match address { match address {
Ok(Some(a)) => Ok(NodeEndpoint { Ok(Some(a)) => Ok(NodeEndpoint {
address: a, address: a,
address_str: s.to_owned(),
udp_port: a.port() udp_port: a.port()
}), }),
Ok(_) => Err(UtilError::AddressResolve(None)), Ok(_) => Err(UtilError::AddressResolve(None)),
@ -67,6 +120,17 @@ pub struct Node {
pub last_attempted: Option<Tm>, pub last_attempted: Option<Tm>,
} }
impl Node {
pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node {
Node {
id: id,
endpoint: endpoint,
peer_type: PeerType::Optional,
last_attempted: None,
}
}
}
impl FromStr for Node { impl FromStr for Node {
type Err = UtilError; type Err = UtilError;
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {

View File

@ -43,7 +43,10 @@ impl NodeTable {
} }
pub fn update(&mut self, mut update: TableUpdates) { pub fn update(&mut self, mut update: TableUpdates) {
self.nodes.extend(update.added.drain()); for (_, node) in update.added.drain() {
let mut entry = self.nodes.entry(node.id.clone()).or_insert_with(|| Node::new(node.id.clone(), node.endpoint.clone()));
entry.endpoint = node.endpoint;
}
for r in update.removed { for r in update.removed {
self.nodes.remove(&r); self.nodes.remove(&r);
} }