Move ethcore files back into root.

This commit is contained in:
Gav Wood
2016-01-17 13:11:25 +01:00
parent 9b87bae322
commit 6ea8eaa3b5
159 changed files with 511 additions and 511 deletions

View File

@@ -0,0 +1,410 @@
use std::collections::VecDeque;
use mio::{Handler, Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite};
use mio::tcp::*;
use hash::*;
use sha3::*;
use bytes::*;
use rlp::*;
use std::io::{self, Cursor, Read};
use error::*;
use network::error::NetworkError;
use network::handshake::Handshake;
use crypto;
use rcrypto::blockmodes::*;
use rcrypto::aessafe::*;
use rcrypto::symmetriccipher::*;
use rcrypto::buffer::*;
use tiny_keccak::Keccak;
const ENCRYPTED_HEADER_LEN: usize = 32;
/// Low level tcp connection
pub struct Connection {
/// Connection id (token)
pub token: Token,
/// Network socket
pub socket: TcpStream,
/// Receive buffer
rec_buf: Bytes,
/// Expected size
rec_size: usize,
/// Send out packets FIFO
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
}
/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}
impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: Token, socket: TcpStream) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup(),
}
}
/// Put a connection into read mode. Receiving up `size` bytes of data.
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
}
unsafe { self.rec_buf.set_len(0) }
self.rec_size = size;
}
/// Readable IO handler. Called when there is some data to be read.
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read");
}
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => {
self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
},
Ok(_) => Ok(None),
Err(e) => Err(e),
}
}
/// Add a packet to send queue.
pub fn send(&mut self, data: Bytes) {
if data.len() != 0 {
self.send_queue.push_back(Cursor::new(data));
}
if !self.interest.is_writable() {
self.interest.insert(EventSet::writable());
}
}
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable(&mut self) -> io::Result<WriteStatus> {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
}
{
let buf = self.send_queue.front_mut().unwrap();
let send_size = buf.get_ref().len();
if (buf.position() as usize) >= send_size {
warn!(target:"net", "Unexpected connection data");
return Ok(WriteStatus::Complete)
}
match self.socket.try_write_buf(buf) {
Ok(_) if (buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable());
Ok(WriteStatus::Ongoing)
},
Ok(_) if (buf.position() as usize) == send_size => {
Ok(WriteStatus::Complete)
},
Ok(_) => { panic!("Wrote past buffer");},
Err(e) => Err(e)
}
}.and_then(|r| {
if r == WriteStatus::Complete {
self.send_queue.pop_front();
}
if self.send_queue.is_empty() {
self.interest.remove(EventSet::writable());
}
else {
self.interest.insert(EventSet::writable());
}
Ok(r)
})
}
/// Register this connection with the IO event loop.
pub fn register<Host: Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable());
event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to register {:?}, {:?}", self.token, e);
Err(e)
})
}
/// Update connection registration. Should be called at the end of the IO handler.
pub fn reregister<Host: Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", self.token);
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}
}
/// RLPx packet
pub struct Packet {
pub protocol: u16,
pub data: Bytes,
}
/// Encrypted connection receiving state.
enum EncryptedConnectionState {
/// Reading a header.
Header,
/// Reading the rest of the packet.
Payload,
}
/// Connection implementing RLPx framing
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
pub struct EncryptedConnection {
/// Underlying tcp connection
connection: Connection,
/// Egress data encryptor
encoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
decoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>,
/// MAC for egress data
egress_mac: Keccak,
/// MAC for ingress data
ingress_mac: Keccak,
/// Read state
read_state: EncryptedConnectionState,
/// Disconnect timeout
idle_timeout: Option<Timeout>,
/// Protocol id for the last received packet
protocol_id: u16,
/// Payload expected to be received for the last header.
payload_len: usize,
}
impl EncryptedConnection {
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
let mut nonce_material = H512::new();
if handshake.originated {
handshake.remote_nonce.copy_to(&mut nonce_material[0..32]);
handshake.nonce.copy_to(&mut nonce_material[32..64]);
}
else {
handshake.nonce.copy_to(&mut nonce_material[0..32]);
handshake.remote_nonce.copy_to(&mut nonce_material[32..64]);
}
let mut key_material = H512::new();
shared.copy_to(&mut key_material[0..32]);
nonce_material.sha3_into(&mut key_material[32..64]);
key_material.sha3().copy_to(&mut key_material[32..64]);
key_material.sha3().copy_to(&mut key_material[32..64]);
let iv = vec![0u8; 16];
let encoder = CtrMode::new(AesSafe256Encryptor::new(&key_material[32..64]), iv);
let iv = vec![0u8; 16];
let decoder = CtrMode::new(AesSafe256Encryptor::new(&key_material[32..64]), iv);
key_material.sha3().copy_to(&mut key_material[32..64]);
let mac_encoder = EcbEncryptor::new(AesSafe256Encryptor::new(&key_material[32..64]), NoPadding);
let mut egress_mac = Keccak::new_keccak256();
let mut mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.remote_nonce;
egress_mac.update(&mac_material);
egress_mac.update(if handshake.originated { &handshake.auth_cipher } else { &handshake.ack_cipher });
let mut ingress_mac = Keccak::new_keccak256();
mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.nonce;
ingress_mac.update(&mac_material);
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });
Ok(EncryptedConnection {
connection: handshake.connection,
encoder: encoder,
decoder: decoder,
mac_encoder: mac_encoder,
egress_mac: egress_mac,
ingress_mac: ingress_mac,
read_state: EncryptedConnectionState::Header,
idle_timeout: None,
protocol_id: 0,
payload_len: 0
})
}
/// Send a packet
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> {
let mut header = RlpStream::new();
let len = payload.len() as usize;
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1);
//TODO: ger rid of vectors here
let mut header = header.out();
let padding = (16 - (payload.len() % 16)) % 16;
header.resize(16, 0u8);
let mut packet = vec![0u8; (32 + payload.len() + padding + 16)];
self.encoder.encrypt(&mut RefReadBuffer::new(&header), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &packet[0..16]);
self.egress_mac.clone().finalize(&mut packet[16..32]);
self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding");
if padding != 0 {
let pad = [0u8; 16];
self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding");
}
self.egress_mac.update(&packet[32..(32 + len + padding)]);
EncryptedConnection::update_mac(&mut self.egress_mac, &mut self.mac_encoder, &[0u8; 0]);
self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]);
self.connection.send(packet);
Ok(())
}
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> {
if header.len() != ENCRYPTED_HEADER_LEN {
return Err(From::from(NetworkError::Auth));
}
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &header[0..16]);
let mac = &header[16..];
let mut expected = H256::new();
self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[0..16] {
return Err(From::from(NetworkError::Auth));
}
let mut hdec = H128::new();
self.decoder.decrypt(&mut RefReadBuffer::new(&header[0..16]), &mut RefWriteBuffer::new(&mut hdec), false).expect("Invalid length or padding");
let length = ((((hdec[0] as u32) << 8) + (hdec[1] as u32)) << 8) + (hdec[2] as u32);
let header_rlp = UntrustedRlp::new(&hdec[3..6]);
let protocol_id = try!(header_rlp.val_at::<u16>(0));
self.payload_len = length as usize;
self.protocol_id = protocol_id;
self.read_state = EncryptedConnectionState::Payload;
let padding = (16 - (length % 16)) % 16;
let full_length = length + padding + 16;
self.connection.expect(full_length as usize);
Ok(())
}
/// Decrypt and authenticate packet payload.
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, UtilError> {
let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = self.payload_len + padding + 16;
if payload.len() != full_length {
return Err(From::from(NetworkError::Auth));
}
self.ingress_mac.update(&payload[0..payload.len() - 16]);
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &[0u8; 0]);
let mac = &payload[(payload.len() - 16)..];
let mut expected = H128::new();
self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[..] {
return Err(From::from(NetworkError::Auth));
}
let mut packet = vec![0u8; self.payload_len];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..self.payload_len]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
let mut pad_buf = [0u8; 16];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[self.payload_len..(payload.len() - 16)]), &mut RefWriteBuffer::new(&mut pad_buf), false).expect("Invalid length or padding");
Ok(Packet {
protocol: self.protocol_id,
data: packet
})
}
/// Update MAC after reading or writing any data.
fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) {
let mut prev = H128::new();
mac.clone().finalize(&mut prev);
let mut enc = H128::new();
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap();
mac_encoder.reset();
enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) };
mac.update(&enc);
}
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.read_state {
EncryptedConnectionState::Header => {
match try!(self.connection.readable()) {
Some(data) => {
try!(self.read_header(&data));
},
None => {}
};
Ok(None)
},
EncryptedConnectionState::Payload => {
match try!(self.connection.readable()) {
Some(data) => {
self.read_state = EncryptedConnectionState::Header;
self.connection.expect(ENCRYPTED_HEADER_LEN);
Ok(Some(try!(self.read_payload(&data))))
},
None => Ok(None)
}
}
}
}
/// Writable IO handler. Processes send queeue.
pub fn writable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable());
Ok(())
}
/// Register this connection with the event handler.
pub fn register<Host:Handler<Timeout=Token>>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.expect(ENCRYPTED_HEADER_LEN);
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
try!(self.connection.reregister(event_loop));
Ok(())
}
/// Update connection registration. This should be called at the end of the event loop.
pub fn reregister<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.reregister(event_loop));
Ok(())
}
}
#[test]
pub fn test_encryption() {
use hash::*;
use std::str::FromStr;
let key = H256::from_str("2212767d793a7a3d66f869ae324dd11bd17044b82c9f463b8a541a4d089efec5").unwrap();
let before = H128::from_str("12532abaec065082a3cf1da7d0136f15").unwrap();
let before2 = H128::from_str("7e99f682356fdfbc6b67a9562787b18a").unwrap();
let after = H128::from_str("89464c6b04e7c99e555c81d3f7266a05").unwrap();
let after2 = H128::from_str("85c070030589ef9c7a2879b3a8489316").unwrap();
let mut got = H128::new();
let mut encoder = EcbEncryptor::new(AesSafe256Encryptor::new(&key), NoPadding);
encoder.encrypt(&mut RefReadBuffer::new(&before), &mut RefWriteBuffer::new(&mut got), true).unwrap();
encoder.reset();
assert_eq!(got, after);
got = H128::new();
encoder.encrypt(&mut RefReadBuffer::new(&before2), &mut RefWriteBuffer::new(&mut got), true).unwrap();
encoder.reset();
assert_eq!(got, after2);
}

