From 39f2dc9e2f80c269b05f551d85c98cc64db96059 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 4 Jan 2016 13:49:32 +0100 Subject: [PATCH] Style --- src/hash.rs | 2 +- src/network/discovery.rs | 295 ++++++++++++++++++++------------------- src/network/host.rs | 54 +++---- src/network/mod.rs | 2 +- src/network/service.rs | 2 - 5 files changed, 168 insertions(+), 187 deletions(-) diff --git a/src/hash.rs b/src/hash.rs index 6d2c4de95..995a6cf28 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -33,7 +33,7 @@ pub trait FixedHash: Sized + BytesConvertable { macro_rules! impl_hash { ($from: ident, $size: expr) => { - #[derive(Eq, Copy)] + #[derive(Eq)] pub struct $from (pub [u8; $size]); impl BytesConvertable for $from { diff --git a/src/network/discovery.rs b/src/network/discovery.rs index e8a342c22..bae52eb10 100644 --- a/src/network/discovery.rs +++ b/src/network/discovery.rs @@ -11,56 +11,57 @@ use hash::*; use crypto::*; use network::host::*; -const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. +const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes. const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia]. const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us). -const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover) -const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. +const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover) +const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. struct NodeBucket { - distance: u32, - nodes: Vec + distance: u32, + nodes: Vec } impl NodeBucket { - fn new(distance: u32) -> NodeBucket { - NodeBucket { - distance: distance, - nodes: Vec::new() - } - } + fn new(distance: u32) -> NodeBucket { + NodeBucket { + distance: distance, + nodes: Vec::new() + } + } } struct Discovery { - id: NodeId, - discovery_round: u16, - discovery_id: NodeId, - discovery_nodes: HashSet, - node_buckets: Vec, + id: NodeId, + discovery_round: u16, + discovery_id: NodeId, + discovery_nodes: HashSet, + node_buckets: Vec, } struct FindNodePacket; impl FindNodePacket { - fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket { - FindNodePacket - } - fn sign(&mut self, _secret: &Secret) { - } + fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket { + FindNodePacket + } - fn send(& self, _socket: &mut UdpSocket) { - } + fn sign(&mut self, _secret: &Secret) { + } + + fn send(& self, _socket: &mut UdpSocket) { + } } impl Discovery { pub fn new(id: &NodeId) -> Discovery { Discovery { id: id.clone(), - discovery_round: 0, - discovery_id: NodeId::new(), - discovery_nodes: HashSet::new(), - node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(), + discovery_round: 0, + discovery_id: NodeId::new(), + discovery_nodes: HashSet::new(), + node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(), } } @@ -68,137 +69,137 @@ impl Discovery { self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone()); } - fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { - self.discovery_round = 0; - self.discovery_id.randomize(); - self.discovery_nodes.clear(); - self.discover(event_loop); - } + fn start_node_discovery(&mut self, event_loop: &mut EventLoop) { + self.discovery_round = 0; + self.discovery_id.randomize(); + self.discovery_nodes.clear(); + self.discover(event_loop); + } - fn discover(&mut self, event_loop: &mut EventLoop) { - if self.discovery_round == DISCOVERY_MAX_STEPS - { - debug!("Restarting discovery"); - self.start_node_discovery(event_loop); - return; - } - let mut tried_count = 0; - { - let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter(); - let nodes = RefCell::new(&mut self.discovery_nodes); - let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA); - for r in nearest { - //let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id); - //p.sign(&self.secret); - //p.send(&mut self.udp_socket); - let mut borrowed = nodes.borrow_mut(); - borrowed.deref_mut().insert(r.clone()); - tried_count += 1; - } - } + fn discover(&mut self, event_loop: &mut EventLoop) { + if self.discovery_round == DISCOVERY_MAX_STEPS + { + debug!("Restarting discovery"); + self.start_node_discovery(event_loop); + return; + } + let mut tried_count = 0; + { + let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter(); + let nodes = RefCell::new(&mut self.discovery_nodes); + let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA); + for r in nearest { + //let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id); + //p.sign(&self.secret); + //p.send(&mut self.udp_socket); + let mut borrowed = nodes.borrow_mut(); + borrowed.deref_mut().insert(r.clone()); + tried_count += 1; + } + } - if tried_count == 0 - { - debug!("Restarting discovery"); - self.start_node_discovery(event_loop); - return; - } - self.discovery_round += 1; - //event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap(); - } + if tried_count == 0 + { + debug!("Restarting discovery"); + self.start_node_discovery(event_loop); + return; + } + self.discovery_round += 1; + //event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap(); + } fn distance(a: &NodeId, b: &NodeId) -> u32 { - let d = a.sha3() ^ b.sha3(); - let mut ret:u32 = 0; - for i in 0..32 { - let mut v: u8 = d[i]; - while v != 0 { - v >>= 1; - ret += 1; - } - } - ret - } + let d = a.sha3() ^ b.sha3(); + let mut ret:u32 = 0; + for i in 0..32 { + let mut v: u8 = d[i]; + while v != 0 { + v >>= 1; + ret += 1; + } + } + ret + } - fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec) -> Vec<&'b NodeId> - { - // send ALPHA FindNode packets to nodes we know, closest to target - const LAST_BIN: u32 = NODE_BINS - 1; - let mut head = Discovery::distance(source, target); - let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; + fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec) -> Vec<&'b NodeId> + { + // send ALPHA FindNode packets to nodes we know, closest to target + const LAST_BIN: u32 = NODE_BINS - 1; + let mut head = Discovery::distance(source, target); + let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; - let mut found: BTreeMap> = BTreeMap::new(); - let mut count = 0; + let mut found: BTreeMap> = BTreeMap::new(); + let mut count = 0; - // if d is 0, then we roll look forward, if last, we reverse, else, spread from d - if head > 1 && tail != LAST_BIN { - while head != tail && head < NODE_BINS && count < BUCKET_SIZE - { - for n in buckets[head as usize].nodes.iter() - { - if count < BUCKET_SIZE { + // if d is 0, then we roll look forward, if last, we reverse, else, spread from d + if head > 1 && tail != LAST_BIN { + while head != tail && head < NODE_BINS && count < BUCKET_SIZE + { + for n in buckets[head as usize].nodes.iter() + { + if count < BUCKET_SIZE { count += 1; - found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); - } - else { - break; - } - } - if count < BUCKET_SIZE && tail != 0 { - for n in buckets[tail as usize].nodes.iter() { - if count < BUCKET_SIZE { + found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); + } + else { + break; + } + } + if count < BUCKET_SIZE && tail != 0 { + for n in buckets[tail as usize].nodes.iter() { + if count < BUCKET_SIZE { count += 1; - found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); - } - else { - break; - } - } - } + found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); + } + else { + break; + } + } + } - head += 1; - if tail > 0 { - tail -= 1; - } - } - } - else if head < 2 { - while head < NODE_BINS && count < BUCKET_SIZE { - for n in buckets[head as usize].nodes.iter() { - if count < BUCKET_SIZE { + head += 1; + if tail > 0 { + tail -= 1; + } + } + } + else if head < 2 { + while head < NODE_BINS && count < BUCKET_SIZE { + for n in buckets[head as usize].nodes.iter() { + if count < BUCKET_SIZE { count += 1; - found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); - } - else { - break; - } - } - head += 1; - } - } - else { - while tail > 0 && count < BUCKET_SIZE { - for n in buckets[tail as usize].nodes.iter() { - if count < BUCKET_SIZE { + found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); + } + else { + break; + } + } + head += 1; + } + } + else { + while tail > 0 && count < BUCKET_SIZE { + for n in buckets[tail as usize].nodes.iter() { + if count < BUCKET_SIZE { count += 1; - found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); - } - else { - break; - } - } - tail -= 1; - } - } + found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n); + } + else { + break; + } + } + tail -= 1; + } + } - let mut ret:Vec<&NodeId> = Vec::new(); - for (_, nodes) in found { - for n in nodes { - if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ { - ret.push(n); - } - } - } - ret - } + let mut ret:Vec<&NodeId> = Vec::new(); + for (_, nodes) in found { + for n in nodes { + if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ { + ret.push(n); + } + } + } + ret + } } diff --git a/src/network/host.rs b/src/network/host.rs index 3d8e78e46..1c2a66150 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] //TODO: remove this after everything is done //TODO: remove all unwraps use std::net::{SocketAddr, ToSocketAddrs}; use std::collections::{HashMap}; @@ -16,7 +15,7 @@ use network::handshake::Handshake; use network::session::{Session, SessionData}; use network::{Error, ProtocolHandler}; -const DEFAULT_PORT: u16 = 30304; +const _DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; const MAX_USER_TIMERS: usize = 32; @@ -54,13 +53,6 @@ pub struct NodeEndpoint { } impl NodeEndpoint { - fn new(address: SocketAddr) -> NodeEndpoint { - NodeEndpoint { - address: address, - address_str: address.to_string(), - udp_port: address.port() - } - } fn from_str(s: &str) -> Result { println!("{:?}", s); let address = s.to_socket_addrs().map(|mut i| i.next()); @@ -87,7 +79,6 @@ struct Node { endpoint: NodeEndpoint, peer_type: PeerType, last_attempted: Option, - confirmed: bool, } impl FromStr for Node { @@ -105,23 +96,10 @@ impl FromStr for Node { endpoint: endpoint, peer_type: PeerType::Optional, last_attempted: None, - confirmed: false }) } } -impl Node { - fn new(id: NodeId, address: SocketAddr, t:PeerType) -> Node { - Node { - id: id, - endpoint: NodeEndpoint::new(address), - peer_type: t, - last_attempted: None, - confirmed: false - } - } -} - impl PartialEq for Node { fn eq(&self, other: &Self) -> bool { self.id == other.id @@ -168,9 +146,9 @@ pub enum HostMessage { pub type UserMessageId = u32; pub struct UserMessage { - protocol: ProtocolId, - id: UserMessageId, - data: Option>, + pub protocol: ProtocolId, + pub id: UserMessageId, + pub data: Option>, } pub type PeerId = usize; @@ -305,14 +283,13 @@ enum ConnectionEntry { pub struct Host { info: HostInfo, - udp_socket: UdpSocket, - listener: TcpListener, + _udp_socket: UdpSocket, + _listener: TcpListener, connections: Slab, timers: Slab, nodes: HashMap, handlers: HashMap>, - idle_timeout: Timeout, - channel: Sender, + _idle_timeout: Timeout, } impl Host { @@ -352,14 +329,13 @@ impl Host { //capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], capabilities: Vec::new(), }, - udp_socket: udp_socket, - listener: listener, + _udp_socket: udp_socket, + _listener: listener, connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), nodes: HashMap::new(), handlers: HashMap::new(), - idle_timeout: idle_timeout, - channel: event_loop.channel(), + _idle_timeout: idle_timeout, }; host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); @@ -639,10 +615,16 @@ impl Handler for Host { NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, USER_TIMER ... LAST_USER_TIMER => { - let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol; + let (protocol, delay) = { + let timer = self.timers.get_mut(token).expect("Unknown user timer token"); + (timer.protocol, timer.delay) + }; match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, - Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()), + Some(h) => { + h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()); + event_loop.timeout_ms(token, delay).expect("Error re-registering user timer"); + } } } _ => panic!("Unknown timer token"), diff --git a/src/network/mod.rs b/src/network/mod.rs index c44182430..839624b01 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -90,7 +90,7 @@ pub trait ProtocolHandler: Send { /// Called when a previously connected peer disconnects. fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); /// Timer function called after a timeout created with `HandlerIo::timeout`. - fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); + fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) -> bool; /// Called when a broadcasted message is received. The message can only be sent from a different protocol handler. fn message(&mut self, io: &mut HandlerIo, message: &Message); } diff --git a/src/network/service.rs b/src/network/service.rs index 986d6dd11..7598ffdd6 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] //TODO: remove this after everything is done - use std::thread::{self, JoinHandle}; use mio::*; use network::{Error, ProtocolHandler};