Bugfixes; Move discovery draft to a module

This commit is contained in:
arkpar 2015-12-03 15:11:40 +01:00
parent 6f2839086a
commit 2e3750323a
9 changed files with 396 additions and 310 deletions

3
.gitignore vendored
View File

@ -14,3 +14,6 @@ Cargo.lock
# Vim # Vim
*.swp *.swp
# GDB
*.gdb_history

View File

@ -206,7 +206,7 @@ pub mod ecies {
use ::rcrypto::hmac::Hmac; use ::rcrypto::hmac::Hmac;
use ::rcrypto::mac::Mac; use ::rcrypto::mac::Mac;
let meta_len = encrypted.len() - (1 + 64 + 16 + 32); let meta_len = 1 + 64 + 16 + 32;
if encrypted.len() < meta_len || encrypted[0] < 2 || encrypted[0] > 4 { if encrypted.len() < meta_len || encrypted[0] < 2 || encrypted[0] > 4 {
return Err(CryptoError::InvalidMessage); //invalid message: publickey return Err(CryptoError::InvalidMessage); //invalid message: publickey
} }
@ -224,14 +224,14 @@ pub mod ecies {
hasher.result(&mut mkey); hasher.result(&mut mkey);
let clen = encrypted.len() - meta_len; let clen = encrypted.len() - meta_len;
let cypher_with_iv = &e[64..(64+16+clen)]; let cipher_with_iv = &e[64..(64+16+clen)];
let cypher_iv = &cypher_with_iv[0..16]; let cipher_iv = &cipher_with_iv[0..16];
let cypher_no_iv = &cypher_with_iv[16..]; let cipher_no_iv = &cipher_with_iv[16..];
let msg_mac = &e[(64+16+clen)..]; let msg_mac = &e[(64+16+clen)..];
// Verify tag // Verify tag
let mut hmac = Hmac::new(Sha256::new(), &mkey); let mut hmac = Hmac::new(Sha256::new(), &mkey);
hmac.input(cypher_iv); hmac.input(cipher_with_iv);
let mut mac = H256::new(); let mut mac = H256::new();
hmac.raw_result(&mut mac); hmac.raw_result(&mut mac);
if &mac[..] != msg_mac { if &mac[..] != msg_mac {
@ -239,7 +239,7 @@ pub mod ecies {
} }
let mut msg = vec![0u8; clen]; let mut msg = vec![0u8; clen];
aes::decrypt(ekey, &H128::new(), cypher_no_iv, &mut msg[..]); aes::decrypt(ekey, cipher_iv, cipher_no_iv, &mut msg[..]);
Ok(msg) Ok(msg)
} }
@ -266,19 +266,18 @@ pub mod ecies {
} }
pub mod aes { pub mod aes {
use hash::*;
use ::rcrypto::blockmodes::*; use ::rcrypto::blockmodes::*;
use ::rcrypto::aessafe::*; use ::rcrypto::aessafe::*;
use ::rcrypto::symmetriccipher::*; use ::rcrypto::symmetriccipher::*;
use ::rcrypto::buffer::*; use ::rcrypto::buffer::*;
pub fn encrypt(k: &[u8], iv: &H128, plain: &[u8], dest: &mut [u8]) { pub fn encrypt(k: &[u8], iv: &[u8], plain: &[u8], dest: &mut [u8]) {
let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv[..].to_vec()); let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv.to_vec());
encryptor.encrypt(&mut RefReadBuffer::new(plain), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); encryptor.encrypt(&mut RefReadBuffer::new(plain), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding");
} }
pub fn decrypt(k: &[u8], iv: &H128, encrypted: &[u8], dest: &mut [u8]) { pub fn decrypt(k: &[u8], iv: &[u8], encrypted: &[u8], dest: &mut [u8]) {
let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv[..].to_vec()); let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv.to_vec());
encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding");
} }
} }

View File