View File

@@ -0,0 +1,206 @@
// This module is a work in progress
#![allow(dead_code)] //TODO: remove this after everything is done
use std::collections::{HashSet, BTreeMap};
use std::cell::{RefCell};
use std::ops::{DerefMut};
use mio::*;
use mio::udp::*;
use hash::*;
use sha3::Hashable;
use crypto::*;
use network::node::*;
const ADDRESS_BYTES_SIZE: u32 = 32; ///< Size of address type in bytes.
const ADDRESS_BITS: u32 = 8 * ADDRESS_BYTES_SIZE; ///< Denoted by n in [Kademlia].
const NODE_BINS: u32 = ADDRESS_BITS - 1; ///< Size of m_state (excludes root, which is us).
const DISCOVERY_MAX_STEPS: u16 = 8; ///< Max iterations of discovery. (discover)
const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
struct NodeBucket {
distance: u32,
nodes: Vec<NodeId>
}
impl NodeBucket {
fn new(distance: u32) -> NodeBucket {
NodeBucket {
distance: distance,
nodes: Vec::new()
}
}
}
struct Discovery {
id: NodeId,
discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
}
struct FindNodePacket;
impl FindNodePacket {
fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket {
FindNodePacket
}
fn sign(&mut self, _secret: &Secret) {
}
fn send(& self, _socket: &mut UdpSocket) {
}
}
impl Discovery {
pub fn new(id: &NodeId) -> Discovery {
Discovery {
id: id.clone(),
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..NODE_BINS).map(|x| NodeBucket::new(x)).collect(),
}
}
pub fn add_node(&mut self, id: &NodeId) {
self.node_buckets[Discovery::distance(&self.id, &id) as usize].nodes.push(id.clone());
}
fn start_node_discovery<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) {
self.discovery_round = 0;
self.discovery_id.randomize();
self.discovery_nodes.clear();
self.discover(event_loop);
}
fn discover<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) {
if self.discovery_round == DISCOVERY_MAX_STEPS
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
let mut tried_count = 0;
{
let nearest = Discovery::nearest_node_entries(&self.id, &self.discovery_id, &self.node_buckets).into_iter();
let nodes = RefCell::new(&mut self.discovery_nodes);
let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA);
for r in nearest {
//let mut p = FindNodePacket::new(&r.endpoint, &self.discovery_id);
//p.sign(&self.secret);
//p.send(&mut self.udp_socket);
let mut borrowed = nodes.borrow_mut();
borrowed.deref_mut().insert(r.clone());
tried_count += 1;
}
}
if tried_count == 0
{
debug!("Restarting discovery");
self.start_node_discovery(event_loop);
return;
}
self.discovery_round += 1;
//event_loop.timeout_ms(Token(NODETABLE_DISCOVERY), 1200).unwrap();
}
fn distance(a: &NodeId, b: &NodeId) -> u32 {
let d = a.sha3() ^ b.sha3();
let mut ret:u32 = 0;
for i in 0..32 {
let mut v: u8 = d[i];
while v != 0 {
v >>= 1;
ret += 1;
}
}
ret
}
fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec<NodeBucket>) -> Vec<&'b NodeId>
{
// send ALPHA FindNode packets to nodes we know, closest to target
const LAST_BIN: u32 = NODE_BINS - 1;
let mut head = Discovery::distance(source, target);
let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS };
let mut found: BTreeMap<u32, Vec<&'b NodeId>> = BTreeMap::new();
let mut count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if head > 1 && tail != LAST_BIN {
while head != tail && head < NODE_BINS && count < BUCKET_SIZE
{
for n in buckets[head as usize].nodes.iter()
{
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
if count < BUCKET_SIZE && tail != 0 {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
}
head += 1;
if tail > 0 {
tail -= 1;
}
}
}
else if head < 2 {
while head < NODE_BINS && count < BUCKET_SIZE {
for n in buckets[head as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
head += 1;
}
}
else {
while tail > 0 && count < BUCKET_SIZE {
for n in buckets[tail as usize].nodes.iter() {
if count < BUCKET_SIZE {
count += 1;
found.entry(Discovery::distance(target, &n)).or_insert(Vec::new()).push(n);
}
else {
break;
}
}
tail -= 1;
}
}
let mut ret:Vec<&NodeId> = Vec::new();
for (_, nodes) in found {
for n in nodes {
if ret.len() < BUCKET_SIZE as usize /* && n->endpoint && n->endpoint.isAllowed() */ {
ret.push(n);
}
}
}
ret
}
}

