This commit is contained in:
arkpar 2015-12-02 20:11:13 +01:00
parent 10a43c1fed
commit 75b1de0482
6 changed files with 316 additions and 119 deletions

View File

@ -1,4 +1,5 @@
#![allow(dead_code)] //TODO: remove this after everything is done
use std::collections::VecDeque;
use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite};
use mio::tcp::*;
use hash::*;
@ -23,15 +24,117 @@ pub struct Connection {
pub socket: TcpStream,
rec_buf: Bytes,
rec_size: usize,
send_buf: Cursor<Bytes>,
send_queue: VecDeque<Cursor<Bytes>>,
interest: EventSet,
}
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
Ongoing,
Complete
}
impl Connection {
pub fn new(token: Token, socket: TcpStream) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup(),
}
}
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
}
unsafe { self.rec_buf.set_len(0) }
self.rec_size = size;
}
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read");
}
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => {
self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
},
Ok(_) => Ok(None),
Err(e) => Err(e),
}
}
pub fn send(&mut self, data: Bytes) { //TODO: take ownership version
if data.len() != 0 {
self.send_queue.push_back(Cursor::new(data));
}
if !self.interest.is_writable() {
self.interest.insert(EventSet::writable());
}
}
pub fn writable(&mut self) -> io::Result<WriteStatus> {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
}
{
let buf = self.send_queue.front_mut().unwrap();
let send_size = buf.get_ref().len();
if (buf.position() as usize) >= send_size {
warn!(target:"net", "Unexpected connection data");
return Ok(WriteStatus::Complete)
}
match self.socket.try_write_buf(buf) {
Ok(_) if (buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable());
Ok(WriteStatus::Ongoing)
},
Ok(_) if (buf.position() as usize) == send_size => {
self.interest.remove(EventSet::writable());
Ok(WriteStatus::Complete)
},
Ok(_) => { panic!("Wrote past buffer");},
Err(e) => Err(e)
}
}.and_then(|r| if r == WriteStatus::Complete {
self.send_queue.pop_front();
Ok(r)
}
else { Ok(r) }
)
}
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable());
event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", self.token);
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}
}
pub struct Packet {
pub protocol: u16,
pub data: Bytes,
}
enum EncryptedConnectionState {
Header,
Payload,
@ -99,7 +202,7 @@ impl EncryptedConnection {
})
}
pub fn write_packet(&mut self, payload: &[u8]) -> Result<(), Error> {
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), Error> {
let mut header = RlpStream::new();
let len = payload.len() as usize;
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
@ -120,7 +223,7 @@ impl EncryptedConnection {
}
self.egress_mac.update(&packet[32..(32 + len + padding)]);
self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]);
self.connection.send(&packet);
self.connection.send(packet);
Ok(())
}
@ -153,7 +256,7 @@ impl EncryptedConnection {
Ok(())
}
fn read_payload(&mut self, payload: &[u8]) -> Result<Bytes, Error> {
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, Error> {
let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = (self.payload_len + padding + 16) as usize;
if payload.len() != full_length {
@ -170,10 +273,13 @@ impl EncryptedConnection {
let mut packet = vec![0u8; self.payload_len as usize];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
packet.resize(self.payload_len as usize, 0u8);
Ok(packet)
Ok(Packet {
protocol: self.protocol_id,
data: packet
})
}
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Bytes>, Error> {
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, Error> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.reregister(event_loop));
match self.read_state {
@ -215,96 +321,3 @@ impl EncryptedConnection {
}
}
impl Connection {
pub fn new(token: Token, socket: TcpStream) -> Connection {
Connection {
token: token,
socket: socket,
send_buf: Cursor::new(Bytes::new()),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup(),
}
}
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
}
unsafe { self.rec_buf.set_len(0) }
self.rec_size = size;
}
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read");
}
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => {
self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
},
Ok(_) => Ok(None),
Err(e) => Err(e),
}
}
pub fn send(&mut self, data: &[u8]) { //TODO: take ownership version
let send_size = self.send_buf.get_ref().len();
if send_size != 0 || self.send_buf.position() as usize >= send_size {
warn!(target:"net", "Unexpected connection send start");
}
if self.send_buf.get_ref().capacity() < data.len() {
let capacity = self.send_buf.get_ref().capacity();
self.send_buf.get_mut().reserve(data.len() - capacity);
}
unsafe { self.send_buf.get_mut().set_len(data.len()) }
unsafe { ::std::ptr::copy_nonoverlapping(data.as_ptr(), self.send_buf.get_mut()[..].as_mut_ptr(), data.len()) };
if !self.interest.is_writable() {
self.interest.insert(EventSet::writable());
}
}
pub fn writable(&mut self) -> io::Result<WriteStatus> {
let send_size = self.send_buf.get_ref().len();
if (self.send_buf.position() as usize) >= send_size {
warn!(target:"net", "Unexpected connection data");
return Ok(WriteStatus::Complete)
}
match self.socket.try_write_buf(&mut self.send_buf) {
Ok(_) if (self.send_buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable());
Ok(WriteStatus::Ongoing)
},
Ok(_) if (self.send_buf.position() as usize) == send_size => {
self.interest.remove(EventSet::writable());
Ok(WriteStatus::Complete)
},
Ok(_) => { panic!("Wrote past buffer");},
Err(e) => Err(e)
}
}
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable());
event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", self.token);
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}
}

View File

@ -180,8 +180,8 @@ impl Handshake {
self.nonce.copy_to(nonce);
}
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.connection.send(&message[..]);
self.auth_cipher = message;
self.auth_cipher = message.clone();
self.connection.send(message);
self.state = HandshakeState::WritingAuth;
Ok(())
}
@ -198,8 +198,8 @@ impl Handshake {
self.nonce.copy_to(nonce);
}
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.connection.send(&message[..]);
self.ack_cipher = message;
self.ack_cipher = message.clone();
self.connection.send(message);
self.state = HandshakeState::WritingAck;
Ok(())
}

View File

@ -12,6 +12,7 @@ use mio::tcp::*;
use mio::udp::*;
use hash::*;
use crypto::*;
use rlp::*;
use time::Tm;
use network::handshake::Handshake;
use network::session::Session;
@ -55,6 +56,7 @@ impl NetworkConfiguration {
#[derive(Debug)]
struct NodeEndpoint {
address: SocketAddr,
address_str: String,
udp_port: u16
}
@ -62,6 +64,7 @@ impl NodeEndpoint {
fn new(address: SocketAddr) -> NodeEndpoint {
NodeEndpoint {
address: address,
address_str: address.to_string(),
udp_port: address.port()
}
}
@ -71,6 +74,7 @@ impl NodeEndpoint {
match address {
Ok(Some(a)) => Ok(NodeEndpoint {
address: a,
address_str: s.to_string(),
udp_port: a.port()
}),
Ok(_) => Err(Error::AddressResolve(None)),
@ -178,10 +182,38 @@ pub enum HostMessage {
Shutdown
}
#[derive(Debug, PartialEq, Eq)]
pub struct CapabilityInfo {
pub protocol: String,
pub version: u32,
}
impl Encodable for CapabilityInfo {
fn encode<E>(&self, encoder: &mut E) -> () where E: Encoder {
encoder.emit_list(|e| {
self.protocol.encode(e);
self.version.encode(e);
});
}
}
impl Decodable for CapabilityInfo {
fn decode_untrusted(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
Ok(CapabilityInfo {
protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))),
version: try!(u32::decode_untrusted(&try!(rlp.at(1)))),
})
}
}
pub struct HostInfo {
keys: KeyPair,
config: NetworkConfiguration,
nonce: H256
nonce: H256,
pub protocol_version: u32,
pub client_version: String,
pub listen_port: u16,
pub capabilities: Vec<CapabilityInfo>
}
impl HostInfo {
@ -244,12 +276,17 @@ impl Host {
let udp_socket = UdpSocket::bound(&addr).unwrap();
event_loop.register_opt(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap();
event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap();
let port = config.listen_address.port();
let mut host = Host {
info: HostInfo {
keys: KeyPair::create().unwrap(),
config: config,
nonce: H256::random()
nonce: H256::random(),
protocol_version: 4,
client_version: "parity".to_string(),
listen_port: port,
capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }],
},
sender: sender,
udp_socket: udp_socket,
@ -263,7 +300,6 @@ impl Host {
idle_timeout: idle_timeout
};
host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300");
host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303");
host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303");
@ -429,7 +465,7 @@ impl Host {
}
fn have_session(&self, id: &NodeId) -> bool {
self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.id.eq(&id), _ => false })
self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false })
}
fn connecting_to(&self, id: &NodeId) -> bool {
@ -594,9 +630,10 @@ impl Host {
}
fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
let info = &self.info;
self.connections.replace_with(token, |c| {
match c {
ConnectionEntry::Handshake(h) => Session::new(h, event_loop)
ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info)
.map(|s| Some(ConnectionEntry::Session(s)))
.unwrap_or_else(|e| {
debug!(target: "net", "Session construction error: {:?}", e);

View File

@ -4,6 +4,23 @@ mod connection;
mod handshake;
mod session;
#[derive(Debug, Copy, Clone)]
pub enum DisconnectReason
{
DisconnectRequested,
TCPError,
BadProtocol,
UselessPeer,
TooManyPeers,
DuplicatePeer,
IncompatibleProtocol,
NullIdentity,
ClientQuit,
UnexpectedIdentity,
LocalIdentity,
PingTimeout,
}
#[derive(Debug)]
pub enum Error {
Crypto(::crypto::CryptoError),
@ -13,6 +30,7 @@ pub enum Error {
AddressParse(::std::net::AddrParseError),
AddressResolve(Option<::std::io::Error>),
NodeIdParse(::error::EthcoreError),
Disconnect(DisconnectReason)
}
impl From<::std::io::Error> for Error {

View File

@ -1,33 +1,162 @@
#![allow(dead_code)] //TODO: remove this after everything is done
//TODO: remove all unwraps
//TODO: hello packet timeout
use mio::*;
use hash::*;
use network::connection::{EncryptedConnection};
use rlp::*;
use network::connection::{EncryptedConnection, Packet};
use network::handshake::Handshake;
use network::Error;
use network::{Error, DisconnectReason};
use network::host::*;
pub struct Session {
pub id: NodeId,
pub info: SessionInfo,
connection: EncryptedConnection,
had_hello: bool,
}
pub struct SessionInfo {
pub id: NodeId,
pub client_version: String,
pub protocol_version: u32,
pub capabilities: Vec<CapabilityInfo>,
}
const PACKET_HELLO: u8 = 0x80;
const PACKET_DISCONNECT: u8 = 0x01;
const PACKET_PING: u8 = 0x02;
const PACKET_PONG: u8 = 0x03;
const PACKET_GET_PEERS: u8 = 0x04;
const PACKET_PEERS: u8 = 0x05;
const PACKET_USER: u8 = 0x10;
const PACKET_LAST: u8 = 0x7f;
impl Session {
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>) -> Result<Session, Error> {
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, Error> {
let id = h.id.clone();
let mut connection = try!(EncryptedConnection::new(h));
try!(connection.register(event_loop));
Ok(Session {
id: id,
let mut session = Session {
connection: connection,
})
had_hello: false,
info: SessionInfo {
id: id,
client_version: String::new(),
protocol_version: 0,
capabilities: Vec::new(),
},
};
try!(session.write_hello(host));
try!(session.write_ping());
Ok(session)
}
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), Error> {
try!(self.connection.readable(event_loop));
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), Error> {
match try!(self.connection.readable(event_loop)) {
Some(data) => {
try!(self.read_packet(data, host));
},
None => {}
};
Ok(())
}
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), Error> {
self.connection.writable(event_loop)
}
pub fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<(), Error> {
let data = &packet.data;
if data.len() < 2 {
return Err(Error::BadProtocol);
}
let packet_id = data[0];
let rlp = UntrustedRlp::new(&data[1..]); //TODO: validate rlp expected size
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(Error::BadProtocol);
}
match packet_id {
PACKET_HELLO => self.read_hello(&rlp, host),
PACKET_DISCONNECT => Err(Error::Disconnect(DisconnectReason::DisconnectRequested)),
PACKET_PING => self.write_pong(),
PACKET_GET_PEERS => Ok(()), //TODO;
PACKET_PEERS => Ok(()),
PACKET_USER ... PACKET_LAST => {
warn!(target: "net", "User packet: {:?}", rlp);
Ok(())
},
_ => {
debug!(target: "net", "Unkown packet: {:?}", rlp);
Ok(())
}
}
}
fn write_hello(&mut self, host: &HostInfo) -> Result<(), Error> {
let mut rlp = RlpStream::new();
rlp.append(&(PACKET_HELLO as u32));
rlp.append_list(5)
.append(&host.protocol_version)
.append(&host.client_version)
.append(&host.capabilities)
.append(&host.listen_port)
.append(host.id());
self.connection.send_packet(&rlp.out())
}
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> {
let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0))));
let client_version = try!(String::decode_untrusted(&try!(rlp.at(0))));
let mut caps: Vec<CapabilityInfo> = try!(Decodable::decode_untrusted(&try!(rlp.at(2))));
let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4))));
// Intersect with host capabilities
// Leave only highset mutually supported capability version
caps.retain(|c| host.capabilities.contains(&c));
let mut i = 0;
while i < caps.len() {
if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) {
caps.remove(i);
}
else {
i += 1;
}
}
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
if protocol != host.protocol_version {
return Err(self.disconnect(DisconnectReason::UselessPeer));
}
self.had_hello = true;
Ok(())
}
fn write_ping(&mut self) -> Result<(), Error> {
self.send(try!(Session::prepare(PACKET_PING, 0)))
}
fn write_pong(&mut self) -> Result<(), Error> {
self.send(try!(Session::prepare(PACKET_PONG, 0)))
}
fn disconnect(&mut self, reason: DisconnectReason) -> Error {
let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32));
rlp.append_list(1);
rlp.append(&(reason.clone() as u32));
self.connection.send_packet(&rlp.out()).ok();
Error::Disconnect(reason)
}
fn prepare(packet_id: u8, items: usize) -> Result<RlpStream, Error> {
let mut rlp = RlpStream::new_list(1);
rlp.append(&(packet_id as u32));
rlp.append_list(items);
Ok(rlp)
}
fn send(&mut self, rlp: RlpStream) -> Result<(), Error> {
self.connection.send_packet(&rlp.out())
}
}

View File

@ -725,7 +725,7 @@ pub trait Decoder {
fn read_value<T, F>(bytes: &[u8], f: F) -> Result<T, DecoderError> where F: FnOnce(&[u8]) -> Result<T, DecoderError>;
}
struct BasicDecoder;
pub struct BasicDecoder;
impl Decoder for BasicDecoder {
fn read_value<T, F>(bytes: &[u8], f: F) -> Result<T, DecoderError> where F: FnOnce(&[u8]) -> Result<T, DecoderError> {