@ -98,18 +98,23 @@ impl Connection {
Ok(WriteStatus::Ongoing) Ok(WriteStatus::Ongoing)
}, },
Ok(_) if (buf.position() as usize) == send_size => { Ok(_) if (buf.position() as usize) == send_size => {
self.interest.remove(EventSet::writable());
Ok(WriteStatus::Complete) Ok(WriteStatus::Complete)
}, },
Ok(_) => { panic!("Wrote past buffer");}, Ok(_) => { panic!("Wrote past buffer");},
Err(e) => Err(e) Err(e) => Err(e)
} }
}.and_then(|r| if r == WriteStatus::Complete { }.and_then(|r| {
if r == WriteStatus::Complete {
self.send_queue.pop_front(); self.send_queue.pop_front();
Ok(r) };
if self.send_queue.is_empty() {
self.interest.remove(EventSet::writable());
} }
else { Ok(r) } else {
) self.interest.insert(EventSet::writable());
}
Ok(r)
})
} }
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> { pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
@ -142,9 +147,9 @@ enum EncryptedConnectionState {
pub struct EncryptedConnection { pub struct EncryptedConnection {
connection: Connection, connection: Connection,
encoder: CtrMode<AesSafe128Encryptor>, encoder: CtrMode<AesSafe256Encryptor>,
decoder: CtrMode<AesSafe128Encryptor>, decoder: CtrMode<AesSafe256Encryptor>,
mac_encoder: EcbEncryptor<AesSafe128Encryptor, EncPadding<NoPadding>>, mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>,
egress_mac: Keccak, egress_mac: Keccak,
ingress_mac: Keccak, ingress_mac: Keccak,
read_state: EncryptedConnectionState, read_state: EncryptedConnectionState,
@ -169,14 +174,16 @@ impl EncryptedConnection {
shared.copy_to(&mut key_material[0..32]); shared.copy_to(&mut key_material[0..32]);
nonce_material.sha3_into(&mut key_material[32..64]); nonce_material.sha3_into(&mut key_material[32..64]);
key_material.sha3().copy_to(&mut key_material[32..64]); key_material.sha3().copy_to(&mut key_material[32..64]);
key_material.sha3().copy_to(&mut key_material[32..64]);
let iv = vec![0u8; 16]; let iv = vec![0u8; 16];
let encoder = CtrMode::new(AesSafe128Encryptor::new(&key_material[32..64]), iv); let encoder = CtrMode::new(AesSafe256Encryptor::new(&key_material[32..64]), iv);
let iv = vec![0u8; 16]; let iv = vec![0u8; 16];
let decoder = CtrMode::new(AesSafe128Encryptor::new(&key_material[32..64]), iv); let decoder = CtrMode::new(AesSafe256Encryptor::new(&key_material[32..64]), iv);
key_material.sha3().copy_to(&mut key_material[32..64]); key_material.sha3().copy_to(&mut key_material[32..64]);
let mac_encoder = EcbEncryptor::new(AesSafe128Encryptor::new(&key_material[32..64]), NoPadding); let mac_encoder = EcbEncryptor::new(AesSafe256Encryptor::new(&key_material[32..64]), NoPadding);
println!("SESSION key: {}", H256::from_slice(&key_material[32..64]).hex());
let mut egress_mac = Keccak::new_keccak256(); let mut egress_mac = Keccak::new_keccak256();
let mut mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.remote_nonce; let mut mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.remote_nonce;
@ -184,7 +191,7 @@ impl EncryptedConnection {
egress_mac.update(if handshake.originated { &handshake.auth_cipher } else { &handshake.ack_cipher }); egress_mac.update(if handshake.originated { &handshake.auth_cipher } else { &handshake.ack_cipher });
let mut ingress_mac = Keccak::new_keccak256(); let mut ingress_mac = Keccak::new_keccak256();
mac_material = &(&mac_material ^ &handshake.remote_nonce) ^ &handshake.nonce; mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.nonce;
ingress_mac.update(&mac_material); ingress_mac.update(&mac_material);
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher }); ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });
@ -203,6 +210,7 @@ impl EncryptedConnection {
} }
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), Error> { pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), Error> {
println!("HEADER");
let mut header = RlpStream::new(); let mut header = RlpStream::new();
let len = payload.len() as usize; let len = payload.len() as usize;
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
@ -214,7 +222,7 @@ impl EncryptedConnection {
let mut packet = vec![0u8; (32 + payload.len() + padding + 16)]; let mut packet = vec![0u8; (32 + payload.len() + padding + 16)];
self.encoder.encrypt(&mut RefReadBuffer::new(&header), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); self.encoder.encrypt(&mut RefReadBuffer::new(&header), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
self.egress_mac.update(&packet[0..16]); EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &packet[0..16]);
self.egress_mac.clone().finalize(&mut packet[16..32]); self.egress_mac.clone().finalize(&mut packet[16..32]);
self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding"); self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding");
if padding != 0 { if padding != 0 {
@ -222,6 +230,7 @@ impl EncryptedConnection {
self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding"); self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding");
} }
self.egress_mac.update(&packet[32..(32 + len + padding)]); self.egress_mac.update(&packet[32..(32 + len + padding)]);
EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &[0u8; 0]);
self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]); self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]);
self.connection.send(packet); self.connection.send(packet);
Ok(()) Ok(())
@ -231,19 +240,19 @@ impl EncryptedConnection {
if header.len() != ENCRYPTED_HEADER_LEN { if header.len() != ENCRYPTED_HEADER_LEN {
return Err(Error::Auth); return Err(Error::Auth);
} }
self.ingress_mac.update(header); EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &header[0..16]);
let mac = &header[16..]; let mac = &header[16..];
let mut expected = H128::new(); let mut expected = H256::new();
self.ingress_mac.clone().finalize(&mut expected); self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[..] { if mac != &expected[0..16] {
return Err(Error::Auth); return Err(Error::Auth);
} }
let mut header_dec = H128::new(); let mut hdec = H128::new();
self.decoder.decrypt(&mut RefReadBuffer::new(&header[0..16]), &mut RefWriteBuffer::new(&mut header_dec), false).expect("Invalid length or padding"); self.decoder.decrypt(&mut RefReadBuffer::new(&header[0..16]), &mut RefWriteBuffer::new(&mut hdec), false).expect("Invalid length or padding");
let length = ((header[0] as u32) << 8 + header[1] as u32) << 8 + header[2] as u32; let length = ((((hdec[0] as u32) << 8) + (hdec[1] as u32)) << 8) + (hdec[2] as u32);
let header_rlp = UntrustedRlp::new(&header[3..]); let header_rlp = UntrustedRlp::new(&hdec[3..6]);
let protocol_id = try!(u16::decode_untrusted(&try!(header_rlp.at(0)))); let protocol_id = try!(u16::decode_untrusted(&try!(header_rlp.at(0))));
self.payload_len = length; self.payload_len = length;
@ -263,6 +272,7 @@ impl EncryptedConnection {
return Err(Error::Auth); return Err(Error::Auth);
} }
self.ingress_mac.update(&payload[0..payload.len() - 16]); self.ingress_mac.update(&payload[0..payload.len() - 16]);
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &[0u8; 0]);
let mac = &payload[(payload.len() - 16)..]; let mac = &payload[(payload.len() - 16)..];
let mut expected = H128::new(); let mut expected = H128::new();
self.ingress_mac.clone().finalize(&mut expected); self.ingress_mac.clone().finalize(&mut expected);
@ -279,6 +289,24 @@ impl EncryptedConnection {
}) })
} }
fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) {
let mut prev = H128::new();
mac.clone().finalize(&mut prev);
let mut enc = H128::new();
println!("before: {}", prev.hex());
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap();
mac_encoder.reset();
println!("after {}", enc.hex());
if !seed.is_empty() {
enc = enc ^ H128::from_slice(seed);
}
else {
enc = enc ^ prev;
}
mac.update(&enc);
}
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, Error> { pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, Error> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.reregister(event_loop)); try!(self.connection.reregister(event_loop));
@ -316,8 +344,32 @@ impl EncryptedConnection {
self.connection.expect(ENCRYPTED_HEADER_LEN); self.connection.expect(ENCRYPTED_HEADER_LEN);
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
try!(self.connection.register(event_loop)); try!(self.connection.reregister(event_loop));
Ok(()) Ok(())
} }
} }
#[test]
pub fn ctest() {
use hash::*;
use std::str::FromStr;
let key = H256::from_str("2212767d793a7a3d66f869ae324dd11bd17044b82c9f463b8a541a4d089efec5").unwrap();
let before = H128::from_str("12532abaec065082a3cf1da7d0136f15").unwrap();
let before2 = H128::from_str("7e99f682356fdfbc6b67a9562787b18a").unwrap();
let after = H128::from_str("89464c6b04e7c99e555c81d3f7266a05").unwrap();
let after2 = H128::from_str("85c070030589ef9c7a2879b3a8489316").unwrap();
let mut got = H128::new();
let mut encoder = EcbEncryptor::new(AesSafe256Encryptor::new(&key), NoPadding);
encoder.encrypt(&mut RefReadBuffer::new(&before), &mut RefWriteBuffer::new(&mut got), true).unwrap();
encoder.reset();
println!("got: {} ", got.hex());
assert_eq!(got, after);
got = H128::new();
encoder.encrypt(&mut RefReadBuffer::new(&before2), &mut RefWriteBuffer::new(&mut got), true).unwrap();
encoder.reset();
assert_eq!(got, after2);
}