41
util/src/network/error.rs Normal file
View File

@@ -0,0 +1,41 @@
use io::IoError;
use rlp::*;
#[derive(Debug, Copy, Clone)]
pub enum DisconnectReason
{
DisconnectRequested,
//TCPError,
//BadProtocol,
UselessPeer,
//TooManyPeers,
//DuplicatePeer,
//IncompatibleProtocol,
//NullIdentity,
//ClientQuit,
//UnexpectedIdentity,
//LocalIdentity,
//PingTimeout,
}
#[derive(Debug)]
pub enum NetworkError {
Auth,
BadProtocol,
PeerNotFound,
Disconnect(DisconnectReason),
Io(IoError),
}
impl From<DecoderError> for NetworkError {
fn from(_err: DecoderError) -> NetworkError {
NetworkError::Auth
}
}
impl From<IoError> for NetworkError {
fn from(err: IoError) -> NetworkError {
NetworkError::Io(err)
}
}

View File

@@ -0,0 +1,217 @@
use mio::*;
use mio::tcp::*;
use hash::*;
use sha3::Hashable;
use bytes::Bytes;
use crypto::*;
use crypto;
use network::connection::{Connection};
use network::host::{HostInfo};
use network::node::NodeId;
use error::*;
use network::error::NetworkError;
#[derive(PartialEq, Eq, Debug)]
enum HandshakeState {
/// Just created
New,
/// Waiting for auth packet
ReadingAuth,
/// Waiting for ack packet
ReadingAck,
/// Ready to start a session
StartSession,
}
/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake
pub struct Handshake {
/// Remote node public key
pub id: NodeId,
/// Underlying connection
pub connection: Connection,
/// Handshake state
state: HandshakeState,
/// Outgoing or incoming connection
pub originated: bool,
/// Disconnect timeout
idle_timeout: Option<Timeout>,
/// ECDH ephemeral
pub ecdhe: KeyPair,
/// Connection nonce
pub nonce: H256,
/// Handshake public key
pub remote_public: Public,
/// Remote connection nonce.
pub remote_nonce: H256,
/// A copy of received encryped auth packet
pub auth_cipher: Bytes,
/// A copy of received encryped ack packet
pub ack_cipher: Bytes
}
const AUTH_PACKET_SIZE: usize = 307;
const ACK_PACKET_SIZE: usize = 210;
impl Handshake {
/// Create a new handshake object
pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
Ok(Handshake {
id: id.clone(),
connection: Connection::new(token, socket),
originated: false,
state: HandshakeState::New,
idle_timeout: None,
ecdhe: try!(KeyPair::create()),
nonce: nonce.clone(),
remote_public: Public::new(),
remote_nonce: H256::new(),
auth_cipher: Bytes::new(),
ack_cipher: Bytes::new(),
})
}
/// Start a handhsake
pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> {
self.originated = originated;
if originated {
try!(self.write_auth(host));
}
else {
self.state = HandshakeState::ReadingAuth;
self.connection.expect(AUTH_PACKET_SIZE);
};
Ok(())
}
/// Check if handshake is complete
pub fn done(&self) -> bool {
self.state == HandshakeState::StartSession
}
/// Readable IO handler. Drives the state change.
pub fn readable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.state {
HandshakeState::ReadingAuth => {
match try!(self.connection.readable()) {
Some(data) => {
try!(self.read_auth(host, &data));
try!(self.write_ack());
},
None => {}
};
},
HandshakeState::ReadingAck => {
match try!(self.connection.readable()) {
Some(data) => {
try!(self.read_ack(host, &data));
self.state = HandshakeState::StartSession;
},
None => {}
};
},
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop));
}
Ok(())
}
/// Writabe IO handler.
pub fn writable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop));
}
Ok(())
}
/// Register the IO handler with the event loop
pub fn register<Host:Handler<Timeout=Token>>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
try!(self.connection.register(event_loop));
Ok(())
}
/// Parse, validate and confirm auth message
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == AUTH_PACKET_SIZE);
self.auth_cipher = data.to_vec();
let auth = try!(ecies::decrypt(host.secret(), data));
let (sig, rest) = auth.split_at(65);
let (hepubk, rest) = rest.split_at(32);
let (pubk, rest) = rest.split_at(64);
let (nonce, _) = rest.split_at(32);
self.remote_public.clone_from_slice(pubk);
self.remote_nonce.clone_from_slice(nonce);
let shared = try!(ecdh::agree(host.secret(), &self.remote_public));
let signature = Signature::from_slice(sig);
let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce)));
if &spub.sha3()[..] != hepubk {
trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr());
return Err(From::from(NetworkError::Auth));
};
self.write_ack()
}
/// Parse and validate ack message
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == ACK_PACKET_SIZE);
self.ack_cipher = data.to_vec();
let ack = try!(ecies::decrypt(host.secret(), data));
self.remote_public.clone_from_slice(&ack[0..64]);
self.remote_nonce.clone_from_slice(&ack[64..(64+32)]);
Ok(())
}
/// Sends auth message
fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len();
{
data[len - 1] = 0x0;
let (sig, rest) = data.split_at_mut(65);
let (hepubk, rest) = rest.split_at_mut(32);
let (pubk, rest) = rest.split_at_mut(64);
let (nonce, _) = rest.split_at_mut(32);
// E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0)
let shared = try!(crypto::ecdh::agree(host.secret(), &self.id));
try!(crypto::ec::sign(self.ecdhe.secret(), &(&shared ^ &self.nonce))).copy_to(sig);
self.ecdhe.public().sha3_into(hepubk);
host.id().copy_to(pubk);
self.nonce.copy_to(nonce);
}
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.auth_cipher = message.clone();
self.connection.send(message);
self.connection.expect(ACK_PACKET_SIZE);
self.state = HandshakeState::ReadingAck;
Ok(())
}
/// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len();
{
data[len - 1] = 0x0;
let (epubk, rest) = data.split_at_mut(64);
let (nonce, _) = rest.split_at_mut(32);
self.ecdhe.public().copy_to(epubk);
self.nonce.copy_to(nonce);
}
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.ack_cipher = message.clone();
self.connection.send(message);
self.state = HandshakeState::StartSession;
Ok(())
}
}

