From 75b1de04822cdbb7b87cfa05ae7084fd2090ee79 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 2 Dec 2015 20:11:13 +0100 Subject: [PATCH] Session --- src/network/connection.rs | 211 ++++++++++++++++++++------------------ src/network/handshake.rs | 8 +- src/network/host.rs | 47 ++++++++- src/network/mod.rs | 18 ++++ src/network/session.rs | 149 +++++++++++++++++++++++++-- src/rlp.rs | 2 +- 6 files changed, 316 insertions(+), 119 deletions(-) diff --git a/src/network/connection.rs b/src/network/connection.rs index db5cde10d..e3a935dde 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] //TODO: remove this after everything is done +use std::collections::VecDeque; use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; @@ -23,15 +24,117 @@ pub struct Connection { pub socket: TcpStream, rec_buf: Bytes, rec_size: usize, - send_buf: Cursor, + send_queue: VecDeque>, interest: EventSet, } +#[derive(PartialEq, Eq)] pub enum WriteStatus { Ongoing, Complete } +impl Connection { + pub fn new(token: Token, socket: TcpStream) -> Connection { + Connection { + token: token, + socket: socket, + send_queue: VecDeque::new(), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup(), + } + } + + pub fn expect(&mut self, size: usize) { + if self.rec_size != self.rec_buf.len() { + warn!(target:"net", "Unexpected connection read start"); + } + unsafe { self.rec_buf.set_len(0) } + self.rec_size = size; + } + + //TODO: return a slice + pub fn readable(&mut self) -> io::Result> { + if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { + warn!(target:"net", "Unexpected connection read"); + } + let max = self.rec_size - self.rec_buf.len(); + // resolve "multiple applicable items in scope [E0034]" error + let sock_ref = ::by_ref(&mut self.socket); + match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { + Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { + self.rec_size = 0; + Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + }, + Ok(_) => Ok(None), + Err(e) => Err(e), + } + } + + pub fn send(&mut self, data: Bytes) { //TODO: take ownership version + if data.len() != 0 { + self.send_queue.push_back(Cursor::new(data)); + } + if !self.interest.is_writable() { + self.interest.insert(EventSet::writable()); + } + } + + pub fn writable(&mut self) -> io::Result { + if self.send_queue.is_empty() { + return Ok(WriteStatus::Complete) + } + { + let buf = self.send_queue.front_mut().unwrap(); + let send_size = buf.get_ref().len(); + if (buf.position() as usize) >= send_size { + warn!(target:"net", "Unexpected connection data"); + return Ok(WriteStatus::Complete) + } + match self.socket.try_write_buf(buf) { + Ok(_) if (buf.position() as usize) < send_size => { + self.interest.insert(EventSet::writable()); + Ok(WriteStatus::Ongoing) + }, + Ok(_) if (buf.position() as usize) == send_size => { + self.interest.remove(EventSet::writable()); + Ok(WriteStatus::Complete) + }, + Ok(_) => { panic!("Wrote past buffer");}, + Err(e) => Err(e) + } + }.and_then(|r| if r == WriteStatus::Complete { + self.send_queue.pop_front(); + Ok(r) + } + else { Ok(r) } + ) + } + + pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection register; token={:?}", self.token); + self.interest.insert(EventSet::readable()); + event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { + error!("Failed to reregister {:?}, {:?}", self.token, e); + Err(e) + }) + } + + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection reregister; token={:?}", self.token); + event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { + error!("Failed to reregister {:?}, {:?}", self.token, e); + Err(e) + }) + } +} + +pub struct Packet { + pub protocol: u16, + pub data: Bytes, +} + enum EncryptedConnectionState { Header, Payload, @@ -99,7 +202,7 @@ impl EncryptedConnection { }) } - pub fn write_packet(&mut self, payload: &[u8]) -> Result<(), Error> { + pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), Error> { let mut header = RlpStream::new(); let len = payload.len() as usize; header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); @@ -120,7 +223,7 @@ impl EncryptedConnection { } self.egress_mac.update(&packet[32..(32 + len + padding)]); self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]); - self.connection.send(&packet); + self.connection.send(packet); Ok(()) } @@ -153,7 +256,7 @@ impl EncryptedConnection { Ok(()) } - fn read_payload(&mut self, payload: &[u8]) -> Result { + fn read_payload(&mut self, payload: &[u8]) -> Result { let padding = (16 - (self.payload_len % 16)) % 16; let full_length = (self.payload_len + padding + 16) as usize; if payload.len() != full_length { @@ -170,10 +273,13 @@ impl EncryptedConnection { let mut packet = vec![0u8; self.payload_len as usize]; self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); packet.resize(self.payload_len as usize, 0u8); - Ok(packet) + Ok(Packet { + protocol: self.protocol_id, + data: packet + }) } - pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, Error> { + pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); try!(self.connection.reregister(event_loop)); match self.read_state { @@ -215,96 +321,3 @@ impl EncryptedConnection { } } -impl Connection { - pub fn new(token: Token, socket: TcpStream) -> Connection { - Connection { - token: token, - socket: socket, - send_buf: Cursor::new(Bytes::new()), - rec_buf: Bytes::new(), - rec_size: 0, - interest: EventSet::hup(), - } - } - - pub fn expect(&mut self, size: usize) { - if self.rec_size != self.rec_buf.len() { - warn!(target:"net", "Unexpected connection read start"); - } - unsafe { self.rec_buf.set_len(0) } - self.rec_size = size; - } - - //TODO: return a slice - pub fn readable(&mut self) -> io::Result> { - if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { - warn!(target:"net", "Unexpected connection read"); - } - let max = self.rec_size - self.rec_buf.len(); - // resolve "multiple applicable items in scope [E0034]" error - let sock_ref = ::by_ref(&mut self.socket); - match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { - Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { - self.rec_size = 0; - Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) - }, - Ok(_) => Ok(None), - Err(e) => Err(e), - } - } - - pub fn send(&mut self, data: &[u8]) { //TODO: take ownership version - let send_size = self.send_buf.get_ref().len(); - if send_size != 0 || self.send_buf.position() as usize >= send_size { - warn!(target:"net", "Unexpected connection send start"); - } - if self.send_buf.get_ref().capacity() < data.len() { - let capacity = self.send_buf.get_ref().capacity(); - self.send_buf.get_mut().reserve(data.len() - capacity); - } - unsafe { self.send_buf.get_mut().set_len(data.len()) } - unsafe { ::std::ptr::copy_nonoverlapping(data.as_ptr(), self.send_buf.get_mut()[..].as_mut_ptr(), data.len()) }; - if !self.interest.is_writable() { - self.interest.insert(EventSet::writable()); - } - } - - pub fn writable(&mut self) -> io::Result { - let send_size = self.send_buf.get_ref().len(); - if (self.send_buf.position() as usize) >= send_size { - warn!(target:"net", "Unexpected connection data"); - return Ok(WriteStatus::Complete) - } - match self.socket.try_write_buf(&mut self.send_buf) { - Ok(_) if (self.send_buf.position() as usize) < send_size => { - self.interest.insert(EventSet::writable()); - Ok(WriteStatus::Ongoing) - }, - Ok(_) if (self.send_buf.position() as usize) == send_size => { - self.interest.remove(EventSet::writable()); - Ok(WriteStatus::Complete) - }, - Ok(_) => { panic!("Wrote past buffer");}, - Err(e) => Err(e) - } - } - - pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection register; token={:?}", self.token); - self.interest.insert(EventSet::readable()); - event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - error!("Failed to reregister {:?}, {:?}", self.token, e); - Err(e) - }) - } - - pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection reregister; token={:?}", self.token); - event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - error!("Failed to reregister {:?}, {:?}", self.token, e); - Err(e) - }) - } -} - - diff --git a/src/network/handshake.rs b/src/network/handshake.rs index 186d6b8a7..7d6c49ae3 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -180,8 +180,8 @@ impl Handshake { self.nonce.copy_to(nonce); } let message = try!(crypto::ecies::encrypt(&self.id, &data)); - self.connection.send(&message[..]); - self.auth_cipher = message; + self.auth_cipher = message.clone(); + self.connection.send(message); self.state = HandshakeState::WritingAuth; Ok(()) } @@ -198,8 +198,8 @@ impl Handshake { self.nonce.copy_to(nonce); } let message = try!(crypto::ecies::encrypt(&self.id, &data)); - self.connection.send(&message[..]); - self.ack_cipher = message; + self.ack_cipher = message.clone(); + self.connection.send(message); self.state = HandshakeState::WritingAck; Ok(()) } diff --git a/src/network/host.rs b/src/network/host.rs index cdb6dd132..f0465bc15 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -12,6 +12,7 @@ use mio::tcp::*; use mio::udp::*; use hash::*; use crypto::*; +use rlp::*; use time::Tm; use network::handshake::Handshake; use network::session::Session; @@ -55,6 +56,7 @@ impl NetworkConfiguration { #[derive(Debug)] struct NodeEndpoint { address: SocketAddr, + address_str: String, udp_port: u16 } @@ -62,6 +64,7 @@ impl NodeEndpoint { fn new(address: SocketAddr) -> NodeEndpoint { NodeEndpoint { address: address, + address_str: address.to_string(), udp_port: address.port() } } @@ -71,6 +74,7 @@ impl NodeEndpoint { match address { Ok(Some(a)) => Ok(NodeEndpoint { address: a, + address_str: s.to_string(), udp_port: a.port() }), Ok(_) => Err(Error::AddressResolve(None)), @@ -178,10 +182,38 @@ pub enum HostMessage { Shutdown } +#[derive(Debug, PartialEq, Eq)] +pub struct CapabilityInfo { + pub protocol: String, + pub version: u32, +} + +impl Encodable for CapabilityInfo { + fn encode(&self, encoder: &mut E) -> () where E: Encoder { + encoder.emit_list(|e| { + self.protocol.encode(e); + self.version.encode(e); + }); + } +} + +impl Decodable for CapabilityInfo { + fn decode_untrusted(rlp: &UntrustedRlp) -> Result { + Ok(CapabilityInfo { + protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))), + version: try!(u32::decode_untrusted(&try!(rlp.at(1)))), + }) + } +} + pub struct HostInfo { keys: KeyPair, config: NetworkConfiguration, - nonce: H256 + nonce: H256, + pub protocol_version: u32, + pub client_version: String, + pub listen_port: u16, + pub capabilities: Vec } impl HostInfo { @@ -244,12 +276,17 @@ impl Host { let udp_socket = UdpSocket::bound(&addr).unwrap(); event_loop.register_opt(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); + let port = config.listen_address.port(); let mut host = Host { info: HostInfo { keys: KeyPair::create().unwrap(), config: config, - nonce: H256::random() + nonce: H256::random(), + protocol_version: 4, + client_version: "parity".to_string(), + listen_port: port, + capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], }, sender: sender, udp_socket: udp_socket, @@ -263,7 +300,6 @@ impl Host { idle_timeout: idle_timeout }; - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); @@ -429,7 +465,7 @@ impl Host { } fn have_session(&self, id: &NodeId) -> bool { - self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.id.eq(&id), _ => false }) + self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false }) } fn connecting_to(&self, id: &NodeId) -> bool { @@ -594,9 +630,10 @@ impl Host { } fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { + let info = &self.info; self.connections.replace_with(token, |c| { match c { - ConnectionEntry::Handshake(h) => Session::new(h, event_loop) + ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) .unwrap_or_else(|e| { debug!(target: "net", "Session construction error: {:?}", e); diff --git a/src/network/mod.rs b/src/network/mod.rs index d1f9940c4..6426d523a 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,6 +4,23 @@ mod connection; mod handshake; mod session; +#[derive(Debug, Copy, Clone)] +pub enum DisconnectReason +{ + DisconnectRequested, + TCPError, + BadProtocol, + UselessPeer, + TooManyPeers, + DuplicatePeer, + IncompatibleProtocol, + NullIdentity, + ClientQuit, + UnexpectedIdentity, + LocalIdentity, + PingTimeout, +} + #[derive(Debug)] pub enum Error { Crypto(::crypto::CryptoError), @@ -13,6 +30,7 @@ pub enum Error { AddressParse(::std::net::AddrParseError), AddressResolve(Option<::std::io::Error>), NodeIdParse(::error::EthcoreError), + Disconnect(DisconnectReason) } impl From<::std::io::Error> for Error { diff --git a/src/network/session.rs b/src/network/session.rs index 71554b0ca..1ed2449b5 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -1,33 +1,162 @@ #![allow(dead_code)] //TODO: remove this after everything is done -//TODO: remove all unwraps +//TODO: hello packet timeout use mio::*; use hash::*; -use network::connection::{EncryptedConnection}; +use rlp::*; +use network::connection::{EncryptedConnection, Packet}; use network::handshake::Handshake; -use network::Error; +use network::{Error, DisconnectReason}; use network::host::*; pub struct Session { - pub id: NodeId, + pub info: SessionInfo, connection: EncryptedConnection, + had_hello: bool, } +pub struct SessionInfo { + pub id: NodeId, + pub client_version: String, + pub protocol_version: u32, + pub capabilities: Vec, +} + +const PACKET_HELLO: u8 = 0x80; +const PACKET_DISCONNECT: u8 = 0x01; +const PACKET_PING: u8 = 0x02; +const PACKET_PONG: u8 = 0x03; +const PACKET_GET_PEERS: u8 = 0x04; +const PACKET_PEERS: u8 = 0x05; +const PACKET_USER: u8 = 0x10; +const PACKET_LAST: u8 = 0x7f; + impl Session { - pub fn new(h: Handshake, event_loop: &mut EventLoop) -> Result { + pub fn new(h: Handshake, event_loop: &mut EventLoop, host: &HostInfo) -> Result { let id = h.id.clone(); let mut connection = try!(EncryptedConnection::new(h)); try!(connection.register(event_loop)); - Ok(Session { - id: id, + let mut session = Session { connection: connection, - }) + had_hello: false, + info: SessionInfo { + id: id, + client_version: String::new(), + protocol_version: 0, + capabilities: Vec::new(), + }, + }; + try!(session.write_hello(host)); + try!(session.write_ping()); + Ok(session) } - pub fn readable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { - try!(self.connection.readable(event_loop)); + + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { + match try!(self.connection.readable(event_loop)) { + Some(data) => { + try!(self.read_packet(data, host)); + }, + None => {} + }; Ok(()) } + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { self.connection.writable(event_loop) } + + pub fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<(), Error> { + let data = &packet.data; + if data.len() < 2 { + return Err(Error::BadProtocol); + } + let packet_id = data[0]; + let rlp = UntrustedRlp::new(&data[1..]); //TODO: validate rlp expected size + if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello { + return Err(Error::BadProtocol); + } + match packet_id { + PACKET_HELLO => self.read_hello(&rlp, host), + PACKET_DISCONNECT => Err(Error::Disconnect(DisconnectReason::DisconnectRequested)), + PACKET_PING => self.write_pong(), + PACKET_GET_PEERS => Ok(()), //TODO; + PACKET_PEERS => Ok(()), + PACKET_USER ... PACKET_LAST => { + warn!(target: "net", "User packet: {:?}", rlp); + Ok(()) + }, + _ => { + debug!(target: "net", "Unkown packet: {:?}", rlp); + Ok(()) + } + } + } + + fn write_hello(&mut self, host: &HostInfo) -> Result<(), Error> { + let mut rlp = RlpStream::new(); + rlp.append(&(PACKET_HELLO as u32)); + rlp.append_list(5) + .append(&host.protocol_version) + .append(&host.client_version) + .append(&host.capabilities) + .append(&host.listen_port) + .append(host.id()); + self.connection.send_packet(&rlp.out()) + } + + fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> { + let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0)))); + let client_version = try!(String::decode_untrusted(&try!(rlp.at(0)))); + let mut caps: Vec = try!(Decodable::decode_untrusted(&try!(rlp.at(2)))); + let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4)))); + + // Intersect with host capabilities + // Leave only highset mutually supported capability version + caps.retain(|c| host.capabilities.contains(&c)); + let mut i = 0; + while i < caps.len() { + if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) { + caps.remove(i); + } + else { + i += 1; + } + } + + trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + if protocol != host.protocol_version { + return Err(self.disconnect(DisconnectReason::UselessPeer)); + } + self.had_hello = true; + Ok(()) + } + + fn write_ping(&mut self) -> Result<(), Error> { + self.send(try!(Session::prepare(PACKET_PING, 0))) + } + + fn write_pong(&mut self) -> Result<(), Error> { + self.send(try!(Session::prepare(PACKET_PONG, 0))) + } + + + fn disconnect(&mut self, reason: DisconnectReason) -> Error { + let mut rlp = RlpStream::new(); + rlp.append(&(PACKET_DISCONNECT as u32)); + rlp.append_list(1); + rlp.append(&(reason.clone() as u32)); + self.connection.send_packet(&rlp.out()).ok(); + Error::Disconnect(reason) + } + + fn prepare(packet_id: u8, items: usize) -> Result { + let mut rlp = RlpStream::new_list(1); + rlp.append(&(packet_id as u32)); + rlp.append_list(items); + Ok(rlp) + } + + fn send(&mut self, rlp: RlpStream) -> Result<(), Error> { + self.connection.send_packet(&rlp.out()) + } } diff --git a/src/rlp.rs b/src/rlp.rs index c023b1dfb..cb3fa378c 100644 --- a/src/rlp.rs +++ b/src/rlp.rs @@ -725,7 +725,7 @@ pub trait Decoder { fn read_value(bytes: &[u8], f: F) -> Result where F: FnOnce(&[u8]) -> Result; } -struct BasicDecoder; +pub struct BasicDecoder; impl Decoder for BasicDecoder { fn read_value(bytes: &[u8], f: F) -> Result where F: FnOnce(&[u8]) -> Result {