use error-chain in ethcore-network

This commit is contained in:
debris
2017-11-13 14:37:08 +01:00
parent b9fbe52f32
commit 3cf52dac59
28 changed files with 235 additions and 244 deletions

View File

@@ -26,7 +26,6 @@ use bigint::hash::*;
use ethcore_bytes::*;
use rlp::*;
use std::io::{self, Cursor, Read, Write};
use error::*;
use io::{IoContext, StreamToken};
use handshake::Handshake;
use stats::NetworkStats;
@@ -37,6 +36,7 @@ use rcrypto::buffer::*;
use tiny_keccak::Keccak;
use bytes::{Buf, BufMut};
use crypto;
use error::{Error, ErrorKind};
const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
@@ -125,7 +125,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
}
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, Error> where Message: Send + Clone + Sync + 'static {
{
let buf = match self.send_queue.front_mut() {
Some(buf) => buf,
@@ -300,7 +300,7 @@ pub struct EncryptedConnection {
impl EncryptedConnection {
/// Create an encrypted connection out of the handshake.
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, NetworkError> {
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, Error> {
let shared = crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)?;
let mut nonce_material = H512::new();
if handshake.originated {
@@ -353,11 +353,11 @@ impl EncryptedConnection {
}
/// Send a packet
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new();
let len = payload.len();
if len > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
bail!(ErrorKind::OversizedPacket);
}
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1);
@@ -383,16 +383,16 @@ impl EncryptedConnection {
}
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
fn read_header(&mut self, header: &[u8]) -> Result<(), NetworkError> {
fn read_header(&mut self, header: &[u8]) -> Result<(), Error> {
if header.len() != ENCRYPTED_HEADER_LEN {
return Err(From::from(NetworkError::Auth));
return Err(ErrorKind::Auth.into());
}
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &header[0..16]);
let mac = &header[16..];
let mut expected = H256::new();
self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[0..16] {
return Err(From::from(NetworkError::Auth));
return Err(ErrorKind::Auth.into());
}
let mut hdec = H128::new();
@@ -413,11 +413,11 @@ impl EncryptedConnection {
}
/// Decrypt and authenticate packet payload.
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, NetworkError> {
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, Error> {
let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = self.payload_len + padding + 16;
if payload.len() != full_length {
return Err(From::from(NetworkError::Auth));
return Err(ErrorKind::Auth.into());
}
self.ingress_mac.update(&payload[0..payload.len() - 16]);
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &[0u8; 0]);
@@ -425,7 +425,7 @@ impl EncryptedConnection {
let mut expected = H128::new();
self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[..] {
return Err(From::from(NetworkError::Auth));
return Err(ErrorKind::Auth.into());
}
let mut packet = vec![0u8; self.payload_len];
@@ -451,7 +451,7 @@ impl EncryptedConnection {
}
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, Error> where Message: Send + Clone + Sync + 'static {
io.clear_timer(self.connection.token)?;
if let EncryptedConnectionState::Header = self.read_state {
if let Some(data) = self.connection.readable()? {
@@ -474,7 +474,7 @@ impl EncryptedConnection {
}
/// Writable IO handler. Processes send queeue.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
self.connection.writable(io)?;
Ok(())
}

View File

@@ -27,7 +27,7 @@ use time;
use bigint::hash::*;
use rlp::*;
use node_table::*;
use error::NetworkError;
use error::{Error, ErrorKind};
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use IpFilter;
@@ -362,15 +362,15 @@ impl Discovery {
res
}
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
// validate packet
if packet.len() < 32 + 65 + 4 + 1 {
return Err(NetworkError::BadProtocol);
return Err(ErrorKind::BadProtocol.into());
}
let hash_signed = keccak(&packet[32..]);
if hash_signed[..] != packet[0..32] {
return Err(NetworkError::BadProtocol);
return Err(ErrorKind::BadProtocol.into());
}
let signed = &packet[(32 + 65)..];
@@ -391,10 +391,10 @@ impl Discovery {
}
}
fn check_timestamp(&self, timestamp: u64) -> Result<(), NetworkError> {
fn check_timestamp(&self, timestamp: u64) -> Result<(), Error> {
if self.check_timestamps && timestamp < time::get_time().sec as u64{
debug!(target: "discovery", "Expired packet");
return Err(NetworkError::Expired);
return Err(ErrorKind::Expired.into());
}
Ok(())
}
@@ -403,7 +403,7 @@ impl Discovery {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
}
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?;
let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
@@ -428,7 +428,7 @@ impl Discovery {
Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() }))
}
fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Pong from {:?}", &from);
// TODO: validate pong packet
let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
@@ -445,7 +445,7 @@ impl Discovery {
Ok(None)
}
fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = rlp.val_at(0)?;
let timestamp: u64 = rlp.val_at(1)?;
@@ -478,7 +478,7 @@ impl Discovery {
packets.collect()
}
fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
// TODO: validate packet
let mut added = HashMap::new();
trace!(target: "discovery", "Got {} Neighbours from {:?}", rlp.at(0)?.item_count()?, &from);
@@ -536,12 +536,12 @@ impl Discovery {
self.start();
}
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
Ok(())
}
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
let registration = if !self.send_queue.is_empty() {
Ready::readable() | Ready::writable()
} else {

View File

@@ -14,12 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{io, net, fmt};
use io::IoError;
use rlp::*;
use std::fmt;
use ethkey::Error as KeyError;
use crypto::Error as CryptoError;
use snappy;
use {rlp, ethkey, crypto, snappy};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DisconnectReason
@@ -83,98 +80,80 @@ impl fmt::Display for DisconnectReason {
}
}
#[derive(Debug)]
/// Network error.
pub enum NetworkError {
/// Authentication error.
Auth,
/// Unrecognised protocol.
BadProtocol,
/// Message expired.
Expired,
/// Peer not found.
PeerNotFound,
/// Peer is diconnected.
Disconnect(DisconnectReason),
/// Invalid NodeId
InvalidNodeId,
/// Socket IO error.
Io(IoError),
/// Error concerning the network address parsing subsystem.
AddressParse(::std::net::AddrParseError),
/// Error concerning the network address resolution subsystem.
AddressResolve(Option<::std::io::Error>),
/// Error concerning the Rust standard library's IO subsystem.
StdIo(::std::io::Error),
/// Packet size is over the protocol limit.
OversizedPacket,
/// Decompression error.
Decompression(snappy::InvalidInput),
}
error_chain! {
foreign_links {
SocketIo(IoError) #[doc = "Socket IO error."];
Io(io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."];
AddressParse(net::AddrParseError) #[doc = "Error concerning the network address parsing subsystem."];
Decompression(snappy::InvalidInput) #[doc = "Decompression error."];
}
impl fmt::Display for NetworkError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::NetworkError::*;
errors {
#[doc = "Error concerning the network address resolution subsystem."]
AddressResolve(err: Option<io::Error>) {
description("Failed to resolve network address"),
display("Failed to resolve network address {}", err.as_ref().map_or("".to_string(), |e| e.to_string())),
}
let msg = match *self {
Auth => "Authentication failure".into(),
BadProtocol => "Bad protocol".into(),
Expired => "Expired message".into(),
PeerNotFound => "Peer not found".into(),
Disconnect(ref reason) => format!("Peer disconnected: {}", reason),
Io(ref err) => format!("Socket I/O error: {}", err),
AddressParse(ref err) => format!("{}", err),
AddressResolve(Some(ref err)) => format!("{}", err),
AddressResolve(_) => "Failed to resolve network address.".into(),
StdIo(ref err) => format!("{}", err),
InvalidNodeId => "Invalid node id".into(),
OversizedPacket => "Packet is too large".into(),
Decompression(ref err) => format!("Error decompressing packet: {}", err),
};
#[doc = "Authentication failure"]
Auth {
description("Authentication failure"),
display("Authentication failure"),
}
f.write_fmt(format_args!("Network error ({})", msg))
#[doc = "Unrecognised protocol"]
BadProtocol {
description("Bad protocol"),
display("Bad protocol"),
}
#[doc = "Expired message"]
Expired {
description("Expired message"),
display("Expired message"),
}
#[doc = "Peer not found"]
PeerNotFound {
description("Peer not found"),
display("Peer not found"),
}
#[doc = "Peer is disconnected"]
Disconnect(reason: DisconnectReason) {
description("Peer disconnected"),
display("Peer disconnected: {}", reason),
}
#[doc = "Invalid node id"]
InvalidNodeId {
description("Invalid node id"),
display("Invalid node id"),
}
#[doc = "Packet size is over the protocol limit"]
OversizedPacket {
description("Packet is too large"),
display("Packet is too large"),
}
}
}
impl From<DecoderError> for NetworkError {
fn from(_err: DecoderError) -> NetworkError {
NetworkError::Auth
impl From<rlp::DecoderError> for Error {
fn from(_err: rlp::DecoderError) -> Self {
ErrorKind::Auth.into()
}
}
impl From<::std::io::Error> for NetworkError {
fn from(err: ::std::io::Error) -> NetworkError {
NetworkError::StdIo(err)
impl From<ethkey::Error> for Error {
fn from(_err: ethkey::Error) -> Self {
ErrorKind::Auth.into()
}
}
impl From<IoError> for NetworkError {
fn from(err: IoError) -> NetworkError {
NetworkError::Io(err)
}
}
impl From<KeyError> for NetworkError {
fn from(_err: KeyError) -> Self {
NetworkError::Auth
}
}
impl From<CryptoError> for NetworkError {
fn from(_err: CryptoError) -> NetworkError {
NetworkError::Auth
}
}
impl From<snappy::InvalidInput> for NetworkError {
fn from(err: snappy::InvalidInput) -> NetworkError {
NetworkError::Decompression(err)
}
}
impl From<::std::net::AddrParseError> for NetworkError {
fn from(err: ::std::net::AddrParseError) -> NetworkError {
NetworkError::AddressParse(err)
impl From<crypto::Error> for Error {
fn from(_err: crypto::Error) -> Self {
ErrorKind::Auth.into()
}
}
@@ -187,13 +166,13 @@ fn test_errors() {
}
assert_eq!(DisconnectReason::Unknown, r);
match <NetworkError as From<DecoderError>>::from(DecoderError::RlpIsTooBig) {
NetworkError::Auth => {},
match *<Error as From<rlp::DecoderError>>::from(rlp::DecoderError::RlpIsTooBig).kind() {
ErrorKind::Auth => {},
_ => panic!("Unexpeceted error"),
}
match <NetworkError as From<CryptoError>>::from(CryptoError::InvalidMessage) {
NetworkError::Auth => {},
match *<Error as From<crypto::Error>>::from(crypto::Error::InvalidMessage).kind() {
ErrorKind::Auth => {},
_ => panic!("Unexpeceted error"),
}
}

View File

@@ -24,11 +24,11 @@ use rlp::*;
use connection::{Connection};
use host::{HostInfo};
use node_table::NodeId;
use error::*;
use stats::NetworkStats;
use io::{IoContext, StreamToken};
use ethkey::{KeyPair, Public, Secret, recover, sign, Generator, Random};
use crypto::{ecdh, ecies};
use error::{Error, ErrorKind};
#[derive(PartialEq, Eq, Debug)]
enum HandshakeState {
@@ -83,7 +83,7 @@ const ECIES_OVERHEAD: usize = 113;
impl Handshake {
/// Create a new handshake object
pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, NetworkError> {
pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, Error> {
Ok(Handshake {
id: if let Some(id) = id { id.clone()} else { NodeId::new() },
connection: Connection::new(token, socket, stats),
@@ -106,7 +106,7 @@ impl Handshake {
}
/// Start a handhsake
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone+ Sync + 'static {
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), Error> where Message: Send + Clone+ Sync + 'static {
self.originated = originated;
io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok();
if originated {
@@ -125,7 +125,7 @@ impl Handshake {
}
/// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
if !self.expired() {
while let Some(data) = self.connection.readable()? {
match self.state {
@@ -154,14 +154,14 @@ impl Handshake {
}
/// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
if !self.expired() {
self.connection.writable(io)?;
}
Ok(())
}
fn set_auth(&mut self, host_secret: &Secret, sig: &[u8], remote_public: &[u8], remote_nonce: &[u8], remote_version: u64) -> Result<(), NetworkError> {
fn set_auth(&mut self, host_secret: &Secret, sig: &[u8], remote_public: &[u8], remote_nonce: &[u8], remote_version: u64) -> Result<(), Error> {
self.id.clone_from_slice(remote_public);
self.remote_nonce.clone_from_slice(remote_nonce);
self.remote_version = remote_version;
@@ -172,11 +172,11 @@ impl Handshake {
}
/// Parse, validate and confirm auth message
fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target: "network", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
self.auth_cipher = data.to_vec();
match ecies::decrypt(secret, &[], data) {
@@ -193,7 +193,7 @@ impl Handshake {
let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2;
if total < V4_AUTH_PACKET_SIZE {
debug!(target: "network", "Wrong EIP8 auth packet size");
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
let rest = total - data.len();
self.state = HandshakeState::ReadingAuthEip8;
@@ -203,7 +203,7 @@ impl Handshake {
Ok(())
}
fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.auth_cipher.extend_from_slice(data);
let auth = ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])?;
@@ -218,11 +218,11 @@ impl Handshake {
}
/// Parse and validate ack message
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> {
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), Error> {
trace!(target: "network", "Received handshake ack from {:?}", self.connection.remote_addr_str());
if data.len() != V4_ACK_PACKET_SIZE {
debug!(target: "network", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
self.ack_cipher = data.to_vec();
match ecies::decrypt(secret, &[], data) {
@@ -236,7 +236,7 @@ impl Handshake {
let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2;
if total < V4_ACK_PACKET_SIZE {
debug!(target: "network", "Wrong EIP8 ack packet size");
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
let rest = total - data.len();
self.state = HandshakeState::ReadingAckEip8;
@@ -246,7 +246,7 @@ impl Handshake {
Ok(())
}
fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> {
fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), Error> {
trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.ack_cipher.extend_from_slice(data);
let ack = ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..])?;
@@ -259,7 +259,7 @@ impl Handshake {
}
/// Sends auth message
fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len();
@@ -286,7 +286,7 @@ impl Handshake {
}
/// Sends ack message
fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len();
@@ -305,7 +305,7 @@ impl Handshake {
}
/// Sends EIP8 ack message
fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public());

View File

@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
use std::io::{Read, Write, ErrorKind};
use std::io::{Read, Write, self};
use std::fs;
use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak;
@@ -33,7 +33,6 @@ use bigint::hash::*;
use util::version;
use rlp::*;
use session::{Session, SessionInfo, SessionData};
use error::*;
use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION, IpFilter};
use node_table::*;
@@ -43,6 +42,7 @@ use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use connection_filter::{ConnectionFilter, ConnectionDirection};
use error::{Error, ErrorKind, DisconnectReason};
type Slab<T> = ::slab::Slab<T, usize>;
@@ -248,12 +248,12 @@ impl<'s> NetworkContext<'s> {
}
/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
/// Send a packet over the network to another peer using specified protocol.
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
@@ -264,9 +264,9 @@ impl<'s> NetworkContext<'s> {
}
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data))
self.session_id.map_or_else(|| Err(ErrorKind::Expired.into()), |id| self.send(id, packet_id, data))
}
/// Get an IoChannel.
@@ -292,7 +292,7 @@ impl<'s> NetworkContext<'s> {
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), NetworkError> {
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token: token,
delay: ms,
@@ -386,7 +386,7 @@ pub struct Host {
impl Host {
/// Create a new instance
pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>, filter: Option<Arc<ConnectionFilter>>) -> Result<Host, NetworkError> {
pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>, filter: Option<Arc<ConnectionFilter>>) -> Result<Host, Error> {
let mut listen_address = match config.listen_address {
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr,
@@ -468,7 +468,7 @@ impl Host {
}
}
pub fn add_reserved_node(&self, id: &str) -> Result<(), NetworkError> {
pub fn add_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?;
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
@@ -512,7 +512,7 @@ impl Host {
}
}
pub fn remove_reserved_node(&self, id: &str) -> Result<(), NetworkError> {
pub fn remove_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?;
self.reserved_nodes.write().remove(&n.id);
@@ -533,7 +533,7 @@ impl Host {
format!("{}", Node::new(info.id().clone(), info.local_endpoint.clone()))
}
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> {
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.write().iter_mut() {
@@ -563,7 +563,7 @@ impl Host {
peers
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> {
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
if self.info.read().public_endpoint.is_some() {
return Ok(());
}
@@ -746,7 +746,7 @@ impl Host {
}
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> {
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write();
@@ -775,7 +775,7 @@ impl Host {
let socket = match self.tcp_listener.lock().accept() {
Ok((sock, _addr)) => sock,
Err(e) => {
if e.kind() != ErrorKind::WouldBlock {
if e.kind() != io::ErrorKind::WouldBlock {
debug!(target: "network", "Error accepting connection: {:?}", e);
}
break
@@ -821,7 +821,7 @@ impl Host {
match session_result {
Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol) = e {
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
if let Some(id) = s.id() {
if !self.reserved_nodes.read().contains(id) {
self.nodes.write().mark_as_useless(id);

View File

@@ -56,6 +56,7 @@
//TODO: use Poll from mio
#![allow(deprecated)]
#![recursion_limit="128"]
extern crate ethcore_io as io;
extern crate ethcore_util as util;
@@ -83,6 +84,9 @@ extern crate hash;
extern crate serde_json;
extern crate snappy;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
@@ -109,7 +113,7 @@ mod tests;
pub use host::{HostInfo, PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration};
pub use service::NetworkService;
pub use error::NetworkError;
pub use error::{Error, ErrorKind};
pub use stats::NetworkStats;
pub use session::SessionInfo;
pub use connection_filter::{ConnectionFilter, ConnectionDirection};

View File

@@ -28,7 +28,7 @@ use std::io::{Read, Write};
use bigint::hash::*;
use rlp::*;
use time::Tm;
use NetworkError;
use error::{Error, ErrorKind};
use {AllowIP, IpFilter};
use discovery::{TableUpdates, NodeEntry};
use ip_utils::*;
@@ -117,18 +117,18 @@ impl NodeEndpoint {
}
impl FromStr for NodeEndpoint {
type Err = NetworkError;
type Err = Error;
/// Create endpoint from string. Performs name resolution if given a host name.
fn from_str(s: &str) -> Result<NodeEndpoint, NetworkError> {
fn from_str(s: &str) -> Result<NodeEndpoint, Error> {
let address = s.to_socket_addrs().map(|mut i| i.next());
match address {
Ok(Some(a)) => Ok(NodeEndpoint {
address: a,
udp_port: a.port()
}),
Ok(_) => Err(NetworkError::AddressResolve(None)),
Err(e) => Err(NetworkError::AddressResolve(Some(e)))
Ok(_) => Err(ErrorKind::AddressResolve(None).into()),
Err(e) => Err(ErrorKind::AddressResolve(Some(e)).into())
}
}
}
@@ -171,10 +171,10 @@ impl Display for Node {
}
impl FromStr for Node {
type Err = NetworkError;
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (id, endpoint) = if s.len() > 136 && &s[0..8] == "enode://" && &s[136..137] == "@" {
(s[8..136].parse().map_err(|_| NetworkError::InvalidNodeId)?, NodeEndpoint::from_str(&s[137..])?)
(s[8..136].parse().map_err(|_| ErrorKind::InvalidNodeId)?, NodeEndpoint::from_str(&s[137..])?)
}
else {
(NodeId::new(), NodeEndpoint::from_str(s)?)
@@ -363,7 +363,7 @@ impl Drop for NodeTable {
}
/// Check if node url is valid
pub fn validate_node_url(url: &str) -> Option<NetworkError> {
pub fn validate_node_url(url: &str) -> Option<Error> {
use std::str::FromStr;
match Node::from_str(url) {
Ok(_) => None,

View File

@@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use {NetworkProtocolHandler, NetworkConfiguration, NonReservedPeerMode};
use error::NetworkError;
use error::Error;
use host::{Host, NetworkContext, NetworkIoMessage, PeerId, ProtocolId};
use stats::NetworkStats;
use io::*;
@@ -54,7 +54,7 @@ pub struct NetworkService {
impl NetworkService {
/// Starts IO event loop
pub fn new(config: NetworkConfiguration, filter: Option<Arc<ConnectionFilter>>) -> Result<NetworkService, NetworkError> {
pub fn new(config: NetworkConfiguration, filter: Option<Arc<ConnectionFilter>>) -> Result<NetworkService, Error> {
let host_handler = Arc::new(HostHandler { public_url: RwLock::new(None) });
let io_service = IoService::<NetworkIoMessage>::start()?;
@@ -72,7 +72,7 @@ impl NetworkService {
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), NetworkError> {
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), Error> {
self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
@@ -115,7 +115,7 @@ impl NetworkService {
}
/// Start network IO
pub fn start(&self) -> Result<(), NetworkError> {
pub fn start(&self) -> Result<(), Error> {
let mut host = self.host.write();
if host.is_none() {
let h = Arc::new(Host::new(self.config.clone(), self.stats.clone(), self.filter.clone())?);
@@ -131,7 +131,7 @@ impl NetworkService {
}
/// Stop network IO
pub fn stop(&self) -> Result<(), NetworkError> {
pub fn stop(&self) -> Result<(), Error> {
let mut host = self.host.write();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
@@ -147,7 +147,7 @@ impl NetworkService {
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), NetworkError> {
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let host = self.host.read();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
@@ -157,7 +157,7 @@ impl NetworkService {
}
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), NetworkError> {
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let host = self.host.read();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)

View File

@@ -28,7 +28,7 @@ use rlp::*;
use connection::{EncryptedConnection, Packet, Connection, MAX_PAYLOAD_SIZE};
use handshake::Handshake;
use io::{IoContext, StreamToken};
use error::{NetworkError, DisconnectReason};
use error::{Error, ErrorKind, DisconnectReason};
use host::*;
use node_table::NodeId;
use stats::NetworkStats;
@@ -178,7 +178,7 @@ impl Session {
/// Create a new session out of comepleted handshake. This clones the handshake connection object
/// and leaves the handhsake in limbo to be deregistered from the event loop.
pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>,
nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, NetworkError>
nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, Error>
where Message: Send + Clone + Sync + 'static {
let originated = id.is_some();
let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake");
@@ -206,7 +206,7 @@ impl Session {
})
}
fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
let connection = if let State::Handshake(ref mut h) = self.state {
self.info.id = Some(h.id.clone());
self.info.remote_address = h.connection.remote_addr_str();
@@ -260,7 +260,7 @@ impl Session {
}
/// Readable IO handler. Returns packet data if available.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, NetworkError> where Message: Send + Sync + Clone {
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, Error> where Message: Send + Sync + Clone {
if self.expired() {
return Ok(SessionData::None)
}
@@ -291,7 +291,7 @@ impl Session {
}
/// Writable IO handler. Sends pending packets.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
match self.state {
State::Handshake(ref mut h) => h.writable(io),
State::Session(ref mut s) => s.writable(io),
@@ -309,7 +309,7 @@ impl Session {
}
/// Register the session socket with the event loop
pub fn register_socket<Host:Handler<Timeout = Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
pub fn register_socket<Host:Handler<Timeout = Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
if self.expired() {
return Ok(());
}
@@ -318,26 +318,26 @@ impl Session {
}
/// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
self.connection().update_socket(reg, event_loop)?;
Ok(())
}
/// Delete registration
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> {
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
self.connection().deregister_socket(event_loop)?;
Ok(())
}
/// Send a protocol packet to peer.
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: Option<[u8; 3]>, packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: Option<[u8; 3]>, packet_id: u8, data: &[u8]) -> Result<(), Error>
where Message: Send + Sync + Clone {
if protocol.is_some() && (self.info.capabilities.is_empty() || !self.had_hello) {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {:?}, packet: {}", self.token(), protocol.as_ref().map(|p| str::from_utf8(&p[..]).unwrap_or("??")), packet_id);
return Err(From::from(NetworkError::BadProtocol));
bail!(ErrorKind::BadProtocol);
}
if self.expired() {
return Err(From::from(NetworkError::Expired));
return Err(ErrorKind::Expired.into());
}
let mut i = 0usize;
let pid = match protocol {
@@ -359,7 +359,7 @@ impl Session {
let mut payload = data; // create a reference with local lifetime
if self.compression {
if payload.len() > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
bail!(ErrorKind::OversizedPacket);
}
let len = snappy::compress_into(&payload, &mut compressed);
trace!(target: "network", "compressed {} to {}", payload.len(), len);
@@ -406,19 +406,19 @@ impl Session {
}
}
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, Error>
where Message: Send + Sync + Clone {
if packet.data.len() < 2 {
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
let packet_id = packet.data[0];
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(From::from(NetworkError::BadProtocol));
return Err(ErrorKind::BadProtocol.into());
}
let data = if self.compression {
let compressed = &packet.data[1..];
if snappy::decompressed_len(&compressed)? > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket);
bail!(ErrorKind::OversizedPacket);
}
snappy::decompress(&compressed)?
} else {
@@ -436,7 +436,7 @@ impl Session {
if self.had_hello {
debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
}
Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason))))
Err(ErrorKind::Disconnect(DisconnectReason::from_u8(reason)).into())
}
PACKET_PING => {
self.send_pong(io)?;
@@ -484,7 +484,7 @@ impl Session {
}
}
fn write_hello<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
fn write_hello<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
let mut rlp = RlpStream::new();
rlp.append_raw(&[PACKET_HELLO as u8], 0);
rlp.begin_list(5)
@@ -496,7 +496,7 @@ impl Session {
self.send(io, &rlp.drain())
}
fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), NetworkError>
fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error>
where Message: Send + Sync + Clone {
let protocol = rlp.val_at::<u32>(0)?;
let client_version = rlp.val_at::<String>(1)?;
@@ -558,29 +558,29 @@ impl Session {
}
/// Senf ping packet
pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Sync + Clone {
self.send_packet(io, None, PACKET_PING, &EMPTY_LIST_RLP)?;
self.ping_time_ns = time::precise_time_ns();
self.pong_time_ns = None;
Ok(())
}
fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Sync + Clone {
self.send_packet(io, None, PACKET_PONG, &EMPTY_LIST_RLP)
}
/// Disconnect this session
pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone {
pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> Error where Message: Send + Sync + Clone {
if let State::Session(_) = self.state {
let mut rlp = RlpStream::new();
rlp.begin_list(1);
rlp.append(&(reason as u32));
self.send_packet(io, None, PACKET_DISCONNECT, &rlp.drain()).ok();
}
NetworkError::Disconnect(reason)
ErrorKind::Disconnect(reason).into()
}
fn send<Message>(&mut self, io: &IoContext<Message>, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
fn send<Message>(&mut self, io: &IoContext<Message>, data: &[u8]) -> Result<(), Error> where Message: Send + Sync + Clone {
match self.state {
State::Handshake(_) => {
warn!(target:"network", "Unexpected send request");