651
util/src/network/host.rs Normal file
View File

@@ -0,0 +1,651 @@
use std::mem;
use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::hash::{Hasher};
use std::str::{FromStr};
use mio::*;
use mio::tcp::*;
use mio::udp::*;
use hash::*;
use crypto::*;
use sha3::Hashable;
use rlp::*;
use network::handshake::Handshake;
use network::session::{Session, SessionData};
use error::*;
use io::*;
use network::NetworkProtocolHandler;
use network::node::*;
type Slab<T> = ::slab::Slab<T, usize>;
const _DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024;
const IDEAL_PEERS: u32 = 10;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)]
struct NetworkConfiguration {
listen_address: SocketAddr,
public_address: SocketAddr,
nat_enabled: bool,
discovery_enabled: bool,
pin: bool,
}
impl NetworkConfiguration {
fn new() -> NetworkConfiguration {
NetworkConfiguration {
listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(),
nat_enabled: true,
discovery_enabled: true,
pin: false,
}
}
}
// Tokens
//const TOKEN_BEGIN: usize = USER_TOKEN_START; // TODO: ICE in rustc 1.7.0-nightly (49c382779 2016-01-12)
const TOKEN_BEGIN: usize = 32;
const TCP_ACCEPT: usize = TOKEN_BEGIN + 1;
const IDLE: usize = TOKEN_BEGIN + 2;
const NODETABLE_RECEIVE: usize = TOKEN_BEGIN + 3;
const NODETABLE_MAINTAIN: usize = TOKEN_BEGIN + 4;
const NODETABLE_DISCOVERY: usize = TOKEN_BEGIN + 5;
const FIRST_CONNECTION: usize = TOKEN_BEGIN + 16;
const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str;
/// Messages used to communitate with the event loop from other threads.
pub enum NetworkIoMessage<Message> where Message: Send {
/// Register a new protocol handler.
AddHandler {
handler: Option<Box<NetworkProtocolHandler<Message>+Send>>,
protocol: ProtocolId,
versions: Vec<u8>,
},
/// Send data over the network.
Send {
peer: PeerId,
packet_id: PacketId,
protocol: ProtocolId,
data: Vec<u8>,
},
/// User message
User(Message),
}
/// Local (temporary) peer session ID.
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn encode<E>(&self, encoder: &mut E) -> () where E: Encoder {
encoder.emit_list(|e| {
self.protocol.encode(e);
(self.version as u32).encode(e);
});
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct NetworkContext<'s, 'io, Message> where Message: Send + 'static, 'io: 's {
io: &'s mut IoContext<'io, NetworkIoMessage<Message>>,
protocol: ProtocolId,
connections: &'s mut Slab<ConnectionEntry>,
timers: &'s mut HashMap<TimerToken, ProtocolId>,
session: Option<StreamToken>,
}
impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 'static, {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s mut IoContext<'io, NetworkIoMessage<Message>>,
protocol: ProtocolId,
session: Option<StreamToken>, connections: &'s mut Slab<ConnectionEntry>,
timers: &'s mut HashMap<TimerToken, ProtocolId>) -> NetworkContext<'s, 'io, Message> {
NetworkContext {
io: io,
protocol: protocol,
session: session,
connections: connections,
timers: timers,
}
}
/// Send a packet over the network to another peer.
pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
match self.connections.get_mut(peer) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "net", "Send error: {:?}", e);
}); //TODO: don't copy vector data
},
_ => {
warn!(target: "net", "Send: Peer does not exist");
}
}
Ok(())
}
/// Respond to a current network message. Panics if no there is no packet in the context.
pub fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
match self.session {
Some(session) => self.send(session, packet_id, data),
None => {
panic!("Respond: Session does not exist")
}
}
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&mut self, _peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
}
/// Register a new IO timer. Returns a new timer token. 'NetworkProtocolHandler::timeout' will be called with the token.
pub fn register_timer(&mut self, ms: u64) -> Result<TimerToken, UtilError>{
match self.io.register_timer(ms) {
Ok(token) => {
self.timers.insert(token, self.protocol);
Ok(token)
},
e => e,
}
}
/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
match self.connections.get(peer) {
Some(&ConnectionEntry::Session(ref s)) => {
s.info.client_version.clone()
},
_ => {
"unknown".to_string()
}
}
}
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Client identifier
pub client_version: String,
/// TCP connection port.
pub listen_port: u16,
/// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo>
}
impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId {
self.keys.public()
}
/// Returns secret key
pub fn secret(&self) -> &Secret {
self.keys.secret()
}
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 {
self.nonce = self.nonce.sha3();
return self.nonce.clone();
}
}
enum ConnectionEntry {
Handshake(Handshake),
Session(Session)
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host<Message> where Message: Send {
pub info: HostInfo,
udp_socket: UdpSocket,
listener: TcpListener,
connections: Slab<ConnectionEntry>,
timers: HashMap<TimerToken, ProtocolId>,
nodes: HashMap<NodeId, Node>,
handlers: HashMap<ProtocolId, Box<NetworkProtocolHandler<Message>>>,
}
impl<Message> Host<Message> where Message: Send {
pub fn new() -> Host<Message> {
let config = NetworkConfiguration::new();
let addr = config.listen_address;
// Setup the server socket
let listener = TcpListener::bind(&addr).unwrap();
let udp_socket = UdpSocket::bound(&addr).unwrap();
Host::<Message> {
info: HostInfo {
keys: KeyPair::create().unwrap(),
config: config,
nonce: H256::random(),
protocol_version: 4,
client_version: "parity".to_string(),
listen_port: 0,
capabilities: Vec::new(),
},
udp_socket: udp_socket,
listener: listener,
connections: Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS),
timers: HashMap::new(),
nodes: HashMap::new(),
handlers: HashMap::new(),
}
}
fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { warn!("Could not add node: {:?}", e); },
Ok(n) => {
self.nodes.insert(n.id.clone(), n);
}
}
}
fn maintain_network(&mut self, io: &mut IoContext<NetworkIoMessage<Message>>) {
self.connect_peers(io);
io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap();
}
fn have_session(&self, id: &NodeId) -> bool {
self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false })
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
}
fn connect_peers(&mut self, io: &mut IoContext<NetworkIoMessage<Message>>) {
struct NodeInfo {
id: NodeId,
peer_type: PeerType
}
let mut to_connect: Vec<NodeInfo> = Vec::new();
let mut req_conn = 0;
//TODO: use nodes from discovery here
//for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) {
for n in self.nodes.values().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) {
let connected = self.have_session(&n.id) || self.connecting_to(&n.id);
let required = n.peer_type == PeerType::Required;
if connected && required {
req_conn += 1;
}
else if !connected && (!self.info.config.pin || required) {
to_connect.push(n);
}
}
for n in to_connect.iter() {
if n.peer_type == PeerType::Required {
if req_conn < IDEAL_PEERS {
self.connect_peer(&n.id, io);
}
req_conn += 1;
}
}
if !self.info.config.pin
{
let pending_count = 0; //TODO:
let peer_count = 0;
let mut open_slots = IDEAL_PEERS - peer_count - pending_count + req_conn;
if open_slots > 0 {
for n in to_connect.iter() {
if n.peer_type == PeerType::Optional && open_slots > 0 {
open_slots -= 1;
self.connect_peer(&n.id, io);
}
}
}
}
}
fn connect_peer(&mut self, id: &NodeId, io: &mut IoContext<NetworkIoMessage<Message>>) {
if self.have_session(id)
{
warn!("Aborted connect. Node already connected.");
return;
}
if self.connecting_to(id)
{
warn!("Aborted connect. Node already connecting.");
return;
}
let socket = {
let node = self.nodes.get_mut(id).unwrap();
node.last_attempted = Some(::time::now());
match TcpStream::connect(&node.endpoint.address) {
Ok(socket) => socket,
Err(_) => {
warn!("Cannot connect to node");
return;
}
}
};
let nonce = self.info.next_nonce();
match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) {
Some(token) => {
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
h.start(&self.info, true)
.and_then(|_| h.register(io.event_loop))
.unwrap_or_else (|e| {
debug!(target: "net", "Handshake create error: {:?}", e);
});
},
_ => {}
}
},
None => { warn!("Max connections reached") }
}
}
fn accept(&mut self, _io: &mut IoContext<NetworkIoMessage<Message>>) {
trace!(target: "net", "accept");
}
fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
let mut kill = false;
let mut create_session = false;
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
h.writable(io.event_loop, &self.info).unwrap_or_else(|e| {
debug!(target: "net", "Handshake write error: {:?}", e);
kill = true;
});
create_session = h.done();
},
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.writable(io.event_loop, &self.info).unwrap_or_else(|e| {
debug!(target: "net", "Session write error: {:?}", e);
kill = true;
});
}
_ => {
warn!(target: "net", "Received event for unknown connection");
}
}
if kill {
self.kill_connection(token, io);
return;
} else if create_session {
self.start_session(token, io);
}
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
},
_ => (),
}
}
fn connection_closed<'s>(&'s mut self, token: TimerToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
self.kill_connection(token, io);
}
fn connection_readable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
let mut kill = false;
let mut create_session = false;
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
h.readable(io.event_loop, &self.info).unwrap_or_else(|e| {
debug!(target: "net", "Handshake read error: {:?}", e);
kill = true;
});
create_session = h.done();
},
Some(&mut ConnectionEntry::Session(ref mut s)) => {
let sd = { s.readable(io.event_loop, &self.info).unwrap_or_else(|e| {
debug!(target: "net", "Session read error: {:?}", e);
kill = true;
SessionData::None
}) };
match sd {
SessionData::Ready => {
for (p, _) in self.handlers.iter_mut() {
if s.have_capability(p) {
ready_data.push(p);
}
}
},
SessionData::Packet {
data,
protocol,
packet_id,
} => {
match self.handlers.get_mut(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data = Some((protocol, packet_id, data)),
}
},
SessionData::None => {},
}
}
_ => {
warn!(target: "net", "Received event for unknown connection");
}
}
if kill {
self.kill_connection(token, io);
return;
}
if create_session {
self.start_session(token, io);
}
for p in ready_data {
let mut h = self.handlers.get_mut(p).unwrap();
h.connected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token);
}
if let Some((p, packet_id, data)) = packet_data {
let mut h = self.handlers.get_mut(p).unwrap();
h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]);
}
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
},
_ => (),
}
}
fn start_session(&mut self, token: StreamToken, io: &mut IoContext<NetworkIoMessage<Message>>) {
let info = &self.info;
// TODO: use slab::replace_with (currently broken)
/*
match self.connections.remove(token) {
Some(ConnectionEntry::Handshake(h)) => {
match Session::new(h, io.event_loop, info) {
Ok(session) => {
assert!(token == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap());
},
Err(e) => {
debug!(target: "net", "Session construction error: {:?}", e);
}
}
},
_ => panic!("Error updating slab with session")
}*/
self.connections.replace_with(token, |c| {
match c {
ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info)
.map(|s| Some(ConnectionEntry::Session(s)))
.unwrap_or_else(|e| {
debug!(target: "net", "Session construction error: {:?}", e);
None
}),
_ => { panic!("No handshake to create a session from"); }
}
}).expect("Error updating slab with session");
}
fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
self.kill_connection(token, io)
}
fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
let mut remove = true;
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake
Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => {
for (p, _) in self.handlers.iter_mut() {
if s.have_capability(p) {
to_disconnect.push(p);
}
}
},
_ => {
remove = false;
},
}
for p in to_disconnect {
let mut h = self.handlers.get_mut(p).unwrap();
h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token);
}
if remove {
self.connections.remove(token);
}
}
}
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + 'static {
/// Initialize networking
fn initialize(&mut self, io: &mut IoContext<NetworkIoMessage<Message>>) {
/*
match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() {
Some(iface) => config.public_address = iface.addr.unwrap(),
None => warn!("No public network interface"),
*/
// Start listening for incoming connections
io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap();
io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap();
// open the udp socket
io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap();
io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap();
let port = self.info.config.listen_address.port();
self.info.listen_port = port;
// self.add_node("enode://a9a921de2ff09a9a4d38b623c67b2d6b477a8e654ae95d874750cbbcb31b33296496a7b4421934e2629269e180823e52c15c2b19fc59592ec51ffe4f2de76ed7@127.0.0.1:30303");
// GO bootnodes
self.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); // IE
self.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR
self.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG
// ETH/DEV cpp-ethereum (poc-9.ethdev.com)
self.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303");
}
fn stream_hup<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage<Message>>, stream: StreamToken) {
trace!(target: "net", "Hup: {}", stream);
match stream {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(stream, io),
_ => warn!(target: "net", "Unexpected hup"),
};
}
fn stream_readable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage<Message>>, stream: StreamToken) {
match stream {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(stream, io),
NODETABLE_RECEIVE => {},
TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"),
}
}
fn stream_writable<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage<Message>>, stream: StreamToken) {
match stream {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(stream, io),
_ => panic!("Received unknown writable token"),
}
}
fn timeout<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage<Message>>, token: TimerToken) {
match token {
IDLE => self.maintain_network(io),
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, io),
NODETABLE_DISCOVERY => {},
NODETABLE_MAINTAIN => {},
_ => match self.timers.get_mut(&token).map(|p| *p) {
Some(protocol) => match self.handlers.get_mut(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(h) => { h.timeout(&mut NetworkContext::new(io, protocol, Some(token), &mut self.connections, &mut self.timers), token); }
},
None => {} // time not registerd through us
}
}
}
fn message<'s>(&'s mut self, io: &mut IoContext<'s, NetworkIoMessage<Message>>, message: &'s mut NetworkIoMessage<Message>) {
match message {
&mut NetworkIoMessage::AddHandler {
ref mut handler,
ref protocol,
ref versions
} => {
let mut h = mem::replace(handler, None).unwrap();
h.initialize(&mut NetworkContext::new(io, protocol, None, &mut self.connections, &mut self.timers));
self.handlers.insert(protocol, h);
for v in versions {
self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
}
},
&mut NetworkIoMessage::Send {
ref peer,
ref packet_id,
ref protocol,
ref data,
} => {
match self.connections.get_mut(*peer as usize) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "net", "Send error: {:?}", e);
}); //TODO: don't copy vector data
},
_ => {
warn!(target: "net", "Send: Peer does not exist");
}
}
},
&mut NetworkIoMessage::User(ref message) => {
for (p, h) in self.handlers.iter_mut() {
h.message(&mut NetworkContext::new(io, p, None, &mut self.connections, &mut self.timers), &message);
}
}
}
}
}

