devp2p snappy compression (#6683)

This commit is contained in:
Arkadiy Paronyan 2017-10-19 14:41:11 +02:00 committed by Gav Wood
parent fdbf6bf7d6
commit b4c4fddb10
7 changed files with 76 additions and 41 deletions

1
Cargo.lock generated
View File

@ -700,6 +700,7 @@ dependencies = [
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"snappy 0.1.0",
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
"tiny-keccak 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]

View File

@ -33,6 +33,7 @@ path = { path = "../path" }
ethcore-logger = { path ="../../logger" } ethcore-logger = { path ="../../logger" }
ipnetwork = "0.12.6" ipnetwork = "0.12.6"
hash = { path = "../hash" } hash = { path = "../hash" }
snappy = { path = "../snappy" }
serde_json = "1.0" serde_json = "1.0"
[features] [features]

View File

@ -40,6 +40,7 @@ use crypto;
const ENCRYPTED_HEADER_LEN: usize = 32; const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000; const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;
pub trait GenericSocket : Read + Write { pub trait GenericSocket : Read + Write {
} }
@ -345,7 +346,7 @@ impl EncryptedConnection {
ingress_mac: ingress_mac, ingress_mac: ingress_mac,
read_state: EncryptedConnectionState::Header, read_state: EncryptedConnectionState::Header,
protocol_id: 0, protocol_id: 0,
payload_len: 0 payload_len: 0,
}; };
enc.connection.expect(ENCRYPTED_HEADER_LEN); enc.connection.expect(ENCRYPTED_HEADER_LEN);
Ok(enc) Ok(enc)
@ -355,7 +356,7 @@ impl EncryptedConnection {
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new(); let mut header = RlpStream::new();
let len = payload.len(); let len = payload.len();
if len >= (1 << 24) { if len > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket); return Err(NetworkError::OversizedPacket);
} }
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);

View File

