From 6f3c3fa020816dd266e118ed4e532d81aca14027 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 10 Jan 2016 22:42:27 +0100 Subject: [PATCH] Documentation --- src/network/connection.rs | 42 +++++++++++++++++++++++++++++++++++++++ src/network/handshake.rs | 26 ++++++++++++++++++++++++ src/network/host.rs | 39 ++++++++++++++++++++++++++++++++++-- src/network/session.rs | 28 ++++++++++++++++++++++++-- 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/src/network/connection.rs b/src/network/connection.rs index 3f5d420e9..2c671a08a 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -19,22 +19,33 @@ use tiny_keccak::Keccak; const ENCRYPTED_HEADER_LEN: usize = 32; +/// Low level tcp connection pub struct Connection { + /// Connection id (token) pub token: Token, + /// Network socket pub socket: TcpStream, + /// Receive buffer rec_buf: Bytes, + /// Expected size rec_size: usize, + /// Send out packets FIFO send_queue: VecDeque>, + /// Event flags this connection expects interest: EventSet, } +/// Connection write status. #[derive(PartialEq, Eq)] pub enum WriteStatus { + /// Some data is still pending for current packet Ongoing, + /// All data sent. Complete } impl Connection { + /// Create a new connection with given id and socket. pub fn new(token: Token, socket: TcpStream) -> Connection { Connection { token: token, @@ -46,6 +57,7 @@ impl Connection { } } + /// Put a connection into read mode. Receiving up `size` bytes of data. pub fn expect(&mut self, size: usize) { if self.rec_size != self.rec_buf.len() { warn!(target:"net", "Unexpected connection read start"); @@ -54,6 +66,7 @@ impl Connection { self.rec_size = size; } + /// Readable IO handler. Called when there is some data to be read. //TODO: return a slice pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { @@ -72,6 +85,7 @@ impl Connection { } } + /// Add a packet to send queue. pub fn send(&mut self, data: Bytes) { if data.len() != 0 { self.send_queue.push_back(Cursor::new(data)); @@ -81,6 +95,7 @@ impl Connection { } } + /// Writable IO handler. Called when the socket is ready to send. pub fn writable(&mut self) -> io::Result { if self.send_queue.is_empty() { return Ok(WriteStatus::Complete) @@ -117,6 +132,7 @@ impl Connection { }) } + /// Register this connection with the IO event loop. pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); @@ -126,6 +142,7 @@ impl Connection { }) } + /// Update connection registration. Should be called at the end of the IO handler. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", self.token); event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { @@ -135,30 +152,47 @@ impl Connection { } } +/// RLPx packet pub struct Packet { pub protocol: u16, pub data: Bytes, } +/// Encrypted connection receiving state. enum EncryptedConnectionState { + /// Reading a header. Header, + /// Reading the rest of the packet. Payload, } +/// Connection implementing RLPx framing +/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing pub struct EncryptedConnection { + /// Underlying tcp connection connection: Connection, + /// Egress data encryptor encoder: CtrMode, + /// Ingress data decryptor decoder: CtrMode, + /// Ingress data decryptor mac_encoder: EcbEncryptor>, + /// MAC for egress data egress_mac: Keccak, + /// MAC for ingress data ingress_mac: Keccak, + /// Read state read_state: EncryptedConnectionState, + /// Disconnect timeout idle_timeout: Option, + /// Protocol id for the last received packet protocol_id: u16, + /// Payload expected to be received for the last header. payload_len: usize, } impl EncryptedConnection { + /// Create an encrypted connection out of the handshake. Consumes a handshake object. pub fn new(handshake: Handshake) -> Result { let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public)); let mut nonce_material = H512::new(); @@ -208,6 +242,7 @@ impl EncryptedConnection { }) } + /// Send a packet pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> { let mut header = RlpStream::new(); let len = payload.len() as usize; @@ -234,6 +269,7 @@ impl EncryptedConnection { Ok(()) } + /// Decrypt and authenticate an incoming packet header. Prepare for receiving payload. fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> { if header.len() != ENCRYPTED_HEADER_LEN { return Err(From::from(NetworkError::Auth)); @@ -263,6 +299,7 @@ impl EncryptedConnection { Ok(()) } + /// Decrypt and authenticate packet payload. fn read_payload(&mut self, payload: &[u8]) -> Result { let padding = (16 - (self.payload_len % 16)) % 16; let full_length = self.payload_len + padding + 16; @@ -288,6 +325,7 @@ impl EncryptedConnection { }) } + /// Update MAC after reading or writing any data. fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor>, seed: &[u8]) { let mut prev = H128::new(); mac.clone().finalize(&mut prev); @@ -299,6 +337,7 @@ impl EncryptedConnection { mac.update(&enc); } + /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.read_state { @@ -324,12 +363,14 @@ impl EncryptedConnection { } } + /// Writable IO handler. Processes send queeue. pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); Ok(()) } + /// Register this connection with the event handler. pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.expect(ENCRYPTED_HEADER_LEN); self.idle_timeout.map(|t| event_loop.clear_timeout(t)); @@ -338,6 +379,7 @@ impl EncryptedConnection { Ok(()) } + /// Update connection registration. This should be called at the end of the event loop. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { try!(self.connection.reregister(event_loop)); Ok(()) diff --git a/src/network/handshake.rs b/src/network/handshake.rs index d9e280bfe..af67f7ddf 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -12,23 +12,39 @@ use network::NetworkError; #[derive(PartialEq, Eq, Debug)] enum HandshakeState { + /// Just created New, + /// Waiting for auth packet ReadingAuth, + /// Waiting for ack packet ReadingAck, + /// Ready to start a session StartSession, } +/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake pub struct Handshake { + /// Remote node public key pub id: NodeId, + /// Underlying connection pub connection: Connection, + /// Handshake state state: HandshakeState, + /// Outgoing or incoming connection pub originated: bool, + /// Disconnect timeout idle_timeout: Option, + /// ECDH ephemeral pub ecdhe: KeyPair, + /// Connection nonce pub nonce: H256, + /// Handshake public key pub remote_public: Public, + /// Remote connection nonce. pub remote_nonce: H256, + /// A copy of received encryped auth packet pub auth_cipher: Bytes, + /// A copy of received encryped ack packet pub ack_cipher: Bytes } @@ -36,6 +52,7 @@ const AUTH_PACKET_SIZE: usize = 307; const ACK_PACKET_SIZE: usize = 210; impl Handshake { + /// Create a new handshake object pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { Ok(Handshake { id: id.clone(), @@ -52,6 +69,7 @@ impl Handshake { }) } + /// Start a handhsake pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> { self.originated = originated; if originated { @@ -64,10 +82,12 @@ impl Handshake { Ok(()) } + /// Check if handshake is complete pub fn done(&self) -> bool { self.state == HandshakeState::StartSession } + /// Readable IO handler. Drives the state change. pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { @@ -97,6 +117,7 @@ impl Handshake { Ok(()) } + /// Writabe IO handler. pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); @@ -106,6 +127,7 @@ impl Handshake { Ok(()) } + /// Register the IO handler with the event loop pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); @@ -113,6 +135,7 @@ impl Handshake { Ok(()) } + /// Parse, validate and confirm auth message fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); assert!(data.len() == AUTH_PACKET_SIZE); @@ -134,6 +157,7 @@ impl Handshake { self.write_ack() } + /// Parse and validate ack message fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); assert!(data.len() == ACK_PACKET_SIZE); @@ -144,6 +168,7 @@ impl Handshake { Ok(()) } + /// Sends auth message fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> { trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants @@ -170,6 +195,7 @@ impl Handshake { Ok(()) } + /// Sends ack message fn write_ack(&mut self) -> Result<(), UtilError> { trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants diff --git a/src/network/host.rs b/src/network/host.rs index f48b9eb2d..11dc491b2 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -22,7 +22,9 @@ const MAX_CONNECTIONS: usize = 1024; const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS: u32 = 10; +/// Node public key pub type NodeId = H512; +/// IO Timer id pub type TimerToken = usize; #[derive(Debug)] @@ -47,13 +49,18 @@ impl NetworkConfiguration { } #[derive(Debug)] +/// Noe address info pub struct NodeEndpoint { + /// IP(V4 or V6) address address: SocketAddr, + /// Address as string (can be host name). address_str: String, + /// Conneciton port. udp_port: u16 } impl NodeEndpoint { + /// Create endpoint from string. Performs name resolution if given a host name. fn from_str(s: &str) -> Result { let address = s.to_socket_addrs().map(|mut i| i.next()); match address { @@ -124,39 +131,52 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; const USER_TIMER: usize = LAST_CONNECTION; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; +/// Protocol handler level packet id pub type PacketId = u8; +/// Protocol / handler id pub type ProtocolId = &'static str; +/// Messages used to communitate with the event loop from other threads. pub enum HostMessage { + /// Shutdown the event loop Shutdown, + /// Register a new protocol handler. AddHandler { handler: Box, protocol: ProtocolId, versions: Vec, }, + /// Send data over the network. Send { peer: PeerId, packet_id: PacketId, protocol: ProtocolId, data: Vec, }, + /// Broadcast a message across the protocol handlers. UserMessage(UserMessage), } +/// Id for broadcast message pub type UserMessageId = u32; +/// User pub struct UserMessage { + /// ID of a protocol pub protocol: ProtocolId, pub id: UserMessageId, pub data: Option>, } +/// Local (temporary) peer session ID. pub type PeerId = usize; #[derive(Debug, PartialEq, Eq)] +/// Protocol info pub struct CapabilityInfo { pub protocol: ProtocolId, pub version: u8, + /// Total number of packet IDs this protocol support. pub packet_count: u8, } @@ -169,7 +189,7 @@ impl Encodable for CapabilityInfo { } } -/// IO access point +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub struct HostIo<'s> { protocol: ProtocolId, connections: &'s mut Slab, @@ -179,6 +199,7 @@ pub struct HostIo<'s> { } impl<'s> HostIo<'s> { + /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { HostIo { protocol: protocol, @@ -252,24 +273,36 @@ struct UserTimer { delay: u64, } +/// Shared host information pub struct HostInfo { + /// Our private and public keys. keys: KeyPair, + /// Current network configuration config: NetworkConfiguration, + /// Connection nonce. nonce: H256, + /// RLPx protocol version pub protocol_version: u32, + /// Client identifier pub client_version: String, + /// TCP connection port. pub listen_port: u16, + /// Registered capabilities (handlers) pub capabilities: Vec } impl HostInfo { + /// Returns public key pub fn id(&self) -> &NodeId { self.keys.public() } + /// Returns secret key pub fn secret(&self) -> &Secret { self.keys.secret() } + + /// Increments and returns connection nonce. pub fn next_nonce(&mut self) -> H256 { self.nonce = self.nonce.sha3(); return self.nonce.clone(); @@ -281,6 +314,7 @@ enum ConnectionEntry { Session(Session) } +/// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host { info: HostInfo, _udp_socket: UdpSocket, @@ -293,6 +327,7 @@ pub struct Host { } impl Host { + /// Creates a new instance and registers it with the event loop. pub fn start(event_loop: &mut EventLoop) -> Result<(), UtilError> { let config = NetworkConfiguration::new(); /* @@ -457,7 +492,7 @@ impl Host { fn accept(&mut self, _event_loop: &mut EventLoop) { - warn!(target: "net", "accept"); + trace!(target: "net", "accept"); } fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { diff --git a/src/network/session.rs b/src/network/session.rs index 141898e24..96390d366 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -7,27 +7,44 @@ use error::*; use network::{NetworkError, DisconnectReason}; use network::host::*; +/// Peer session over encrypted connection. +/// When created waits for Hello packet exchange and signals ready state. +/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect. pub struct Session { + /// Shared session information pub info: SessionInfo, + /// Underlying connection connection: EncryptedConnection, + /// Session ready flag. Set after successfull Hello packet exchange had_hello: bool, } +/// Structure used to report various session events. pub enum SessionData { None, + /// Session is ready to send/receive packets. Ready, + /// A packet has been received Packet { + /// Packet data data: Vec, + /// Packet protocol ID protocol: &'static str, + /// Zero based packet ID packet_id: u8, }, } +/// Shared session information pub struct SessionInfo { + /// Peer public key pub id: NodeId, + /// Peer client ID pub client_version: String, + /// Peer RLPx protocol version pub protocol_version: u32, - pub capabilities: Vec, + /// Peer protocol capabilities + capabilities: Vec, } #[derive(Debug, PartialEq, Eq)] @@ -48,7 +65,7 @@ impl Decodable for PeerCapabilityInfo { } #[derive(Debug, PartialEq, Eq)] -pub struct SessionCapabilityInfo { +struct SessionCapabilityInfo { pub protocol: &'static str, pub version: u8, pub packet_count: u8, @@ -65,6 +82,7 @@ const PACKET_USER: u8 = 0x10; const PACKET_LAST: u8 = 0x7f; impl Session { + /// Create a new session out of comepleted handshake. Consumes handshake object. pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let connection = try!(EncryptedConnection::new(h)); @@ -84,10 +102,12 @@ impl Session { Ok(session) } + /// Check if session is ready to send/receive data pub fn is_ready(&self) -> bool { self.had_hello } + /// Readable IO handler. Returns packet data if available. pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => Ok(try!(self.read_packet(data, host))), @@ -95,18 +115,22 @@ impl Session { } } + /// Writable IO handler. Sends pending packets. pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.connection.writable(event_loop) } + /// Checks if peer supports given capability pub fn have_capability(&self, protocol: &str) -> bool { self.info.capabilities.iter().any(|c| c.protocol == protocol) } + /// Update registration with the event loop. Should be called at the end of the IO handler. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.reregister(event_loop) } + /// Send a protocol packet to peer. pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { let mut i = 0usize; while protocol != self.info.capabilities[i].protocol {