86
util/src/network/mod.rs Normal file
View File

@@ -0,0 +1,86 @@
/// Network and general IO module.
///
/// Example usage for craeting a network service and adding an IO handler:
///
/// ```rust
/// extern crate ethcore_util as util;
/// use util::*;
///
/// struct MyHandler;
///
/// struct MyMessage {
/// data: u32
/// }
///
/// impl NetworkProtocolHandler<MyMessage> for MyHandler {
/// fn initialize(&mut self, io: &mut NetworkContext<MyMessage>) {
/// io.register_timer(1000);
/// }
///
/// fn read(&mut self, io: &mut NetworkContext<MyMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
/// println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
/// }
///
/// fn connected(&mut self, io: &mut NetworkContext<MyMessage>, peer: &PeerId) {
/// println!("Connected {}", peer);
/// }
///
/// fn disconnected(&mut self, io: &mut NetworkContext<MyMessage>, peer: &PeerId) {
/// println!("Disconnected {}", peer);
/// }
///
/// fn timeout(&mut self, io: &mut NetworkContext<MyMessage>, timer: TimerToken) {
/// println!("Timeout {}", timer);
/// }
///
/// fn message(&mut self, io: &mut NetworkContext<MyMessage>, message: &MyMessage) {
/// println!("Message {}", message.data);
/// }
/// }
///
/// fn main () {
/// let mut service = NetworkService::<MyMessage>::start().expect("Error creating network service");
/// service.register_protocol(Box::new(MyHandler), "myproto", &[1u8]);
///
/// // Wait for quit condition
/// // ...
/// // Drop the service
/// }
/// ```
mod host;
mod connection;
mod handshake;
mod session;
mod discovery;
mod service;
mod error;
mod node;
pub type PeerId = host::PeerId;
pub type PacketId = host::PacketId;
pub type NetworkContext<'s,'io, Message> = host::NetworkContext<'s, 'io, Message>;
pub type NetworkService<Message> = service::NetworkService<Message>;
pub type NetworkIoMessage<Message> = host::NetworkIoMessage<Message>;
pub use network::host::NetworkIoMessage::User as UserMessage;
pub type NetworkError = error::NetworkError;
use io::*;
/// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop.
/// `Message` is the type for message data.
pub trait NetworkProtocolHandler<Message>: Send where Message: Send {
/// Initialize the handler
fn initialize(&mut self, _io: &mut NetworkContext<Message>) {}
/// Called when new network packet received.
fn read(&mut self, io: &mut NetworkContext<Message>, peer: &PeerId, packet_id: u8, data: &[u8]);
/// Called when new peer is connected. Only called when peer supports the same protocol.
fn connected(&mut self, io: &mut NetworkContext<Message>, peer: &PeerId);
/// Called when a previously connected peer disconnects.
fn disconnected(&mut self, io: &mut NetworkContext<Message>, peer: &PeerId);
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&mut self, _io: &mut NetworkContext<Message>, _timer: TimerToken) {}
/// Called when a broadcasted message is received. The message can only be sent from a different IO handler.
fn message(&mut self, _io: &mut NetworkContext<Message>, _message: &Message) {}
}

