Documentation

This commit is contained in:
arkpar 2016-01-10 22:42:27 +01:00
parent 02b530f4aa
commit 6f3c3fa020
4 changed files with 131 additions and 4 deletions

View File

@ -19,22 +19,33 @@ use tiny_keccak::Keccak;
const ENCRYPTED_HEADER_LEN: usize = 32;
/// Low level tcp connection
pub struct Connection {
/// Connection id (token)
pub token: Token,
/// Network socket
pub socket: TcpStream,
/// Receive buffer
rec_buf: Bytes,
/// Expected size
rec_size: usize,
/// Send out packets FIFO
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
}
/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}
impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: Token, socket: TcpStream) -> Connection {
Connection {
token: token,
@ -46,6 +57,7 @@ impl Connection {
}
}
/// Put a connection into read mode. Receiving up `size` bytes of data.
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
@ -54,6 +66,7 @@ impl Connection {
self.rec_size = size;
}
/// Readable IO handler. Called when there is some data to be read.
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
@ -72,6 +85,7 @@ impl Connection {
}
}
/// Add a packet to send queue.
pub fn send(&mut self, data: Bytes) {
if data.len() != 0 {
self.send_queue.push_back(Cursor::new(data));
@ -81,6 +95,7 @@ impl Connection {
}
}
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable(&mut self) -> io::Result<WriteStatus> {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
@ -117,6 +132,7 @@ impl Connection {
})
}
/// Register this connection with the IO event loop.
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable());
@ -126,6 +142,7 @@ impl Connection {
})
}
/// Update connection registration. Should be called at the end of the IO handler.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", self.token);
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
@ -135,30 +152,47 @@ impl Connection {
}
}
/// RLPx packet
pub struct Packet {
pub protocol: u16,
pub data: Bytes,
}
/// Encrypted connection receiving state.
enum EncryptedConnectionState {
/// Reading a header.
Header,
/// Reading the rest of the packet.
Payload,
}
/// Connection implementing RLPx framing
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
pub struct EncryptedConnection {
/// Underlying tcp connection
connection: Connection,
/// Egress data encryptor
encoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
decoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>,
/// MAC for egress data
egress_mac: Keccak,
/// MAC for ingress data
ingress_mac: Keccak,
/// Read state
read_state: EncryptedConnectionState,
/// Disconnect timeout
idle_timeout: Option<Timeout>,
/// Protocol id for the last received packet
protocol_id: u16,
/// Payload expected to be received for the last header.
payload_len: usize,
}
impl EncryptedConnection {
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
let mut nonce_material = H512::new();
@ -208,6 +242,7 @@ impl EncryptedConnection {
})
}
/// Send a packet
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> {
let mut header = RlpStream::new();
let len = payload.len() as usize;
@ -234,6 +269,7 @@ impl EncryptedConnection {
Ok(())
}
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> {
if header.len() != ENCRYPTED_HEADER_LEN {
return Err(From::from(NetworkError::Auth));
@ -263,6 +299,7 @@ impl EncryptedConnection {
Ok(())
}
/// Decrypt and authenticate packet payload.
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, UtilError> {
let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = self.payload_len + padding + 16;
@ -288,6 +325,7 @@ impl EncryptedConnection {
})
}
/// Update MAC after reading or writing any data.
fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) {
let mut prev = H128::new();
mac.clone().finalize(&mut prev);
@ -299,6 +337,7 @@ impl EncryptedConnection {
mac.update(&enc);
}
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.read_state {
@ -324,12 +363,14 @@ impl EncryptedConnection {
}
}
/// Writable IO handler. Processes send queeue.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable());
Ok(())
}
/// Register this connection with the event handler.
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.expect(ENCRYPTED_HEADER_LEN);
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
@ -338,6 +379,7 @@ impl EncryptedConnection {
Ok(())
}
/// Update connection registration. This should be called at the end of the event loop.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.reregister(event_loop));
Ok(())

View File

@ -12,23 +12,39 @@ use network::NetworkError;
#[derive(PartialEq, Eq, Debug)]
enum HandshakeState {
/// Just created
New,
/// Waiting for auth packet
ReadingAuth,
/// Waiting for ack packet
ReadingAck,
/// Ready to start a session
StartSession,
}
/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake
pub struct Handshake {
/// Remote node public key
pub id: NodeId,
/// Underlying connection
pub connection: Connection,
/// Handshake state
state: HandshakeState,
/// Outgoing or incoming connection
pub originated: bool,
/// Disconnect timeout
idle_timeout: Option<Timeout>,
/// ECDH ephemeral
pub ecdhe: KeyPair,
/// Connection nonce
pub nonce: H256,
/// Handshake public key
pub remote_public: Public,
/// Remote connection nonce.
pub remote_nonce: H256,
/// A copy of received encryped auth packet
pub auth_cipher: Bytes,
/// A copy of received encryped ack packet
pub ack_cipher: Bytes
}
@ -36,6 +52,7 @@ const AUTH_PACKET_SIZE: usize = 307;
const ACK_PACKET_SIZE: usize = 210;
impl Handshake {
/// Create a new handshake object
pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
Ok(Handshake {
id: id.clone(),
@ -52,6 +69,7 @@ impl Handshake {
})
}
/// Start a handhsake
pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> {
self.originated = originated;
if originated {
@ -64,10 +82,12 @@ impl Handshake {
Ok(())
}
/// Check if handshake is complete
pub fn done(&self) -> bool {
self.state == HandshakeState::StartSession
}
/// Readable IO handler. Drives the state change.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.state {
@ -97,6 +117,7 @@ impl Handshake {
Ok(())
}
/// Writabe IO handler.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable());
@ -106,6 +127,7 @@ impl Handshake {
Ok(())
}
/// Register the IO handler with the event loop
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
@ -113,6 +135,7 @@ impl Handshake {
Ok(())
}
/// Parse, validate and confirm auth message
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == AUTH_PACKET_SIZE);
@ -134,6 +157,7 @@ impl Handshake {
self.write_ack()
}
/// Parse and validate ack message
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == ACK_PACKET_SIZE);
@ -144,6 +168,7 @@ impl Handshake {
Ok(())
}
/// Sends auth message
fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
@ -170,6 +195,7 @@ impl Handshake {
Ok(())
}
/// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants

View File

@ -22,7 +22,9 @@ const MAX_CONNECTIONS: usize = 1024;
const MAX_USER_TIMERS: usize = 32;
const IDEAL_PEERS: u32 = 10;
/// Node public key
pub type NodeId = H512;
/// IO Timer id
pub type TimerToken = usize;
#[derive(Debug)]
@ -47,13 +49,18 @@ impl NetworkConfiguration {
}
#[derive(Debug)]
/// Noe address info
pub struct NodeEndpoint {
/// IP(V4 or V6) address
address: SocketAddr,
/// Address as string (can be host name).
address_str: String,
/// Conneciton port.
udp_port: u16
}
impl NodeEndpoint {
/// Create endpoint from string. Performs name resolution if given a host name.
fn from_str(s: &str) -> Result<NodeEndpoint, UtilError> {
let address = s.to_socket_addrs().map(|mut i| i.next());
match address {
@ -124,39 +131,52 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
const USER_TIMER: usize = LAST_CONNECTION;
const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str;
/// Messages used to communitate with the event loop from other threads.
pub enum HostMessage {
/// Shutdown the event loop
Shutdown,
/// Register a new protocol handler.
AddHandler {
handler: Box<ProtocolHandler+Send>,
protocol: ProtocolId,
versions: Vec<u8>,
},
/// Send data over the network.
Send {
peer: PeerId,
packet_id: PacketId,
protocol: ProtocolId,
data: Vec<u8>,
},
/// Broadcast a message across the protocol handlers.
UserMessage(UserMessage),
}
/// Id for broadcast message
pub type UserMessageId = u32;
/// User
pub struct UserMessage {
/// ID of a protocol
pub protocol: ProtocolId,
pub id: UserMessageId,
pub data: Option<Vec<u8>>,
}
/// Local (temporary) peer session ID.
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8,
}
@ -169,7 +189,7 @@ impl Encodable for CapabilityInfo {
}
}
/// IO access point
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct HostIo<'s> {
protocol: ProtocolId,
connections: &'s mut Slab<ConnectionEntry>,
@ -179,6 +199,7 @@ pub struct HostIo<'s> {
}
impl<'s> HostIo<'s> {
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(protocol: ProtocolId, session: Option<Token>, event_loop: &'s mut EventLoop<Host>, connections: &'s mut Slab<ConnectionEntry>, timers: &'s mut Slab<UserTimer>) -> HostIo<'s> {
HostIo {
protocol: protocol,
@ -252,24 +273,36 @@ struct UserTimer {
delay: u64,
}
/// Shared host information
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
nonce: H256,
/// RLPx protocol version
pub protocol_version: u32,
/// Client identifier
pub client_version: String,
/// TCP connection port.
pub listen_port: u16,
/// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo>
}
impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId {
self.keys.public()
}
/// Returns secret key
pub fn secret(&self) -> &Secret {
self.keys.secret()
}
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 {
self.nonce = self.nonce.sha3();
return self.nonce.clone();
@ -281,6 +314,7 @@ enum ConnectionEntry {
Session(Session)
}
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host {
info: HostInfo,
_udp_socket: UdpSocket,
@ -293,6 +327,7 @@ pub struct Host {
}
impl Host {
/// Creates a new instance and registers it with the event loop.
pub fn start(event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
let config = NetworkConfiguration::new();
/*
@ -457,7 +492,7 @@ impl Host {
fn accept(&mut self, _event_loop: &mut EventLoop<Host>) {
warn!(target: "net", "accept");
trace!(target: "net", "accept");
}
fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {

View File

@ -7,27 +7,44 @@ use error::*;
use network::{NetworkError, DisconnectReason};
use network::host::*;
/// Peer session over encrypted connection.
/// When created waits for Hello packet exchange and signals ready state.
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
pub struct Session {
/// Shared session information
pub info: SessionInfo,
/// Underlying connection
connection: EncryptedConnection,
/// Session ready flag. Set after successfull Hello packet exchange
had_hello: bool,
}
/// Structure used to report various session events.
pub enum SessionData {
None,
/// Session is ready to send/receive packets.
Ready,
/// A packet has been received
Packet {
/// Packet data
data: Vec<u8>,
/// Packet protocol ID
protocol: &'static str,
/// Zero based packet ID
packet_id: u8,
},
}
/// Shared session information
pub struct SessionInfo {
/// Peer public key
pub id: NodeId,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
capabilities: Vec<SessionCapabilityInfo>,
}
#[derive(Debug, PartialEq, Eq)]
@ -48,7 +65,7 @@ impl Decodable for PeerCapabilityInfo {
}
#[derive(Debug, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
struct SessionCapabilityInfo {
pub protocol: &'static str,
pub version: u8,
pub packet_count: u8,
@ -65,6 +82,7 @@ const PACKET_USER: u8 = 0x10;
const PACKET_LAST: u8 = 0x7f;
impl Session {
/// Create a new session out of comepleted handshake. Consumes handshake object.
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, UtilError> {
let id = h.id.clone();
let connection = try!(EncryptedConnection::new(h));
@ -84,10 +102,12 @@ impl Session {
Ok(session)
}
/// Check if session is ready to send/receive data
pub fn is_ready(&self) -> bool {
self.had_hello
}
/// Readable IO handler. Returns packet data if available.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, UtilError> {
match try!(self.connection.readable(event_loop)) {
Some(data) => Ok(try!(self.read_packet(data, host))),
@ -95,18 +115,22 @@ impl Session {
}
}
/// Writable IO handler. Sends pending packets.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.connection.writable(event_loop)
}
/// Checks if peer supports given capability
pub fn have_capability(&self, protocol: &str) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol)
}
/// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.reregister(event_loop)
}
/// Send a protocol packet to peer.
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol {