This commit is contained in:
arkpar 2016-01-04 13:49:32 +01:00
parent 32fcc6b3e9
commit 39f2dc9e2f
5 changed files with 168 additions and 187 deletions

View File

@ -33,7 +33,7 @@ pub trait FixedHash: Sized + BytesConvertable {
macro_rules! impl_hash { macro_rules! impl_hash {
($from: ident, $size: expr) => { ($from: ident, $size: expr) => {
#[derive(Eq, Copy)] #[derive(Eq)]
pub struct $from (pub [u8; $size]); pub struct $from (pub [u8; $size]);
impl BytesConvertable for $from { impl BytesConvertable for $from {

View File

@ -11,56 +11,57 @@ use hash::*;
use crypto::*; use crypto::*;
use network::host::*; use network::host::*;
const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us). const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover) const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover)
const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
struct NodeBucket { struct NodeBucket {
distance: u32, distance: u32,
nodes: Vec<NodeId> nodes: Vec<NodeId>
} }
impl NodeBucket { impl NodeBucket {
fn new(distance: u32) -> NodeBucket { fn new(distance: u32) -> NodeBucket {
NodeBucket { NodeBucket {
distance: distance, distance: distance,
nodes: Vec::new() nodes: Vec::new()
} }
} }
} }
struct Discovery { struct Discovery {
id: NodeId, id: NodeId,
discovery_round: u16, discovery_round: u16,
discovery_id: NodeId, discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>, discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>, node_buckets: Vec<NodeBucket>,
} }
struct FindNodePacket; struct FindNodePacket;
impl FindNodePacket { impl FindNodePacket {
fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket { fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket {
FindNodePacket FindNodePacket
} }
fn sign(&mut self, _secret: &Secret) {
}
fn send(& self, _socket: &mut UdpSocket) { fn sign(&mut self, _secret: &Secret) {
} }
fn send(& self, _socket: &mut UdpSocket) {
}
} }
impl Discovery { impl Discovery {
pub fn new(id: &NodeId) -> Discovery { pub fn new(id: &NodeId) -> Discovery {
Discovery { Discovery {
id: id.clone(), id: id.clone(),
discovery_round: 0, discovery_round: 0,
discovery_id: NodeId::new(), discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(), discovery_nodes: HashSet::new(),
node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(), node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(),
} }
} }
@ -68,137 +69,137 @@ impl Discovery {
self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone());
} }
fn start_node_discovery(&mut self, event_loop: &mut EventLoop<Host>) { fn start_node_discovery(&mut self, event_loop: &mut EventLoop<Host>) {
self.discovery_round = 0; self.discovery_round = 0;
self.discovery_id.randomize(); self.discovery_id.randomize();
self.discovery_nodes.clear(); self.discovery_nodes.clear();
self.discover(event_loop); self.discover(event_loop);
} }
fn discover(&mut self, event_loop: &mut EventLoop<Host>) { fn discover(&mut self, event_loop: &mut EventLoop<Host>) {
if self.discovery_round == DISCOVERY_MAX_STEPS if self.discovery_round == DISCOVERY_MAX_STEPS
{ {
debug!("Restarting discovery"); debug!("Restarting discovery");
self.start_node_discovery(event_loop); self.start_node_discovery(event_loop);
return; return;
} }
let mut tried_count = 0; let mut tried_count = 0;
{ {
let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter(); let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter();
let nodes = RefCell::new(&mut self.discovery_nodes); let nodes = RefCell::new(&mut self.discovery_nodes);
let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA); let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA);
for r in nearest { for r in nearest {
//let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id); //let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id);
//p.sign(&self.secret); //p.sign(&self.secret);
//p.send(&mut self.udp_socket); //p.send(&mut self.udp_socket);
let mut borrowed = nodes.borrow_mut(); let mut borrowed = nodes.borrow_mut();
borrowed.deref_mut().insert(r.clone()); borrowed.deref_mut().insert(r.clone());
tried_count += 1; tried_count += 1;
} }
} }
if tried_count == 0 if tried_count == 0
{ {
debug!("Restarting discovery"); debug!("Restarting discovery");
self.start_node_discovery(event_loop); self.start_node_discovery(event_loop);
return; return;
} }
self.discovery_round += 1; self.discovery_round += 1;
//event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap(); //event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap();
} }
fn distance(a: &NodeId, b: &NodeId) -> u32 { fn distance(a: &NodeId, b: &NodeId) -> u32 {
let d = a.sha3() ^ b.sha3(); let d = a.sha3() ^ b.sha3();
let mut ret:u32 = 0; let mut ret:u32 = 0;
for i in 0..32 { for i in 0..32 {
let mut v: u8 = d[i]; let mut v: u8 = d[i];
while v != 0 { while v != 0 {
v >>= 1; v >>= 1;
ret += 1; ret += 1;
} }
} }
ret ret
} }
fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec<NodeBucket>) -> Vec<&'b NodeId> fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec<NodeBucket>) -> Vec<&'b NodeId>
{ {
// send ALPHA FindNode packets to nodes we know, closest to target // send ALPHA FindNode packets to nodes we know, closest to target
const LAST_BIN: u32 = NODE_BINS - 1; const LAST_BIN: u32 = NODE_BINS - 1;
let mut head = Discovery::distance(source, target); let mut head = Discovery::distance(source, target);
let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS };
let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new(); let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new();
let mut count = 0; let mut count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d // if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if head > 1 && tail != LAST_BIN { if head > 1 && tail != LAST_BIN {
while head != tail && head < NODE_BINS && count < BUCKET_SIZE while head != tail && head < NODE_BINS && count < BUCKET_SIZE
{ {
for n in buckets[head as usize].nodes.iter() for n in buckets[head as usize].nodes.iter()
{ {
if count < BUCKET_SIZE { if count < BUCKET_SIZE {
count += 1; count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
} }
else { else {
break; break;
} }
} }
if count < BUCKET_SIZE && tail != 0 { if count < BUCKET_SIZE && tail != 0 {
for n in buckets[tail as usize].nodes.iter() { for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE { if count < BUCKET_SIZE {
count += 1; count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
} }
else { else {
break; break;
} }
} }
} }
head += 1; head += 1;
if tail > 0 { if tail > 0 {
tail -= 1; tail -= 1;
} }
} }
} }
else if head < 2 { else if head < 2 {
while head < NODE_BINS && count < BUCKET_SIZE { while head < NODE_BINS && count < BUCKET_SIZE {
for n in buckets[head as usize].nodes.iter() { for n in buckets[head as usize].nodes.iter() {
if count < BUCKET_SIZE { if count < BUCKET_SIZE {
count += 1; count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
} }
else { else {
break; break;
} }
} }
head += 1; head += 1;
} }
} }
else { else {
while tail > 0 && count < BUCKET_SIZE { while tail > 0 && count < BUCKET_SIZE {
for n in buckets[tail as usize].nodes.iter() { for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE { if count < BUCKET_SIZE {
count += 1; count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
} }
else { else {
break; break;
} }
} }
tail -= 1; tail -= 1;
} }
} }
let mut ret:Vec<&NodeId> = Vec::new(); let mut ret:Vec<&NodeId> = Vec::new();
for (_, nodes) in found { for (_, nodes) in found {
for n in nodes { for n in nodes {
if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ { if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ {
ret.push(n); ret.push(n);
} }
} }
} }
ret ret
} }
} }