83
util/src/network/node.rs Normal file
View File

@@ -0,0 +1,83 @@
use std::net::{SocketAddr, ToSocketAddrs};
use std::hash::{Hash, Hasher};
use std::str::{FromStr};
use hash::*;
use rlp::*;
use time::Tm;
use error::*;
/// Node public key
pub type NodeId = H512;
#[derive(Debug)]
/// Noe address info
pub struct NodeEndpoint {
/// IP(V4 or V6) address
pub address: SocketAddr,
/// Address as string (can be host name).
pub address_str: String,
/// Conneciton port.
pub udp_port: u16
}
impl NodeEndpoint {
/// Create endpoint from string. Performs name resolution if given a host name.
fn from_str(s: &str) -> Result<NodeEndpoint, UtilError> {
let address = s.to_socket_addrs().map(|mut i| i.next());
match address {
Ok(Some(a)) => Ok(NodeEndpoint {
address: a,
address_str: s.to_string(),
udp_port: a.port()
}),
Ok(_) => Err(UtilError::AddressResolve(None)),
Err(e) => Err(UtilError::AddressResolve(Some(e)))
}
}
}
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum PeerType {
Required,
Optional
}
pub struct Node {
pub id: NodeId,
pub endpoint: NodeEndpoint,
pub peer_type: PeerType,
pub last_attempted: Option<Tm>,
}
impl FromStr for Node {
type Err = UtilError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" {
(try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..])))
}
else {
(NodeId::new(), try!(NodeEndpoint::from_str(s)))
};
Ok(Node {
id: id,
endpoint: endpoint,
peer_type: PeerType::Optional,
last_attempted: None,
})
}
}
impl PartialEq for Node {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Node { }
impl Hash for Node {
fn hash<H>(&self, state: &mut H) where H: Hasher {
self.id.hash(state)
}
}

View File

@@ -0,0 +1,61 @@
use error::*;
use network::{NetworkProtocolHandler};
use network::error::{NetworkError};
use network::host::{Host, NetworkIoMessage, PeerId, PacketId, ProtocolId};
use io::*;
/// IO Service with networking
/// `Message` defines a notification data type.
pub struct NetworkService<Message> where Message: Send + 'static {
io_service: IoService<NetworkIoMessage<Message>>,
host_info: String,
}
impl<Message> NetworkService<Message> where Message: Send + 'static {
/// Starts IO event loop
pub fn start() -> Result<NetworkService<Message>, UtilError> {
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
let host = Box::new(Host::new());
let host_info = host.info.client_version.clone();
info!("NetworkService::start(): id={:?}", host.info.id());
try!(io_service.register_handler(host));
Ok(NetworkService {
io_service: io_service,
host_info: host_info,
})
}
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::Send {
peer: *peer,
packet_id: packet_id,
protocol: protocol,
data: data.to_vec()
}));
Ok(())
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&mut self, handler: Box<NetworkProtocolHandler<Message>+Send>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: Some(handler),
protocol: protocol,
versions: versions.to_vec(),
}));
Ok(())
}
/// Returns host identifier string as advertised to other peers
pub fn host_info(&self) -> String {
self.host_info.clone()
}
/// Returns underlying io service.
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
&mut self.io_service
}
}

