|
|
|
|
@@ -6,7 +6,7 @@ use hash::*;
|
|
|
|
|
use sha3::*;
|
|
|
|
|
use bytes::*;
|
|
|
|
|
use rlp::*;
|
|
|
|
|
use std::io::{self, Cursor, Read};
|
|
|
|
|
use std::io::{self, Cursor, Read, Write};
|
|
|
|
|
use error::*;
|
|
|
|
|
use io::{IoContext, StreamToken};
|
|
|
|
|
use network::error::NetworkError;
|
|
|
|
|
@@ -22,12 +22,17 @@ use tiny_keccak::Keccak;
|
|
|
|
|
const ENCRYPTED_HEADER_LEN: usize = 32;
|
|
|
|
|
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
|
|
|
|
|
|
|
|
|
|
/// Low level tcp connection
|
|
|
|
|
pub struct Connection {
|
|
|
|
|
pub trait GenericSocket : Read + Write {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl GenericSocket for TcpStream {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct GenericConnection<Socket: GenericSocket> {
|
|
|
|
|
/// Connection id (token)
|
|
|
|
|
pub token: StreamToken,
|
|
|
|
|
/// Network socket
|
|
|
|
|
pub socket: TcpStream,
|
|
|
|
|
pub socket: Socket,
|
|
|
|
|
/// Receive buffer
|
|
|
|
|
rec_buf: Bytes,
|
|
|
|
|
/// Expected size
|
|
|
|
|
@@ -36,34 +41,11 @@ pub struct Connection {
|
|
|
|
|
send_queue: VecDeque<Cursor<Bytes>>,
|
|
|
|
|
/// Event flags this connection expects
|
|
|
|
|
interest: EventSet,
|
|
|
|
|
/// Shared network staistics
|
|
|
|
|
/// Shared network statistics
|
|
|
|
|
stats: Arc<NetworkStats>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Connection write status.
|
|
|
|
|
#[derive(PartialEq, Eq)]
|
|
|
|
|
pub enum WriteStatus {
|
|
|
|
|
/// Some data is still pending for current packet
|
|
|
|
|
Ongoing,
|
|
|
|
|
/// All data sent.
|
|
|
|
|
Complete
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Connection {
|
|
|
|
|
/// Create a new connection with given id and socket.
|
|
|
|
|
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
|
|
|
|
|
Connection {
|
|
|
|
|
token: token,
|
|
|
|
|
socket: socket,
|
|
|
|
|
send_queue: VecDeque::new(),
|
|
|
|
|
rec_buf: Bytes::new(),
|
|
|
|
|
rec_size: 0,
|
|
|
|
|
interest: EventSet::hup() | EventSet::readable(),
|
|
|
|
|
stats: stats,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Put a connection into read mode. Receiving up `size` bytes of data.
|
|
|
|
|
impl<Socket: GenericSocket> GenericConnection<Socket> {
|
|
|
|
|
pub fn expect(&mut self, size: usize) {
|
|
|
|
|
if self.rec_size != self.rec_buf.len() {
|
|
|
|
|
warn!(target:"net", "Unexpected connection read start");
|
|
|
|
|
@@ -79,7 +61,7 @@ impl Connection {
|
|
|
|
|
}
|
|
|
|
|
let max = self.rec_size - self.rec_buf.len();
|
|
|
|
|
// resolve "multiple applicable items in scope [E0034]" error
|
|
|
|
|
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
|
|
|
|
|
let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
|
|
|
|
|
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
|
|
|
|
|
Ok(Some(size)) if size != 0 => {
|
|
|
|
|
self.stats.inc_recv(size);
|
|
|
|
|
@@ -142,6 +124,24 @@ impl Connection {
|
|
|
|
|
Ok(r)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Low level tcp connection
|
|
|
|
|
pub type Connection = GenericConnection<TcpStream>;
|
|
|
|
|
|
|
|
|
|
impl Connection {
|
|
|
|
|
/// Create a new connection with given id and socket.
|
|
|
|
|
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
|
|
|
|
|
Connection {
|
|
|
|
|
token: token,
|
|
|
|
|
socket: socket,
|
|
|
|
|
send_queue: VecDeque::new(),
|
|
|
|
|
rec_buf: Bytes::new(),
|
|
|
|
|
rec_size: 0,
|
|
|
|
|
interest: EventSet::hup() | EventSet::readable(),
|
|
|
|
|
stats: stats,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Register this connection with the IO event loop.
|
|
|
|
|
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
|
|
|
|
@@ -169,6 +169,15 @@ impl Connection {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Connection write status.
|
|
|
|
|
#[derive(PartialEq, Eq)]
|
|
|
|
|
pub enum WriteStatus {
|
|
|
|
|
/// Some data is still pending for current packet
|
|
|
|
|
Ongoing,
|
|
|
|
|
/// All data sent.
|
|
|
|
|
Complete
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// RLPx packet
|
|
|
|
|
pub struct Packet {
|
|
|
|
|
pub protocol: u16,
|
|
|
|
|
@@ -424,4 +433,230 @@ pub fn test_encryption() {
|
|
|
|
|
assert_eq!(got, after2);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use std::sync::*;
|
|
|
|
|
use super::super::stats::*;
|
|
|
|
|
use std::io::{Read, Write, Error, Cursor, ErrorKind};
|
|
|
|
|
use std::cmp;
|
|
|
|
|
use mio::{EventSet};
|
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
|
use bytes::*;
|
|
|
|
|
|
|
|
|
|
struct TestSocket {
|
|
|
|
|
read_buffer: Vec<u8>,
|
|
|
|
|
write_buffer: Vec<u8>,
|
|
|
|
|
cursor: usize,
|
|
|
|
|
buf_size: usize,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TestSocket {
|
|
|
|
|
fn new() -> TestSocket {
|
|
|
|
|
TestSocket {
|
|
|
|
|
read_buffer: vec![],
|
|
|
|
|
write_buffer: vec![],
|
|
|
|
|
cursor: 0,
|
|
|
|
|
buf_size: 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn new_buf(buf_size: usize) -> TestSocket {
|
|
|
|
|
TestSocket {
|
|
|
|
|
read_buffer: vec![],
|
|
|
|
|
write_buffer: vec![],
|
|
|
|
|
cursor: 0,
|
|
|
|
|
buf_size: buf_size,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Read for TestSocket {
|
|
|
|
|
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
|
|
|
|
let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len());
|
|
|
|
|
let len = cmp::max(end_position - self.cursor, 0);
|
|
|
|
|
match len {
|
|
|
|
|
0 => Ok(0),
|
|
|
|
|
_ => {
|
|
|
|
|
for i in self.cursor..end_position {
|
|
|
|
|
buf[i-self.cursor] = self.read_buffer[i];
|
|
|
|
|
}
|
|
|
|
|
self.cursor = self.cursor + buf.len();
|
|
|
|
|
Ok(len)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Write for TestSocket {
|
|
|
|
|
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
|
|
|
|
|
if self.buf_size == 0 || buf.len() < self.buf_size {
|
|
|
|
|
self.write_buffer.extend(buf.iter().cloned());
|
|
|
|
|
Ok(buf.len())
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
self.write_buffer.extend(buf.iter().take(self.buf_size).cloned());
|
|
|
|
|
Ok(self.buf_size)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn flush(&mut self) -> Result<(), Error> {
|
|
|
|
|
unimplemented!();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl GenericSocket for TestSocket {}
|
|
|
|
|
|
|
|
|
|
struct TestBrokenSocket {
|
|
|
|
|
error: String
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Read for TestBrokenSocket {
|
|
|
|
|
fn read(&mut self, _: &mut [u8]) -> Result<usize, Error> {
|
|
|
|
|
Err(Error::new(ErrorKind::Other, self.error.clone()))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Write for TestBrokenSocket {
|
|
|
|
|
fn write(&mut self, _: &[u8]) -> Result<usize, Error> {
|
|
|
|
|
Err(Error::new(ErrorKind::Other, self.error.clone()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn flush(&mut self) -> Result<(), Error> {
|
|
|
|
|
unimplemented!();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl GenericSocket for TestBrokenSocket {}
|
|
|
|
|
|
|
|
|
|
type TestConnection = GenericConnection<TestSocket>;
|
|
|
|
|
|
|
|
|
|
impl TestConnection {
|
|
|
|
|
pub fn new() -> TestConnection {
|
|
|
|
|
TestConnection {
|
|
|
|
|
token: 999998888usize,
|
|
|
|
|
socket: TestSocket::new(),
|
|
|
|
|
send_queue: VecDeque::new(),
|
|
|
|
|
rec_buf: Bytes::new(),
|
|
|
|
|
rec_size: 0,
|
|
|
|
|
interest: EventSet::hup() | EventSet::readable(),
|
|
|
|
|
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TestBrokenConnection = GenericConnection<TestBrokenSocket>;
|
|
|
|
|
|
|
|
|
|
impl TestBrokenConnection {
|
|
|
|
|
pub fn new() -> TestBrokenConnection {
|
|
|
|
|
TestBrokenConnection {
|
|
|
|
|
token: 999998888usize,
|
|
|
|
|
socket: TestBrokenSocket { error: "test broken socket".to_owned() },
|
|
|
|
|
send_queue: VecDeque::new(),
|
|
|
|
|
rec_buf: Bytes::new(),
|
|
|
|
|
rec_size: 0,
|
|
|
|
|
interest: EventSet::hup() | EventSet::readable(),
|
|
|
|
|
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_expect() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
connection.expect(1024);
|
|
|
|
|
assert_eq!(1024, connection.rec_size);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_write_empty() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
let status = connection.writable();
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert!(WriteStatus::Complete == status.unwrap());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_write() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
let data = Cursor::new(vec![0; 10240]);
|
|
|
|
|
connection.send_queue.push_back(data);
|
|
|
|
|
|
|
|
|
|
let status = connection.writable();
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert!(WriteStatus::Complete == status.unwrap());
|
|
|
|
|
assert_eq!(10240, connection.socket.write_buffer.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_write_is_buffered() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
connection.socket = TestSocket::new_buf(1024);
|
|
|
|
|
let data = Cursor::new(vec![0; 10240]);
|
|
|
|
|
connection.send_queue.push_back(data);
|
|
|
|
|
|
|
|
|
|
let status = connection.writable();
|
|
|
|
|
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert!(WriteStatus::Ongoing == status.unwrap());
|
|
|
|
|
assert_eq!(1024, connection.socket.write_buffer.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_write_to_broken() {
|
|
|
|
|
let mut connection = TestBrokenConnection::new();
|
|
|
|
|
let data = Cursor::new(vec![0; 10240]);
|
|
|
|
|
connection.send_queue.push_back(data);
|
|
|
|
|
|
|
|
|
|
let status = connection.writable();
|
|
|
|
|
|
|
|
|
|
assert!(!status.is_ok());
|
|
|
|
|
assert_eq!(1, connection.send_queue.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_read() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
connection.rec_size = 2048;
|
|
|
|
|
connection.rec_buf = vec![10; 1024];
|
|
|
|
|
connection.socket.read_buffer = vec![99; 2048];
|
|
|
|
|
|
|
|
|
|
let status = connection.readable();
|
|
|
|
|
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert_eq!(1024, connection.socket.cursor);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_read_from_broken() {
|
|
|
|
|
let mut connection = TestBrokenConnection::new();
|
|
|
|
|
connection.rec_size = 2048;
|
|
|
|
|
|
|
|
|
|
let status = connection.readable();
|
|
|
|
|
assert!(!status.is_ok());
|
|
|
|
|
assert_eq!(0, connection.rec_buf.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_read_nothing() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
connection.rec_size = 2048;
|
|
|
|
|
|
|
|
|
|
let status = connection.readable();
|
|
|
|
|
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert_eq!(0, connection.rec_buf.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn connection_read_full() {
|
|
|
|
|
let mut connection = TestConnection::new();
|
|
|
|
|
connection.rec_size = 1024;
|
|
|
|
|
connection.rec_buf = vec![76;1024];
|
|
|
|
|
|
|
|
|
|
let status = connection.readable();
|
|
|
|
|
|
|
|
|
|
assert!(status.is_ok());
|
|
|
|
|
assert_eq!(0, connection.socket.cursor);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|