@ -19,6 +19,7 @@ use rlp::*;
use std::fmt; use std::fmt;
use ethkey::Error as KeyError; use ethkey::Error as KeyError;
use crypto::Error as CryptoError; use crypto::Error as CryptoError;
use snappy;
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DisconnectReason pub enum DisconnectReason
@ -107,6 +108,8 @@ pub enum NetworkError {
StdIo(::std::io::Error), StdIo(::std::io::Error),
/// Packet size is over the protocol limit. /// Packet size is over the protocol limit.
OversizedPacket, OversizedPacket,
/// Decompression error.
Decompression(snappy::InvalidInput),
} }
impl fmt::Display for NetworkError { impl fmt::Display for NetworkError {
@ -126,6 +129,7 @@ impl fmt::Display for NetworkError {
StdIo(ref err) => format!("{}", err), StdIo(ref err) => format!("{}", err),
InvalidNodeId => "Invalid node id".into(), InvalidNodeId => "Invalid node id".into(),
OversizedPacket => "Packet is too large".into(), OversizedPacket => "Packet is too large".into(),
Decompression(ref err) => format!("Error decompressing packet: {}", err),
}; };
f.write_fmt(format_args!("Network error ({})", msg)) f.write_fmt(format_args!("Network error ({})", msg))
@ -162,6 +166,12 @@ impl From<CryptoError> for NetworkError {
} }
} }
impl From<snappy::InvalidInput> for NetworkError {
fn from(err: snappy::InvalidInput) -> NetworkError {
NetworkError::Decompression(err)
}
}
impl From<::std::net::AddrParseError> for NetworkError { impl From<::std::net::AddrParseError> for NetworkError {
fn from(err: ::std::net::AddrParseError) -> NetworkError { fn from(err: ::std::net::AddrParseError) -> NetworkError {
NetworkError::AddressParse(err) NetworkError::AddressParse(err)

View File

@ -256,7 +256,7 @@ impl<'s> NetworkContext<'s> {
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
let session = self.resolve_session(peer); let session = self.resolve_session(peer);
if let Some(session) = session { if let Some(session) = session {
session.lock().send_packet(self.io, protocol, packet_id as u8, &data)?; session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else { } else {
trace!(target: "network", "Send: Peer no longer exist") trace!(target: "network", "Send: Peer no longer exist")
} }
@ -938,7 +938,7 @@ impl Host {
for (p, packet_id, data) in packet_data { for (p, packet_id, data) in packet_data {
let reserved = self.reserved_nodes.read(); let reserved = self.reserved_nodes.read();
if let Some(h) = handlers.get(&p).clone() { if let Some(h) = handlers.get(&p).clone() {
h.read(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); h.read(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token, packet_id, &data);
} }
} }
} }

View File

@ -81,6 +81,7 @@ extern crate ethcore_logger;
extern crate ipnetwork; extern crate ipnetwork;
extern crate hash; extern crate hash;
extern crate serde_json; extern crate serde_json;
extern crate snappy;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -115,7 +116,7 @@ pub use node_table::{is_valid_node_url, NodeId};
use ipnetwork::{IpNetwork, IpNetworkError}; use ipnetwork::{IpNetwork, IpNetworkError};
use std::str::FromStr; use std::str::FromStr;
const PROTOCOL_VERSION: u32 = 4; const PROTOCOL_VERSION: u32 = 5;
/// Network IO protocol handler. This needs to be implemented for each new subprotocol. /// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop. /// All the handler function are called from within IO event loop.

View File

@ -25,7 +25,7 @@ use mio::deprecated::{Handler, EventLoop};
use mio::tcp::*; use mio::tcp::*;
use bigint::hash::*; use bigint::hash::*;
use rlp::*; use rlp::*;
use connection::{EncryptedConnection, Packet, Connection}; use connection::{EncryptedConnection, Packet, Connection, MAX_PAYLOAD_SIZE};
use handshake::Handshake; use handshake::Handshake;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
use error::{NetworkError, DisconnectReason}; use error::{NetworkError, DisconnectReason};
@ -33,10 +33,13 @@ use host::*;
use node_table::NodeId; use node_table::NodeId;
use stats::NetworkStats; use stats::NetworkStats;
use time; use time;
use snappy;
// Timeout must be less than (interval - 1). // Timeout must be less than (interval - 1).
const PING_TIMEOUT_SEC: u64 = 60; const PING_TIMEOUT_SEC: u64 = 60;
const PING_INTERVAL_SEC: u64 = 120; const PING_INTERVAL_SEC: u64 = 120;
const MIN_PROTOCOL_VERSION: u32 = 4;
const MIN_COMPRESSION_PROTOCOL_VERSION: u32 = 5;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
enum ProtocolState { enum ProtocolState {
@ -61,6 +64,7 @@ pub struct Session {
state: State, state: State,
// Protocol states -- accumulates pending packets until signaled as ready. // Protocol states -- accumulates pending packets until signaled as ready.
protocol_states: HashMap<ProtocolId, ProtocolState>, protocol_states: HashMap<ProtocolId, ProtocolState>,
compression: bool,
} }
enum State { enum State {
@ -198,6 +202,7 @@ impl Session {
pong_time_ns: None, pong_time_ns: None,
expired: false, expired: false,
protocol_states: HashMap::new(), protocol_states: HashMap::new(),
compression: false,
}) })
} }
@ -211,7 +216,6 @@ impl Session {
}; };
self.state = State::Session(connection); self.state = State::Session(connection);
self.write_hello(io, host)?; self.write_hello(io, host)?;
self.send_ping(io)?;
Ok(()) Ok(())
} }
@ -326,28 +330,43 @@ impl Session {
} }
/// Send a protocol packet to peer. /// Send a protocol packet to peer.
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: [u8; 3], packet_id: u8, data: &[u8]) -> Result<(), NetworkError> pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: Option<[u8; 3]>, packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
where Message: Send + Sync + Clone { where Message: Send + Sync + Clone {
if self.info.capabilities.is_empty() || !self.had_hello { if protocol.is_some() && (self.info.capabilities.is_empty() || !self.had_hello) {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), str::from_utf8(&protocol[..]).unwrap_or("??"), packet_id); debug!(target: "network", "Sending to unconfirmed session {}, protocol: {:?}, packet: {}", self.token(), protocol.as_ref().map(|p| str::from_utf8(&p[..]).unwrap_or("??")), packet_id);
return Err(From::from(NetworkError::BadProtocol)); return Err(From::from(NetworkError::BadProtocol));
} }
if self.expired() { if self.expired() {
return Err(From::from(NetworkError::Expired)); return Err(From::from(NetworkError::Expired));
} }
let mut i = 0usize; let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol { let pid = match protocol {
i += 1; Some(protocol) => {
if i == self.info.capabilities.len() { while protocol != self.info.capabilities[i].protocol {
debug!(target: "network", "Unknown protocol: {:?}", protocol); i += 1;
return Ok(()) if i == self.info.capabilities.len() {
} debug!(target: "network", "Unknown protocol: {:?}", protocol);
} return Ok(())
let pid = self.info.capabilities[i].id_offset + packet_id; }
}
self.info.capabilities[i].id_offset + packet_id
},
None => packet_id
};
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(pid as u32)); rlp.append(&(pid as u32));
rlp.append_raw(data, 1); let mut compressed = Vec::new();
self.send(io, rlp) let mut payload = data; // create a reference with local lifetime
if self.compression {
if payload.len() > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
}
let len = snappy::compress_into(&payload, &mut compressed);
trace!(target: "network", "compressed {} to {}", payload.len(), len);
payload = &compressed[0..len];
}
rlp.append_raw(payload, 1);
self.send(io, &rlp.drain())
} }
/// Keep this session alive. Returns false if ping timeout happened /// Keep this session alive. Returns false if ping timeout happened
@ -396,14 +415,23 @@ impl Session {
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello { if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(From::from(NetworkError::BadProtocol)); return Err(From::from(NetworkError::BadProtocol));
} }
let data = if self.compression {
let compressed = &packet.data[1..];
if snappy::decompressed_len(&compressed)? > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
}
snappy::decompress(&compressed)?
} else {
packet.data[1..].to_owned()
};
match packet_id { match packet_id {
PACKET_HELLO => { PACKET_HELLO => {
let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size let rlp = UntrustedRlp::new(&data); //TODO: validate rlp expected size
self.read_hello(io, &rlp, host)?; self.read_hello(io, &rlp, host)?;
Ok(SessionData::Ready) Ok(SessionData::Ready)
}, },
PACKET_DISCONNECT => { PACKET_DISCONNECT => {
let rlp = UntrustedRlp::new(&packet.data[1..]); let rlp = UntrustedRlp::new(&data);
let reason: u8 = rlp.val_at(0)?; let reason: u8 = rlp.val_at(0)?;
if self.had_hello { if self.had_hello {
debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason)); debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
@ -439,11 +467,11 @@ impl Session {
match *self.protocol_states.entry(protocol).or_insert_with(|| ProtocolState::Pending(Vec::new())) { match *self.protocol_states.entry(protocol).or_insert_with(|| ProtocolState::Pending(Vec::new())) {
ProtocolState::Connected => { ProtocolState::Connected => {
trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, protocol_packet_id, i, self.info.capabilities); trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, protocol_packet_id, i, self.info.capabilities);
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: protocol_packet_id } ) Ok(SessionData::Packet { data: data, protocol: protocol, packet_id: protocol_packet_id } )
} }
ProtocolState::Pending(ref mut pending) => { ProtocolState::Pending(ref mut pending) => {
trace!(target: "network", "Packet {} deferred until protocol connection event completion", packet_id); trace!(target: "network", "Packet {} deferred until protocol connection event completion", packet_id);
pending.push((packet.data, protocol_packet_id)); pending.push((data, protocol_packet_id));
Ok(SessionData::Continue) Ok(SessionData::Continue)
} }
@ -465,7 +493,7 @@ impl Session {
.append_list(&host.capabilities) .append_list(&host.capabilities)
.append(&host.local_endpoint.address.port()) .append(&host.local_endpoint.address.port())
.append(host.id()); .append(host.id());
self.send(io, rlp) self.send(io, &rlp.drain())
} }
fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), NetworkError> fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), NetworkError>
@ -494,8 +522,7 @@ impl Session {
while i < caps.len() { while i < caps.len() {
if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) { if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) {
caps.remove(i); caps.remove(i);
} } else {
else {
i += 1; i += 1;
} }
} }
@ -520,52 +547,46 @@ impl Session {
trace!(target: "network", "No common capabilities with peer."); trace!(target: "network", "No common capabilities with peer.");
return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer))); return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
} }
if protocol != host.protocol_version { if protocol < MIN_PROTOCOL_VERSION {
trace!(target: "network", "Peer protocol version mismatch: {}", protocol); trace!(target: "network", "Peer protocol version mismatch: {}", protocol);
return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer))); return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));
} }
self.compression = protocol >= MIN_COMPRESSION_PROTOCOL_VERSION;
self.send_ping(io)?;
self.had_hello = true; self.had_hello = true;
Ok(()) Ok(())
} }
/// Senf ping packet /// Senf ping packet
pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone { pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
self.send(io, Session::prepare(PACKET_PING)?)?; self.send_packet(io, None, PACKET_PING, &EMPTY_LIST_RLP)?;
self.ping_time_ns = time::precise_time_ns(); self.ping_time_ns = time::precise_time_ns();
self.pong_time_ns = None; self.pong_time_ns = None;
Ok(()) Ok(())
} }
fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone { fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
self.send(io, Session::prepare(PACKET_PONG)?) self.send_packet(io, None, PACKET_PONG, &EMPTY_LIST_RLP)
} }
/// Disconnect this session /// Disconnect this session
pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone { pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone {
if let State::Session(_) = self.state { if let State::Session(_) = self.state {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32));
rlp.begin_list(1); rlp.begin_list(1);
rlp.append(&(reason as u32)); rlp.append(&(reason as u32));
self.send(io, rlp).ok(); self.send_packet(io, None, PACKET_DISCONNECT, &rlp.drain()).ok();
} }
NetworkError::Disconnect(reason) NetworkError::Disconnect(reason)
} }
fn prepare(packet_id: u8) -> Result<RlpStream, NetworkError> { fn send<Message>(&mut self, io: &IoContext<Message>, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
let mut rlp = RlpStream::new();
rlp.append(&(packet_id as u32));
rlp.begin_list(0);
Ok(rlp)
}
fn send<Message>(&mut self, io: &IoContext<Message>, rlp: RlpStream) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
match self.state { match self.state {
State::Handshake(_) => { State::Handshake(_) => {
warn!(target:"network", "Unexpected send request"); warn!(target:"network", "Unexpected send request");
}, },
State::Session(ref mut s) => { State::Session(ref mut s) => {
s.send_packet(io, &rlp.out())? s.send_packet(io, data)?
}, },
} }
Ok(()) Ok(())