282
util/src/network/session.rs Normal file
View File

@@ -0,0 +1,282 @@
use mio::*;
use hash::*;
use rlp::*;
use network::connection::{EncryptedConnection, Packet};
use network::handshake::Handshake;
use error::*;
use network::error::{NetworkError, DisconnectReason};
use network::host::*;
use network::node::NodeId;
/// Peer session over encrypted connection.
/// When created waits for Hello packet exchange and signals ready state.
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
pub struct Session {
/// Shared session information
pub info: SessionInfo,
/// Underlying connection
connection: EncryptedConnection,
/// Session ready flag. Set after successfull Hello packet exchange
had_hello: bool,
}
/// Structure used to report various session events.
pub enum SessionData {
None,
/// Session is ready to send/receive packets.
Ready,
/// A packet has been received
Packet {
/// Packet data
data: Vec<u8>,
/// Packet protocol ID
protocol: &'static str,
/// Zero based packet ID
packet_id: u8,
},
}
/// Shared session information
pub struct SessionInfo {
/// Peer public key
pub id: NodeId,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
/// Peer protocol capabilities
capabilities: Vec<SessionCapabilityInfo>,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: String,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
let c = try!(decoder.as_list());
let v: u32 = try!(Decodable::decode(&c[1]));
Ok(PeerCapabilityInfo {
protocol: try!(Decodable::decode(&c[0])),
version: v as u8,
})
}
}
#[derive(Debug, PartialEq, Eq)]
struct SessionCapabilityInfo {
pub protocol: &'static str,
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
}
const PACKET_HELLO: u8 = 0x80;
const PACKET_DISCONNECT: u8 = 0x01;
const PACKET_PING: u8 = 0x02;
const PACKET_PONG: u8 = 0x03;
const PACKET_GET_PEERS: u8 = 0x04;
const PACKET_PEERS: u8 = 0x05;
const PACKET_USER: u8 = 0x10;
const PACKET_LAST: u8 = 0x7f;
impl Session {
/// Create a new session out of comepleted handshake. Consumes handshake object.
pub fn new<Host:Handler<Timeout=Token>>(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, UtilError> {
let id = h.id.clone();
let connection = try!(EncryptedConnection::new(h));
let mut session = Session {
connection: connection,
had_hello: false,
info: SessionInfo {
id: id,
client_version: String::new(),
protocol_version: 0,
capabilities: Vec::new(),
},
};
try!(session.write_hello(host));
try!(session.write_ping());
try!(session.connection.register(event_loop));
Ok(session)
}
/// Check if session is ready to send/receive data
pub fn is_ready(&self) -> bool {
self.had_hello
}
/// Readable IO handler. Returns packet data if available.
pub fn readable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, UtilError> {
match try!(self.connection.readable(event_loop)) {
Some(data) => Ok(try!(self.read_packet(data, host))),
None => Ok(SessionData::None)
}
}
/// Writable IO handler. Sends pending packets.
pub fn writable<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.connection.writable(event_loop)
}
/// Checks if peer supports given capability
pub fn have_capability(&self, protocol: &str) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol)
}
/// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn reregister<Host:Handler>(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.reregister(event_loop)
}
/// Send a protocol packet to peer.
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unkown protocol: {:?}", protocol);
return Ok(())
}
}
let pid = self.info.capabilities[i].id_offset + packet_id;
let mut rlp = RlpStream::new();
rlp.append(&(pid as u32));
rlp.append_raw(data, 1);
self.connection.send_packet(&rlp.out())
}
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
if packet.data.len() < 2 {
return Err(From::from(NetworkError::BadProtocol));
}
let packet_id = packet.data[0];
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(From::from(NetworkError::BadProtocol));
}
match packet_id {
PACKET_HELLO => {
let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size
try!(self.read_hello(&rlp, host));
Ok(SessionData::Ready)
},
PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))),
PACKET_PING => {
try!(self.write_pong());
Ok(SessionData::None)
},
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
PACKET_PEERS => Ok(SessionData::None),
PACKET_USER ... PACKET_LAST => {
let mut i = 0usize;
while packet_id < self.info.capabilities[i].id_offset {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unkown packet: {:?}", packet_id);
return Ok(SessionData::None)
}
}
// map to protocol
let protocol = self.info.capabilities[i].protocol;
let pid = packet_id - self.info.capabilities[i].id_offset;
return Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
},
_ => {
debug!(target: "net", "Unkown packet: {:?}", packet_id);
Ok(SessionData::None)
}
}
}
fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> {
let mut rlp = RlpStream::new();
rlp.append_raw(&[PACKET_HELLO as u8], 0);
rlp.append_list(5)
.append(&host.protocol_version)
.append(&host.client_version)
.append(&host.capabilities)
.append(&host.listen_port)
.append(host.id());
self.connection.send_packet(&rlp.out())
}
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError> {
let protocol = try!(rlp.val_at::<u32>(0));
let client_version = try!(rlp.val_at::<String>(1));
let peer_caps = try!(rlp.val_at::<Vec<PeerCapabilityInfo>>(2));
let id = try!(rlp.val_at::<NodeId>(4));
// Intersect with host capabilities
// Leave only highset mutually supported capability version
let mut caps: Vec<SessionCapabilityInfo> = Vec::new();
for hc in host.capabilities.iter() {
if peer_caps.iter().any(|c| c.protocol == hc.protocol && c.version == hc.version) {
caps.push(SessionCapabilityInfo {
protocol: hc.protocol,
version: hc.version,
id_offset: 0,
packet_count: hc.packet_count,
});
}
}
caps.retain(|c| host.capabilities.iter().any(|hc| hc.protocol == c.protocol && hc.version == c.version));
let mut i = 0;
while i < caps.len() {
if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) {
caps.remove(i);
}
else {
i += 1;
}
}
i = 0;
let mut offset: u8 = PACKET_USER;
while i < caps.len() {
caps[i].id_offset = offset;
offset += caps[i].packet_count;
i += 1;
}
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
self.info.client_version = client_version;
self.info.capabilities = caps;
if protocol != host.protocol_version {
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));
}
self.had_hello = true;
Ok(())
}
fn write_ping(&mut self) -> Result<(), UtilError> {
self.send(try!(Session::prepare(PACKET_PING)))
}
fn write_pong(&mut self) -> Result<(), UtilError> {
self.send(try!(Session::prepare(PACKET_PONG)))
}
fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32));
rlp.append_list(1);
rlp.append(&(reason.clone() as u32));
self.connection.send_packet(&rlp.out()).ok();
NetworkError::Disconnect(reason)
}
fn prepare(packet_id: u8) -> Result<RlpStream, UtilError> {
let mut rlp = RlpStream::new();
rlp.append(&(packet_id as u32));
rlp.append_list(0);
Ok(rlp)
}
fn send(&mut self, rlp: RlpStream) -> Result<(), UtilError> {
self.connection.send_packet(&rlp.out())
}
}