View File

@ -1,4 +1,3 @@
#![allow(dead_code)] //TODO: remove this after everything is done
//TODO: remove all unwraps //TODO: remove all unwraps
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::{HashMap}; use std::collections::{HashMap};
@ -16,7 +15,7 @@ use network::handshake::Handshake;
use network::session::{Session, SessionData}; use network::session::{Session, SessionData};
use network::{Error, ProtocolHandler}; use network::{Error, ProtocolHandler};
const DEFAULT_PORT: u16 = 30304; const _DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024; const MAX_CONNECTIONS: usize = 1024;
const MAX_USER_TIMERS: usize = 32; const MAX_USER_TIMERS: usize = 32;
@ -54,13 +53,6 @@ pub struct NodeEndpoint {
} }
impl NodeEndpoint { impl NodeEndpoint {
fn new(address: SocketAddr) -> NodeEndpoint {
NodeEndpoint {
address: address,
address_str: address.to_string(),
udp_port: address.port()
}
}
fn from_str(s: &str) -> Result<NodeEndpoint, Error> { fn from_str(s: &str) -> Result<NodeEndpoint, Error> {
println!("{:?}", s); println!("{:?}", s);
let address = s.to_socket_addrs().map(|mut i| i.next()); let address = s.to_socket_addrs().map(|mut i| i.next());
@ -87,7 +79,6 @@ struct Node {
endpoint: NodeEndpoint, endpoint: NodeEndpoint,
peer_type: PeerType, peer_type: PeerType,
last_attempted: Option<Tm>, last_attempted: Option<Tm>,
confirmed: bool,
} }
impl FromStr for Node { impl FromStr for Node {
@ -105,23 +96,10 @@ impl FromStr for Node {
endpoint: endpoint, endpoint: endpoint,
peer_type: PeerType::Optional, peer_type: PeerType::Optional,
last_attempted: None, last_attempted: None,
confirmed: false
}) })
} }
} }
impl Node {
fn new(id: NodeId, address: SocketAddr, t:PeerType) -> Node {
Node {
id: id,
endpoint: NodeEndpoint::new(address),
peer_type: t,
last_attempted: None,
confirmed: false
}
}
}
impl PartialEq for Node { impl PartialEq for Node {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.id == other.id self.id == other.id
@ -168,9 +146,9 @@ pub enum HostMessage {
pub type UserMessageId = u32; pub type UserMessageId = u32;
pub struct UserMessage { pub struct UserMessage {
protocol: ProtocolId, pub protocol: ProtocolId,
id: UserMessageId, pub id: UserMessageId,
data: Option<Vec<u8>>, pub data: Option<Vec<u8>>,
} }
pub type PeerId = usize; pub type PeerId = usize;
@ -305,14 +283,13 @@ enum ConnectionEntry {
pub struct Host { pub struct Host {
info: HostInfo, info: HostInfo,
udp_socket: UdpSocket, _udp_socket: UdpSocket,
listener: TcpListener, _listener: TcpListener,
connections: Slab<ConnectionEntry>, connections: Slab<ConnectionEntry>,
timers: Slab<UserTimer>, timers: Slab<UserTimer>,
nodes: HashMap<NodeId, Node>, nodes: HashMap<NodeId, Node>,
handlers: HashMap<ProtocolId, Box<ProtocolHandler>>, handlers: HashMap<ProtocolId, Box<ProtocolHandler>>,
idle_timeout: Timeout, _idle_timeout: Timeout,
channel: Sender<HostMessage>,
} }
impl Host { impl Host {
@ -352,14 +329,13 @@ impl Host {
//capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], //capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }],
capabilities: Vec::new(), capabilities: Vec::new(),
}, },
udp_socket: udp_socket, _udp_socket: udp_socket,
listener: listener, _listener: listener,
connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS),
timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS),
nodes: HashMap::new(), nodes: HashMap::new(),
handlers: HashMap::new(), handlers: HashMap::new(),
idle_timeout: idle_timeout, _idle_timeout: idle_timeout,
channel: event_loop.channel(),
}; };
host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303");
@ -639,10 +615,16 @@ impl Handler for Host {
NODETABLE_DISCOVERY => {}, NODETABLE_DISCOVERY => {},
NODETABLE_MAINTAIN => {}, NODETABLE_MAINTAIN => {},
USER_TIMER ... LAST_USER_TIMER => { USER_TIMER ... LAST_USER_TIMER => {
let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol; let (protocol, delay) = {
let timer = self.timers.get_mut(token).expect("Unknown user timer token");
(timer.protocol, timer.delay)
};
match self.handlers.get_mut(protocol) { match self.handlers.get_mut(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()), Some(h) => {
h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize());
event_loop.timeout_ms(token, delay).expect("Error re-registering user timer");
}
} }
} }
_ => panic!("Unknown timer token"), _ => panic!("Unknown timer token"),

View File

@ -90,7 +90,7 @@ pub trait ProtocolHandler: Send {
/// Called when a previously connected peer disconnects. /// Called when a previously connected peer disconnects.
fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId);
/// Timer function called after a timeout created with `HandlerIo::timeout`. /// Timer function called after a timeout created with `HandlerIo::timeout`.
fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) -> bool;
/// Called when a broadcasted message is received. The message can only be sent from a different protocol handler. /// Called when a broadcasted message is received. The message can only be sent from a different protocol handler.
fn message(&mut self, io: &mut HandlerIo, message: &Message); fn message(&mut self, io: &mut HandlerIo, message: &Message);
} }

View File

@ -1,5 +1,3 @@
#![allow(dead_code)] //TODO: remove this after everything is done
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use mio::*; use mio::*;
use network::{Error, ProtocolHandler}; use network::{Error, ProtocolHandler};