204
src/network/discovery.rs Normal file
View File

@ -0,0 +1,204 @@
// This module is a work in progress
#![allow(dead_code)] //TODO: remove this after everything is done
use std::collections::{HashSet, BTreeMap};
use std::cell::{RefCell};
use std::ops::{DerefMut};
use mio::*;
use mio::udp::*;
use hash::*;
use crypto::*;
use network::host::*;
const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover)
const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
struct NodeBucket {
distance: u32,
nodes: Vec<NodeId>
}
impl NodeBucket {
fn new(distance: u32) -> NodeBucket {
NodeBucket {
distance: distance,
nodes: Vec::new()
}
}
}
struct Discovery {
id: NodeId,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
}
struct FindNodePacket;
impl FindNodePacket {
fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket {
FindNodePacket
}
fn sign(&mut self, _secret: &Secret) {
}
fn send(& self, _socket: &mut UdpSocket) {
}
}
impl Discovery {
pub fn new(id: &NodeId) -> Discovery {
Discovery {
id: id.clone(),
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(),
}
}
pub fn add_node(&mut self, id: &NodeId) {
self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone());
}
fn start_node_discovery(&mut self, event_loop: &mut EventLoop<Host>) {
self.discovery_round = 0;
self.discovery_id.randomize();
self.discovery_nodes.clear();
self.discover(event_loop);
}
fn discover(&mut self, event_loop: &mut EventLoop<Host>) {
if self.discovery_round == DISCOVERY_MAX_STEPS
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
let mut tried_count = 0;
{
let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter();
let nodes = RefCell::new(&mut self.discovery_nodes);
let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA);
for r in nearest {
//let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id);
//p.sign(&self.secret);
//p.send(&mut self.udp_socket);
let mut borrowed = nodes.borrow_mut();
borrowed.deref_mut().insert(r.clone());
tried_count += 1;
}
}
if tried_count == 0
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
self.discovery_round += 1;
//event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap();
}
fn distance(a: &NodeId, b: &NodeId) -> u32 {
let d = a.sha3() ^ b.sha3();
let mut ret:u32 = 0;
for i in 0..32 {
let mut v: u8 = d[i];
while v != 0 {
v >>= 1;
ret += 1;
}
}
ret
}
fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec<NodeBucket>) -> Vec<&'b NodeId>
{
// send ALPHA FindNode packets to nodes we know, closest to target
const LAST_BIN: u32 = NODE_BINS - 1;
let mut head = Discovery::distance(source, target);
let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS };
let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new();
let mut count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if head > 1 && tail != LAST_BIN {
while head != tail && head < NODE_BINS && count < BUCKET_SIZE
{
for n in buckets[head as usize].nodes.iter()
{
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
if count < BUCKET_SIZE && tail != 0 {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
}
head += 1;
if tail > 0 {
tail -= 1;
}
}
}
else if head < 2 {
while head < NODE_BINS && count < BUCKET_SIZE {
for n in buckets[head as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
head += 1;
}
}
else {
while tail > 0 && count < BUCKET_SIZE {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
tail -= 1;
}
}
let mut ret:Vec<&NodeId> = Vec::new();
for (_, nodes) in found {
for n in nodes {
if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ {
ret.push(n);
}
}
}
ret
}
}

