From b0cef968e7c900a04534eecd79e526fb4410f43c Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 10 Jan 2016 14:02:01 +0100 Subject: [PATCH 1/4] Networking fixes --- src/network/connection.rs | 23 +++++++++++------- src/network/host.rs | 51 +++++++++++++++++++++++++++++++++------ src/network/session.rs | 20 ++++++++++----- src/rlp/untrusted_rlp.rs | 2 +- 4 files changed, 73 insertions(+), 23 deletions(-) diff --git a/src/network/connection.rs b/src/network/connection.rs index 2a8025b83..131aae318 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -154,7 +154,7 @@ pub struct EncryptedConnection { read_state: EncryptedConnectionState, idle_timeout: Option, protocol_id: u16, - payload_len: u32, + payload_len: usize, } impl EncryptedConnection { @@ -223,7 +223,7 @@ impl EncryptedConnection { self.egress_mac.clone().finalize(&mut packet[16..32]); self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding"); if padding != 0 { - let pad = [08; 16]; + let pad = [0u8; 16]; self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding"); } self.egress_mac.update(&packet[32..(32 + len + padding)]); @@ -252,7 +252,7 @@ impl EncryptedConnection { let header_rlp = UntrustedRlp::new(&hdec[3..6]); let protocol_id = try!(header_rlp.val_at::(0)); - self.payload_len = length; + self.payload_len = length as usize; self.protocol_id = protocol_id; self.read_state = EncryptedConnectionState::Payload; @@ -264,7 +264,7 @@ impl EncryptedConnection { fn read_payload(&mut self, payload: &[u8]) -> Result { let padding = (16 - (self.payload_len % 16)) % 16; - let full_length = (self.payload_len + padding + 16) as usize; + let full_length = self.payload_len + padding + 16; if payload.len() != full_length { return Err(Error::Auth); } @@ -277,9 +277,10 @@ impl EncryptedConnection { return Err(Error::Auth); } - let mut packet = vec![0u8; self.payload_len as usize]; - self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); - packet.resize(self.payload_len as usize, 0u8); + let mut packet = vec![0u8; self.payload_len]; + self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..self.payload_len]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); + let mut pad_buf = [0u8; 16]; + self.decoder.decrypt(&mut RefReadBuffer::new(&payload[self.payload_len..(payload.len() - 16)]), &mut RefWriteBuffer::new(&mut pad_buf), false).expect("Invalid length or padding"); Ok(Packet { protocol: self.protocol_id, data: packet @@ -299,7 +300,6 @@ impl EncryptedConnection { pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); - try!(self.connection.reregister(event_loop)); match self.read_state { EncryptedConnectionState::Header => { match try!(self.connection.readable()) { @@ -326,7 +326,6 @@ impl EncryptedConnection { pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); - try!(self.connection.reregister(event_loop)); Ok(()) } @@ -337,6 +336,12 @@ impl EncryptedConnection { try!(self.connection.reregister(event_loop)); Ok(()) } + + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { + try!(self.connection.reregister(event_loop)); + Ok(()) + } + } #[test] diff --git a/src/network/host.rs b/src/network/host.rs index 9e2b3e101..b9ca0152b 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -295,11 +295,10 @@ impl Host { pub fn start(event_loop: &mut EventLoop) -> Result<(), Error> { let config = NetworkConfiguration::new(); /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - } - */ + match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { + Some(iface) => config.public_address = iface.addr.unwrap(), + None => warn!("No public network interface"), + */ let addr = config.listen_address; // Setup the server socket @@ -487,8 +486,17 @@ impl Host { if create_session { self.start_session(token, event_loop); } + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + }, + _ => (), + } } + fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop) { + self.kill_connection(token, event_loop); + } fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { let mut kill = false; @@ -549,6 +557,12 @@ impl Host { h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); } + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); + }, + _ => (), + } } fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { @@ -570,7 +584,23 @@ impl Host { self.kill_connection(token, event_loop) } - fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop) { + fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop) { + let mut to_disconnect: Vec = Vec::new(); + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake + Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { + for (p, _) in self.handlers.iter_mut() { + if s.have_capability(p) { + to_disconnect.push(p); + } + } + }, + _ => (), + } + for p in to_disconnect { + let mut h = self.handlers.get_mut(p).unwrap(); + h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + } self.connections.remove(token); } } @@ -580,7 +610,14 @@ impl Handler for Host { type Message = HostMessage; fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { - if events.is_readable() { + if events.is_hup() { + trace!(target: "net", "hup"); + match token.as_usize() { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop), + _ => warn!(target: "net", "Unexpected hup"), + }; + } + else if events.is_readable() { match token.as_usize() { TCP_ACCEPT => self.accept(event_loop), IDLE => self.maintain_network(event_loop), diff --git a/src/network/session.rs b/src/network/session.rs index 720902150..8e89ff2c4 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -83,6 +83,10 @@ impl Session { Ok(session) } + pub fn is_ready(&self) -> bool { + self.had_hello + } + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => self.read_packet(data, host), @@ -98,6 +102,10 @@ impl Session { self.info.capabilities.iter().any(|c| c.protocol == protocol) } + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { + self.connection.reregister(event_loop) + } + pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), Error> { let mut i = 0usize; while protocol != self.info.capabilities[i].protocol { @@ -159,7 +167,7 @@ impl Session { fn write_hello(&mut self, host: &HostInfo) -> Result<(), Error> { let mut rlp = RlpStream::new(); - rlp.append(&(PACKET_HELLO as u32)); + rlp.append_raw(&[PACKET_HELLO as u8], 0); rlp.append_list(5) .append(&host.protocol_version) .append(&host.client_version) @@ -217,11 +225,11 @@ impl Session { } fn write_ping(&mut self) -> Result<(), Error> { - self.send(try!(Session::prepare(PACKET_PING, 0))) + self.send(try!(Session::prepare(PACKET_PING))) } fn write_pong(&mut self) -> Result<(), Error> { - self.send(try!(Session::prepare(PACKET_PONG, 0))) + self.send(try!(Session::prepare(PACKET_PONG))) } fn disconnect(&mut self, reason: DisconnectReason) -> Error { @@ -233,10 +241,10 @@ impl Session { Error::Disconnect(reason) } - fn prepare(packet_id: u8, items: usize) -> Result { - let mut rlp = RlpStream::new_list(1); + fn prepare(packet_id: u8) -> Result { + let mut rlp = RlpStream::new(); rlp.append(&(packet_id as u32)); - rlp.append_list(items); + rlp.append_list(0); Ok(rlp) } diff --git a/src/rlp/untrusted_rlp.rs b/src/rlp/untrusted_rlp.rs index 5a12cbc5e..452a198bb 100644 --- a/src/rlp/untrusted_rlp.rs +++ b/src/rlp/untrusted_rlp.rs @@ -188,7 +188,7 @@ impl<'a, 'view> View<'a, 'view> for UntrustedRlp<'a> where 'a: 'view { } fn val_at(&self, index: usize) -> Result where T: Decodable { - self.at(index).unwrap().as_val() + try!(self.at(index)).as_val() } } From 02b530f4aae70c7b8b55062a4adf287031f3da1f Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 10 Jan 2016 15:13:12 +0100 Subject: [PATCH 2/4] Style --- src/network/connection.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/network/connection.rs b/src/network/connection.rs index efa0059eb..3f5d420e9 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -342,7 +342,6 @@ impl EncryptedConnection { try!(self.connection.reregister(event_loop)); Ok(()) } - } #[test] From 6f3c3fa020816dd266e118ed4e532d81aca14027 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 10 Jan 2016 22:42:27 +0100 Subject: [PATCH 3/4] Documentation --- src/network/connection.rs | 42 +++++++++++++++++++++++++++++++++++++++ src/network/handshake.rs | 26 ++++++++++++++++++++++++ src/network/host.rs | 39 ++++++++++++++++++++++++++++++++++-- src/network/session.rs | 28 ++++++++++++++++++++++++-- 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/src/network/connection.rs b/src/network/connection.rs index 3f5d420e9..2c671a08a 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -19,22 +19,33 @@ use tiny_keccak::Keccak; const ENCRYPTED_HEADER_LEN: usize = 32; +/// Low level tcp connection pub struct Connection { + /// Connection id (token) pub token: Token, + /// Network socket pub socket: TcpStream, + /// Receive buffer rec_buf: Bytes, + /// Expected size rec_size: usize, + /// Send out packets FIFO send_queue: VecDeque>, + /// Event flags this connection expects interest: EventSet, } +/// Connection write status. #[derive(PartialEq, Eq)] pub enum WriteStatus { + /// Some data is still pending for current packet Ongoing, + /// All data sent. Complete } impl Connection { + /// Create a new connection with given id and socket. pub fn new(token: Token, socket: TcpStream) -> Connection { Connection { token: token, @@ -46,6 +57,7 @@ impl Connection { } } + /// Put a connection into read mode. Receiving up `size` bytes of data. pub fn expect(&mut self, size: usize) { if self.rec_size != self.rec_buf.len() { warn!(target:"net", "Unexpected connection read start"); @@ -54,6 +66,7 @@ impl Connection { self.rec_size = size; } + /// Readable IO handler. Called when there is some data to be read. //TODO: return a slice pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { @@ -72,6 +85,7 @@ impl Connection { } } + /// Add a packet to send queue. pub fn send(&mut self, data: Bytes) { if data.len() != 0 { self.send_queue.push_back(Cursor::new(data)); @@ -81,6 +95,7 @@ impl Connection { } } + /// Writable IO handler. Called when the socket is ready to send. pub fn writable(&mut self) -> io::Result { if self.send_queue.is_empty() { return Ok(WriteStatus::Complete) @@ -117,6 +132,7 @@ impl Connection { }) } + /// Register this connection with the IO event loop. pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); @@ -126,6 +142,7 @@ impl Connection { }) } + /// Update connection registration. Should be called at the end of the IO handler. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", self.token); event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { @@ -135,30 +152,47 @@ impl Connection { } } +/// RLPx packet pub struct Packet { pub protocol: u16, pub data: Bytes, } +/// Encrypted connection receiving state. enum EncryptedConnectionState { + /// Reading a header. Header, + /// Reading the rest of the packet. Payload, } +/// Connection implementing RLPx framing +/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing pub struct EncryptedConnection { + /// Underlying tcp connection connection: Connection, + /// Egress data encryptor encoder: CtrMode, + /// Ingress data decryptor decoder: CtrMode, + /// Ingress data decryptor mac_encoder: EcbEncryptor>, + /// MAC for egress data egress_mac: Keccak, + /// MAC for ingress data ingress_mac: Keccak, + /// Read state read_state: EncryptedConnectionState, + /// Disconnect timeout idle_timeout: Option, + /// Protocol id for the last received packet protocol_id: u16, + /// Payload expected to be received for the last header. payload_len: usize, } impl EncryptedConnection { + /// Create an encrypted connection out of the handshake. Consumes a handshake object. pub fn new(handshake: Handshake) -> Result { let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public)); let mut nonce_material = H512::new(); @@ -208,6 +242,7 @@ impl EncryptedConnection { }) } + /// Send a packet pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> { let mut header = RlpStream::new(); let len = payload.len() as usize; @@ -234,6 +269,7 @@ impl EncryptedConnection { Ok(()) } + /// Decrypt and authenticate an incoming packet header. Prepare for receiving payload. fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> { if header.len() != ENCRYPTED_HEADER_LEN { return Err(From::from(NetworkError::Auth)); @@ -263,6 +299,7 @@ impl EncryptedConnection { Ok(()) } + /// Decrypt and authenticate packet payload. fn read_payload(&mut self, payload: &[u8]) -> Result { let padding = (16 - (self.payload_len % 16)) % 16; let full_length = self.payload_len + padding + 16; @@ -288,6 +325,7 @@ impl EncryptedConnection { }) } + /// Update MAC after reading or writing any data. fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor>, seed: &[u8]) { let mut prev = H128::new(); mac.clone().finalize(&mut prev); @@ -299,6 +337,7 @@ impl EncryptedConnection { mac.update(&enc); } + /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.read_state { @@ -324,12 +363,14 @@ impl EncryptedConnection { } } + /// Writable IO handler. Processes send queeue. pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); Ok(()) } + /// Register this connection with the event handler. pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.expect(ENCRYPTED_HEADER_LEN); self.idle_timeout.map(|t| event_loop.clear_timeout(t)); @@ -338,6 +379,7 @@ impl EncryptedConnection { Ok(()) } + /// Update connection registration. This should be called at the end of the event loop. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { try!(self.connection.reregister(event_loop)); Ok(()) diff --git a/src/network/handshake.rs b/src/network/handshake.rs index d9e280bfe..af67f7ddf 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -12,23 +12,39 @@ use network::NetworkError; #[derive(PartialEq, Eq, Debug)] enum HandshakeState { + /// Just created New, + /// Waiting for auth packet ReadingAuth, + /// Waiting for ack packet ReadingAck, + /// Ready to start a session StartSession, } +/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake pub struct Handshake { + /// Remote node public key pub id: NodeId, + /// Underlying connection pub connection: Connection, + /// Handshake state state: HandshakeState, + /// Outgoing or incoming connection pub originated: bool, + /// Disconnect timeout idle_timeout: Option, + /// ECDH ephemeral pub ecdhe: KeyPair, + /// Connection nonce pub nonce: H256, + /// Handshake public key pub remote_public: Public, + /// Remote connection nonce. pub remote_nonce: H256, + /// A copy of received encryped auth packet pub auth_cipher: Bytes, + /// A copy of received encryped ack packet pub ack_cipher: Bytes } @@ -36,6 +52,7 @@ const AUTH_PACKET_SIZE: usize = 307; const ACK_PACKET_SIZE: usize = 210; impl Handshake { + /// Create a new handshake object pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { Ok(Handshake { id: id.clone(), @@ -52,6 +69,7 @@ impl Handshake { }) } + /// Start a handhsake pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> { self.originated = originated; if originated { @@ -64,10 +82,12 @@ impl Handshake { Ok(()) } + /// Check if handshake is complete pub fn done(&self) -> bool { self.state == HandshakeState::StartSession } + /// Readable IO handler. Drives the state change. pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { @@ -97,6 +117,7 @@ impl Handshake { Ok(()) } + /// Writabe IO handler. pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.writable()); @@ -106,6 +127,7 @@ impl Handshake { Ok(()) } + /// Register the IO handler with the event loop pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); @@ -113,6 +135,7 @@ impl Handshake { Ok(()) } + /// Parse, validate and confirm auth message fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); assert!(data.len() == AUTH_PACKET_SIZE); @@ -134,6 +157,7 @@ impl Handshake { self.write_ack() } + /// Parse and validate ack message fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); assert!(data.len() == ACK_PACKET_SIZE); @@ -144,6 +168,7 @@ impl Handshake { Ok(()) } + /// Sends auth message fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> { trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants @@ -170,6 +195,7 @@ impl Handshake { Ok(()) } + /// Sends ack message fn write_ack(&mut self) -> Result<(), UtilError> { trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants diff --git a/src/network/host.rs b/src/network/host.rs index f48b9eb2d..11dc491b2 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -22,7 +22,9 @@ const MAX_CONNECTIONS: usize = 1024; const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS: u32 = 10; +/// Node public key pub type NodeId = H512; +/// IO Timer id pub type TimerToken = usize; #[derive(Debug)] @@ -47,13 +49,18 @@ impl NetworkConfiguration { } #[derive(Debug)] +/// Noe address info pub struct NodeEndpoint { + /// IP(V4 or V6) address address: SocketAddr, + /// Address as string (can be host name). address_str: String, + /// Conneciton port. udp_port: u16 } impl NodeEndpoint { + /// Create endpoint from string. Performs name resolution if given a host name. fn from_str(s: &str) -> Result { let address = s.to_socket_addrs().map(|mut i| i.next()); match address { @@ -124,39 +131,52 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; const USER_TIMER: usize = LAST_CONNECTION; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; +/// Protocol handler level packet id pub type PacketId = u8; +/// Protocol / handler id pub type ProtocolId = &'static str; +/// Messages used to communitate with the event loop from other threads. pub enum HostMessage { + /// Shutdown the event loop Shutdown, + /// Register a new protocol handler. AddHandler { handler: Box, protocol: ProtocolId, versions: Vec, }, + /// Send data over the network. Send { peer: PeerId, packet_id: PacketId, protocol: ProtocolId, data: Vec, }, + /// Broadcast a message across the protocol handlers. UserMessage(UserMessage), } +/// Id for broadcast message pub type UserMessageId = u32; +/// User pub struct UserMessage { + /// ID of a protocol pub protocol: ProtocolId, pub id: UserMessageId, pub data: Option>, } +/// Local (temporary) peer session ID. pub type PeerId = usize; #[derive(Debug, PartialEq, Eq)] +/// Protocol info pub struct CapabilityInfo { pub protocol: ProtocolId, pub version: u8, + /// Total number of packet IDs this protocol support. pub packet_count: u8, } @@ -169,7 +189,7 @@ impl Encodable for CapabilityInfo { } } -/// IO access point +/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub struct HostIo<'s> { protocol: ProtocolId, connections: &'s mut Slab, @@ -179,6 +199,7 @@ pub struct HostIo<'s> { } impl<'s> HostIo<'s> { + /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { HostIo { protocol: protocol, @@ -252,24 +273,36 @@ struct UserTimer { delay: u64, } +/// Shared host information pub struct HostInfo { + /// Our private and public keys. keys: KeyPair, + /// Current network configuration config: NetworkConfiguration, + /// Connection nonce. nonce: H256, + /// RLPx protocol version pub protocol_version: u32, + /// Client identifier pub client_version: String, + /// TCP connection port. pub listen_port: u16, + /// Registered capabilities (handlers) pub capabilities: Vec } impl HostInfo { + /// Returns public key pub fn id(&self) -> &NodeId { self.keys.public() } + /// Returns secret key pub fn secret(&self) -> &Secret { self.keys.secret() } + + /// Increments and returns connection nonce. pub fn next_nonce(&mut self) -> H256 { self.nonce = self.nonce.sha3(); return self.nonce.clone(); @@ -281,6 +314,7 @@ enum ConnectionEntry { Session(Session) } +/// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host { info: HostInfo, _udp_socket: UdpSocket, @@ -293,6 +327,7 @@ pub struct Host { } impl Host { + /// Creates a new instance and registers it with the event loop. pub fn start(event_loop: &mut EventLoop) -> Result<(), UtilError> { let config = NetworkConfiguration::new(); /* @@ -457,7 +492,7 @@ impl Host { fn accept(&mut self, _event_loop: &mut EventLoop) { - warn!(target: "net", "accept"); + trace!(target: "net", "accept"); } fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { diff --git a/src/network/session.rs b/src/network/session.rs index 141898e24..96390d366 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -7,27 +7,44 @@ use error::*; use network::{NetworkError, DisconnectReason}; use network::host::*; +/// Peer session over encrypted connection. +/// When created waits for Hello packet exchange and signals ready state. +/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect. pub struct Session { + /// Shared session information pub info: SessionInfo, + /// Underlying connection connection: EncryptedConnection, + /// Session ready flag. Set after successfull Hello packet exchange had_hello: bool, } +/// Structure used to report various session events. pub enum SessionData { None, + /// Session is ready to send/receive packets. Ready, + /// A packet has been received Packet { + /// Packet data data: Vec, + /// Packet protocol ID protocol: &'static str, + /// Zero based packet ID packet_id: u8, }, } +/// Shared session information pub struct SessionInfo { + /// Peer public key pub id: NodeId, + /// Peer client ID pub client_version: String, + /// Peer RLPx protocol version pub protocol_version: u32, - pub capabilities: Vec, + /// Peer protocol capabilities + capabilities: Vec, } #[derive(Debug, PartialEq, Eq)] @@ -48,7 +65,7 @@ impl Decodable for PeerCapabilityInfo { } #[derive(Debug, PartialEq, Eq)] -pub struct SessionCapabilityInfo { +struct SessionCapabilityInfo { pub protocol: &'static str, pub version: u8, pub packet_count: u8, @@ -65,6 +82,7 @@ const PACKET_USER: u8 = 0x10; const PACKET_LAST: u8 = 0x7f; impl Session { + /// Create a new session out of comepleted handshake. Consumes handshake object. pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let connection = try!(EncryptedConnection::new(h)); @@ -84,10 +102,12 @@ impl Session { Ok(session) } + /// Check if session is ready to send/receive data pub fn is_ready(&self) -> bool { self.had_hello } + /// Readable IO handler. Returns packet data if available. pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { Some(data) => Ok(try!(self.read_packet(data, host))), @@ -95,18 +115,22 @@ impl Session { } } + /// Writable IO handler. Sends pending packets. pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), UtilError> { self.connection.writable(event_loop) } + /// Checks if peer supports given capability pub fn have_capability(&self, protocol: &str) -> bool { self.info.capabilities.iter().any(|c| c.protocol == protocol) } + /// Update registration with the event loop. Should be called at the end of the IO handler. pub fn reregister(&mut self, event_loop: &mut EventLoop) -> Result<(), UtilError> { self.connection.reregister(event_loop) } + /// Send a protocol packet to peer. pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { let mut i = 0usize; while protocol != self.info.capabilities[i].protocol { From c269cb5c85585679aac1b4d7d61bf12c36f3fee0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 11 Jan 2016 11:52:18 +0100 Subject: [PATCH 4/4] Added sync to std uses --- src/standard.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/standard.rs b/src/standard.rs index b591220f3..dfdf4488b 100644 --- a/src/standard.rs +++ b/src/standard.rs @@ -9,6 +9,7 @@ pub use std::io::{Read,Write}; pub use std::hash::{Hash, Hasher}; pub use std::error::Error as StdError; +pub use std::sync::*; pub use std::ops::*; pub use std::cmp::*; pub use std::cell::*;