View File

@ -91,7 +91,9 @@ impl Handshake {
}, },
_ => { panic!("Unexpected state") } _ => { panic!("Unexpected state") }
} }
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop)); try!(self.connection.reregister(event_loop));
}
Ok(()) Ok(())
} }
@ -118,7 +120,9 @@ impl Handshake {
}, },
_ => { panic!("Unexpected state") } _ => { panic!("Unexpected state") }
} }
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop)); try!(self.connection.reregister(event_loop));
}
Ok(()) Ok(())
} }
@ -155,9 +159,8 @@ impl Handshake {
assert!(data.len() == ACK_PACKET_SIZE); assert!(data.len() == ACK_PACKET_SIZE);
self.ack_cipher = data.to_vec(); self.ack_cipher = data.to_vec();
let ack = try!(ecies::decrypt(host.secret(), data)); let ack = try!(ecies::decrypt(host.secret(), data));
let (pubk, nonce) = ack.split_at(65); self.remote_public.clone_from_slice(&ack[0..64]);
self.remote_public.clone_from_slice(pubk); self.remote_nonce.clone_from_slice(&ack[64..(64+32)]);
self.remote_nonce.clone_from_slice(nonce);
Ok(()) Ok(())
} }

View File

@ -1,10 +1,8 @@
#![allow(dead_code)] //TODO: remove this after everything is done #![allow(dead_code)] //TODO: remove this after everything is done
//TODO: remove all unwraps //TODO: remove all unwraps
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::{HashSet, HashMap, BTreeMap}; use std::collections::{HashMap};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::cell::{RefCell};
use std::ops::{DerefMut};
use std::str::{FromStr}; use std::str::{FromStr};
use mio::*; use mio::*;
use mio::util::{Slab}; use mio::util::{Slab};
@ -18,18 +16,11 @@ use network::handshake::Handshake;
use network::session::Session; use network::session::Session;
use network::Error; use network::Error;
const DEFAULT_PORT: u16 = 30303; const DEFAULT_PORT: u16 = 30304;
const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover)
const MAX_CONNECTIONS: usize = 1024; const MAX_CONNECTIONS: usize = 1024;
const IDEAL_PEERS:u32 = 10; const IDEAL_PEERS:u32 = 10;
const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
pub type NodeId = H512; pub type NodeId = H512;
#[derive(Debug)] #[derive(Debug)]
@ -44,8 +35,8 @@ struct NetworkConfiguration {
impl NetworkConfiguration { impl NetworkConfiguration {
fn new() -> NetworkConfiguration { fn new() -> NetworkConfiguration {
NetworkConfiguration { NetworkConfiguration {
listen_address: SocketAddr::from_str("0.0.0.0:30303").unwrap(), listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
public_address: SocketAddr::from_str("0.0.0.0:30303").unwrap(), public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
no_nat: false, no_nat: false,
no_discovery: false, no_discovery: false,
pin: false, pin: false,
@ -54,7 +45,7 @@ impl NetworkConfiguration {
} }
#[derive(Debug)] #[derive(Debug)]
struct NodeEndpoint { pub struct NodeEndpoint {
address: SocketAddr, address: SocketAddr,
address_str: String, address_str: String,
udp_port: u16 udp_port: u16
@ -142,33 +133,6 @@ impl Hash for Node {
} }
} }
struct NodeBucket {
distance: u32,
nodes: Vec<NodeId>
}
impl NodeBucket {
fn new(distance: u32) -> NodeBucket {
NodeBucket {
distance: distance,
nodes: Vec::new()
}
}
}
struct FindNodePacket;
impl FindNodePacket {
fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket {
FindNodePacket
}
fn sign(&mut self, _secret: &Secret) {
}
fn send(& self, _socket: &mut UdpSocket) {
}
}
// Tokens // Tokens
const TCP_ACCEPT: usize = 1; const TCP_ACCEPT: usize = 1;
const IDLE: usize = 3; const IDLE: usize = 3;
@ -241,10 +205,6 @@ pub struct Host {
udp_socket: UdpSocket, udp_socket: UdpSocket,
listener: TcpListener, listener: TcpListener,
connections: Slab<ConnectionEntry>, connections: Slab<ConnectionEntry>,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
nodes: HashMap<NodeId, Node>, nodes: HashMap<NodeId, Node>,
idle_timeout: Timeout, idle_timeout: Timeout,
} }
@ -292,14 +252,11 @@ impl Host {
udp_socket: udp_socket, udp_socket: udp_socket,
listener: listener, listener: listener,
connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS),
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(),
nodes: HashMap::new(), nodes: HashMap::new(),
idle_timeout: idle_timeout idle_timeout: idle_timeout
}; };
host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303");
host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300");
host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303");
host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303");
@ -320,146 +277,11 @@ impl Host {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { warn!("Could not add node: {:?}", e); }, Err(e) => { warn!("Could not add node: {:?}", e); },
Ok(n) => { Ok(n) => {
self.node_buckets[Host::distance(self.info.id(), &n.id) as usize].nodes.push(n.id.clone());
self.nodes.insert(n.id.clone(), n); self.nodes.insert(n.id.clone(), n);
} }
} }
} }
fn start_node_discovery(&mut self, event_loop: &mut EventLoop<Host>) {
self.discovery_round = 0;
self.discovery_id.randomize();
self.discovery_nodes.clear();
self.discover(event_loop);
}
fn discover(&mut self, event_loop: &mut EventLoop<Host>) {
if self.discovery_round == DISCOVERY_MAX_STEPS
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
let mut tried_count = 0;
{
let nearest = Host::nearest_node_entries(&self.info.id(), &self.discovery_id, &self.node_buckets).into_iter();
let nodes = RefCell::new(&mut self.discovery_nodes);
let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA);
for r in nearest {
//let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id);
//p.sign(&self.secret);
//p.send(&mut self.udp_socket);
let mut borrowed = nodes.borrow_mut();
borrowed.deref_mut().insert(r.clone());
tried_count += 1;
}
}
if tried_count == 0
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
self.discovery_round += 1;
event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap();
}
fn distance(a: &NodeId, b: &NodeId) -> u32 {
let d = a.sha3() ^ b.sha3();
let mut ret:u32 = 0;
for i in 0..32 {
let mut v: u8 = d[i];
while v != 0 {
v >>= 1;
ret += 1;
}
}
ret
}
fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec<NodeBucket>) -> Vec<&'b NodeId>
{
// send ALPHA FindNode packets to nodes we know, closest to target
const LAST_BIN: u32 = NODE_BINS - 1;
let mut head = Host::distance(source, target);
let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS };
let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new();
let mut count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if head > 1 && tail != LAST_BIN {
while head != tail && head < NODE_BINS && count < BUCKET_SIZE
{
for n in buckets[head as usize].nodes.iter()
{
if count < BUCKET_SIZE {
count += 1;
found.entry(Host::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
if count < BUCKET_SIZE && tail != 0 {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Host::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
}
head += 1;
if tail > 0 {
tail -= 1;
}
}
}
else if head < 2 {
while head < NODE_BINS && count < BUCKET_SIZE {
for n in buckets[head as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Host::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
head += 1;
}
}
else {
while tail > 0 && count < BUCKET_SIZE {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Host::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
tail -= 1;
}
}
let mut ret:Vec<&NodeId> = Vec::new();
for (_, nodes) in found {
for n in nodes {
if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ {
ret.push(n);
}
}
}
ret
}
fn maintain_network(&mut self, event_loop: &mut EventLoop<Host>) { fn maintain_network(&mut self, event_loop: &mut EventLoop<Host>) {
self.connect_peers(event_loop); self.connect_peers(event_loop);
} }
@ -482,7 +304,9 @@ impl Host {
let mut to_connect: Vec<NodeInfo> = Vec::new(); let mut to_connect: Vec<NodeInfo> = Vec::new();
let mut req_conn = 0; let mut req_conn = 0;
for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) { //TODO: use nodes from discovery here
//for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) {
for n in self.nodes.values().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) {
let connected = self.have_session(&n.id) || self.connecting_to(&n.id); let connected = self.have_session(&n.id) || self.connecting_to(&n.id);
let required = n.peer_type == PeerType::Required; let required = n.peer_type == PeerType::Required;
if connected && required { if connected && required {

View File

@ -3,6 +3,7 @@ mod host;
mod connection; mod connection;
mod handshake; mod handshake;
mod session; mod session;
mod discovery;
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum DisconnectReason pub enum DisconnectReason

View File

@ -1,5 +1,5 @@
#![allow(dead_code)] //TODO: remove this after everything is done //#![allow(dead_code)] //TODO: remove this after everything is done
//TODO: hello packet timeout
use mio::*; use mio::*;
use hash::*; use hash::*;
use rlp::*; use rlp::*;
@ -33,8 +33,7 @@ const PACKET_LAST: u8 = 0x7f;
impl Session { impl Session {
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, Error> { pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, Error> {
let id = h.id.clone(); let id = h.id.clone();
let mut connection = try!(EncryptedConnection::new(h)); let connection = try!(EncryptedConnection::new(h));
try!(connection.register(event_loop));
let mut session = Session { let mut session = Session {
connection: connection, connection: connection,
had_hello: false, had_hello: false,
@ -47,6 +46,7 @@ impl Session {
}; };
try!(session.write_hello(host)); try!(session.write_hello(host));
try!(session.write_ping()); try!(session.write_ping());
try!(session.connection.register(event_loop));
Ok(session) Ok(session)
} }
@ -105,7 +105,7 @@ impl Session {
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> { fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> {
let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0)))); let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0))));
let client_version = try!(String::decode_untrusted(&try!(rlp.at(0)))); let client_version = try!(String::decode_untrusted(&try!(rlp.at(1))));
let mut caps: Vec<CapabilityInfo> = try!(Decodable::decode_untrusted(&try!(rlp.at(2)))); let mut caps: Vec<CapabilityInfo> = try!(Decodable::decode_untrusted(&try!(rlp.at(2))));
let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4)))); let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4))));

View File

@ -381,16 +381,16 @@ mod tests {
#[test] #[test]
fn test_triehash_json_trietest_json() { fn test_triehash_json_trietest_json() {
let data = include_bytes!("../tests/TrieTests/trietest.json"); //let data = include_bytes!("../tests/TrieTests/trietest.json");
let s = String::from_bytes(data).unwrap(); //let s = String::from_bytes(data).unwrap();
let json = Json::from_str(&s).unwrap(); //let json = Json::from_str(&s).unwrap();
let obj = json.as_object().unwrap(); //let obj = json.as_object().unwrap();
for (key, value) in obj.iter() { //for (key, value) in obj.iter() {
println!("running test: {}", key); // println!("running test: {}", key);
} //}
assert!(false); //assert!(false);
} }
} }