Merge pull request #7040 from paritytech/squashed_network_error_chain

squashed ethcore-network changes which introduce error-chain
This commit is contained in:
Marek Kotewicz 2017-11-15 18:18:25 +01:00 committed by GitHub
commit 0230a44b15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 235 additions and 244 deletions

5
Cargo.lock generated
View File

@ -424,9 +424,6 @@ dependencies = [
name = "error-chain" name = "error-chain"
version = "0.11.0" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"backtrace 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "eth-secp256k1" name = "eth-secp256k1"
@ -637,6 +634,7 @@ dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.103 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.103 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bigint 0.2.1", "ethcore-bigint 0.2.1",
"ethcore-bytes 0.1.0", "ethcore-bytes 0.1.0",
"ethcore-devtools 1.9.0", "ethcore-devtools 1.9.0",
@ -2935,6 +2933,7 @@ name = "snappy"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.4.5 (git+https://github.com/paritytech/rust-rocksdb)",
] ]
[[package]] [[package]]

View File

@ -17,10 +17,8 @@
//! Defines error types and levels of punishment to use upon //! Defines error types and levels of punishment to use upon
//! encountering. //! encountering.
use rlp::DecoderError;
use network::NetworkError;
use std::fmt; use std::fmt;
use {rlp, network};
/// Levels of punishment. /// Levels of punishment.
/// ///
@ -41,9 +39,9 @@ pub enum Punishment {
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// An RLP decoding error. /// An RLP decoding error.
Rlp(DecoderError), Rlp(rlp::DecoderError),
/// A network error. /// A network error.
Network(NetworkError), Network(network::Error),
/// Out of credits. /// Out of credits.
NoCredits, NoCredits,
/// Unrecognized packet code. /// Unrecognized packet code.
@ -92,14 +90,14 @@ impl Error {
} }
} }
impl From<DecoderError> for Error { impl From<rlp::DecoderError> for Error {
fn from(err: DecoderError) -> Self { fn from(err: rlp::DecoderError) -> Self {
Error::Rlp(err) Error::Rlp(err)
} }
} }
impl From<NetworkError> for Error { impl From<network::Error> for Error {
fn from(err: NetworkError) -> Self { fn from(err: network::Error) -> Self {
Error::Network(err) Error::Network(err)
} }
} }

View File

@ -28,7 +28,7 @@ use bigint::hash::H256;
use util::{version_data, Address}; use util::{version_data, Address};
use bytes::Bytes; use bytes::Bytes;
use ansi_term::Colour; use ansi_term::Colour;
use ethsync::{NetworkConfiguration, validate_node_url, NetworkError}; use ethsync::{NetworkConfiguration, validate_node_url, self};
use ethcore::ethstore::ethkey::{Secret, Public}; use ethcore::ethstore::ethkey::{Secret, Public};
use ethcore::client::{VMType}; use ethcore::client::{VMType};
use ethcore::miner::{MinerOptions, Banning, StratumOptions}; use ethcore::miner::{MinerOptions, Banning, StratumOptions};
@ -700,9 +700,9 @@ impl Configuration {
let lines = buffer.lines().map(|s| s.trim().to_owned()).filter(|s| !s.is_empty() && !s.starts_with("#")).collect::<Vec<_>>(); let lines = buffer.lines().map(|s| s.trim().to_owned()).filter(|s| !s.is_empty() && !s.starts_with("#")).collect::<Vec<_>>();
for line in &lines { for line in &lines {
match validate_node_url(line) { match validate_node_url(line).map(Into::into) {
None => continue, None => continue,
Some(NetworkError::AddressResolve(_)) => return Err(format!("Failed to resolve hostname of a boot node: {}", line)), Some(ethsync::ErrorKind::AddressResolve(_)) => return Err(format!("Failed to resolve hostname of a boot node: {}", line)),
Some(_) => return Err(format!("Invalid node address format given for a boot node: {}", line)), Some(_) => return Err(format!("Invalid node address format given for a boot node: {}", line)),
} }
} }

View File

@ -29,7 +29,7 @@ use cache::CacheConfig;
use dir::DatabaseDirectories; use dir::DatabaseDirectories;
use upgrade::{upgrade, upgrade_data_paths}; use upgrade::{upgrade, upgrade_data_paths};
use migration::migrate; use migration::migrate;
use ethsync::{validate_node_url, NetworkError}; use ethsync::{validate_node_url, self};
use path; use path;
pub fn to_duration(s: &str) -> Result<Duration, String> { pub fn to_duration(s: &str) -> Result<Duration, String> {
@ -181,9 +181,9 @@ pub fn parity_ipc_path(base: &str, path: &str, shift: u16) -> String {
pub fn to_bootnodes(bootnodes: &Option<String>) -> Result<Vec<String>, String> { pub fn to_bootnodes(bootnodes: &Option<String>) -> Result<Vec<String>, String> {
match *bootnodes { match *bootnodes {
Some(ref x) if !x.is_empty() => x.split(',').map(|s| { Some(ref x) if !x.is_empty() => x.split(',').map(|s| {
match validate_node_url(s) { match validate_node_url(s).map(Into::into) {
None => Ok(s.to_owned()), None => Ok(s.to_owned()),
Some(NetworkError::AddressResolve(_)) => Err(format!("Failed to resolve hostname of a boot node: {}", s)), Some(ethsync::ErrorKind::AddressResolve(_)) => Err(format!("Failed to resolve hostname of a boot node: {}", s)),
Some(_) => Err(format!("Invalid node address format given for a boot node: {}", s)), Some(_) => Err(format!("Invalid node address format given for a boot node: {}", s)),
} }
}).collect(), }).collect(),

View File

@ -17,7 +17,7 @@
use std::sync::Arc; use std::sync::Arc;
use ethcore::client::BlockChainClient; use ethcore::client::BlockChainClient;
use ethsync::{AttachedProtocol, SyncConfig, NetworkConfiguration, NetworkError, Params, ConnectionFilter}; use ethsync::{self, AttachedProtocol, SyncConfig, NetworkConfiguration, Params, ConnectionFilter};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use light::Provider; use light::Provider;
@ -36,7 +36,7 @@ pub fn sync(
_log_settings: &LogConfig, _log_settings: &LogConfig,
attached_protos: Vec<AttachedProtocol>, attached_protos: Vec<AttachedProtocol>,
connection_filter: Option<Arc<ConnectionFilter>>, connection_filter: Option<Arc<ConnectionFilter>>,
) -> Result<SyncModules, NetworkError> { ) -> Result<SyncModules, ethsync::Error> {
let eth_sync = EthSync::new(Params { let eth_sync = EthSync::new(Params {
config: sync_cfg, config: sync_cfg,
chain: client, chain: client,

View File

@ -19,7 +19,7 @@ use std::collections::{HashMap, BTreeMap};
use std::io; use std::io;
use bytes::Bytes; use bytes::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError, ConnectionFilter}; NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, Error, ErrorKind, ConnectionFilter};
use bigint::prelude::U256; use bigint::prelude::U256;
use bigint::hash::{H256, H512}; use bigint::hash::{H256, H512};
use io::{TimerToken}; use io::{TimerToken};
@ -207,7 +207,7 @@ pub struct EthSync {
impl EthSync { impl EthSync {
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn new(params: Params, connection_filter: Option<Arc<ConnectionFilter>>) -> Result<Arc<EthSync>, NetworkError> { pub fn new(params: Params, connection_filter: Option<Arc<ConnectionFilter>>) -> Result<Arc<EthSync>, Error> {
const MAX_LIGHTSERV_LOAD: f64 = 0.5; const MAX_LIGHTSERV_LOAD: f64 = 0.5;
let pruning_info = params.chain.pruning_info(); let pruning_info = params.chain.pruning_info();
@ -397,8 +397,8 @@ impl ChainNotify for EthSync {
} }
fn start(&self) { fn start(&self) {
match self.network.start() { match self.network.start().map_err(Into::into) {
Err(NetworkError::StdIo(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")), Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")),
Err(err) => warn!("Error starting network: {}", err), Err(err) => warn!("Error starting network: {}", err),
_ => {}, _ => {},
} }
@ -675,7 +675,7 @@ pub struct LightSync {
impl LightSync { impl LightSync {
/// Create a new light sync service. /// Create a new light sync service.
pub fn new<L>(params: LightSyncParams<L>) -> Result<Self, NetworkError> pub fn new<L>(params: LightSyncParams<L>) -> Result<Self, Error>
where L: AsLightClient + Provider + Sync + Send + 'static where L: AsLightClient + Provider + Sync + Send + 'static
{ {
use light_sync::LightSync as SyncHandler; use light_sync::LightSync as SyncHandler;
@ -752,8 +752,10 @@ impl ManageNetwork for LightSync {
} }
fn start_network(&self) { fn start_network(&self) {
match self.network.start() { match self.network.start().map_err(Into::into) {
Err(NetworkError::StdIo(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")), Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse => {
warn!("Network port {:?} is already in use, make sure that another instance of an Ethereum client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set."))
}
Err(err) => warn!("Error starting network: {}", err), Err(err) => warn!("Error starting network: {}", err),
_ => {}, _ => {},
} }

View File

@ -23,7 +23,7 @@ use bigint::hash::H256;
use triehash::ordered_trie_root; use triehash::ordered_trie_root;
use bytes::Bytes; use bytes::Bytes;
use rlp::*; use rlp::*;
use network::NetworkError; use network;
use ethcore::header::Header as BlockHeader; use ethcore::header::Header as BlockHeader;
known_heap_size!(0, HeaderId); known_heap_size!(0, HeaderId);
@ -341,7 +341,7 @@ impl BlockCollection {
self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash) self.downloading_headers.contains(hash) || self.downloading_bodies.contains(hash)
} }
fn insert_body(&mut self, b: Bytes) -> Result<(), NetworkError> { fn insert_body(&mut self, b: Bytes) -> Result<(), network::Error> {
let header_id = { let header_id = {
let body = UntrustedRlp::new(&b); let body = UntrustedRlp::new(&b);
let tx = body.at(0)?; let tx = body.at(0)?;
@ -365,18 +365,18 @@ impl BlockCollection {
}, },
None => { None => {
warn!("Got body with no header {}", h); warn!("Got body with no header {}", h);
Err(NetworkError::BadProtocol) Err(network::ErrorKind::BadProtocol.into())
} }
} }
} }
None => { None => {
trace!(target: "sync", "Ignored unknown/stale block body. tx_root = {:?}, uncles = {:?}", header_id.transactions_root, header_id.uncles); trace!(target: "sync", "Ignored unknown/stale block body. tx_root = {:?}, uncles = {:?}", header_id.transactions_root, header_id.uncles);
Err(NetworkError::BadProtocol) Err(network::ErrorKind::BadProtocol.into())
} }
} }
} }
fn insert_receipt(&mut self, r: Bytes) -> Result<(), NetworkError> { fn insert_receipt(&mut self, r: Bytes) -> Result<(), network::Error> {
let receipt_root = { let receipt_root = {
let receipts = UntrustedRlp::new(&r); let receipts = UntrustedRlp::new(&r);
ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here
@ -392,7 +392,7 @@ impl BlockCollection {
}, },
None => { None => {
warn!("Got receipt with no header {}", h); warn!("Got receipt with no header {}", h);
return Err(NetworkError::BadProtocol) return Err(network::ErrorKind::BadProtocol.into())
} }
} }
} }
@ -400,7 +400,7 @@ impl BlockCollection {
} }
_ => { _ => {
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root); trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
Err(NetworkError::BadProtocol) Err(network::ErrorKind::BadProtocol.into())
} }
} }
} }

View File

@ -97,7 +97,7 @@ use bigint::hash::{H256, H256FastMap};
use parking_lot::RwLock; use parking_lot::RwLock;
use bytes::Bytes; use bytes::Bytes;
use rlp::*; use rlp::*;
use network::*; use network::{self, PeerId, PacketId};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockImportError, BlockQueueInfo}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockImportError, BlockQueueInfo};
use ethcore::error::*; use ethcore::error::*;
@ -1493,7 +1493,7 @@ impl ChainSync {
} }
/// Send Status message /// Send Status message
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> { fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer); let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
let warp_protocol = warp_protocol_version != 0; let warp_protocol = warp_protocol_version != 0;
let protocol = if warp_protocol { warp_protocol_version } else { PROTOCOL_VERSION_63 }; let protocol = if warp_protocol { warp_protocol_version } else { PROTOCOL_VERSION_63 };
@ -1705,7 +1705,7 @@ impl ChainSync {
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult, where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
FError : FnOnce(NetworkError) -> String FError : FnOnce(network::Error) -> String
{ {
let response = rlp_func(io, rlp, peer); let response = rlp_func(io, rlp, peer);
match response { match response {

View File

@ -72,7 +72,7 @@ mod api;
pub use api::*; pub use api::*;
pub use chain::{SyncStatus, SyncState}; pub use chain::{SyncStatus, SyncState};
pub use network::{validate_node_url, NonReservedPeerMode, NetworkError, ConnectionFilter, ConnectionDirection}; pub use network::{validate_node_url, NonReservedPeerMode, Error, ErrorKind, ConnectionFilter, ConnectionDirection};
#[cfg(test)] #[cfg(test)]
pub(crate) type Address = bigint::hash::H160; pub(crate) type Address = bigint::hash::H160;

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap; use std::collections::HashMap;
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo, ProtocolId}; use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
use bytes::Bytes; use bytes::Bytes;
use ethcore::client::BlockChainClient; use ethcore::client::BlockChainClient;
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
@ -31,11 +31,11 @@ pub trait SyncIo {
/// Disconnect peer /// Disconnect peer
fn disconnect_peer(&mut self, peer_id: PeerId); fn disconnect_peer(&mut self, peer_id: PeerId);
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet. /// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>; fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Send a packet to a peer. /// Send a packet to a peer.
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>; fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Send a packet to a peer using specified protocol. /// Send a packet to a peer using specified protocol.
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>; fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Get the blockchain /// Get the blockchain
fn chain(&self) -> &BlockChainClient; fn chain(&self) -> &BlockChainClient;
/// Get the snapshot service. /// Get the snapshot service.
@ -92,15 +92,15 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
self.network.disconnect_peer(peer_id); self.network.disconnect_peer(peer_id);
} }
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{ fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
self.network.respond(packet_id, data) self.network.respond(packet_id, data)
} }
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{ fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
self.network.send(peer_id, packet_id, data) self.network.send(peer_id, packet_id, data)
} }
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{ fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
self.network.send_protocol(protocol, peer_id, packet_id, data) self.network.send_protocol(protocol, peer_id, packet_id, data)
} }

View File

@ -19,7 +19,7 @@ use std::sync::Arc;
use bigint::hash::H256; use bigint::hash::H256;
use parking_lot::RwLock; use parking_lot::RwLock;
use bytes::Bytes; use bytes::Bytes;
use network::*; use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use tests::snapshot::*; use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, ClientConfig, ChainNotify}; use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, ClientConfig, ChainNotify};
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
@ -90,7 +90,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
false false
} }
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
self.packets.push(TestPacket { self.packets.push(TestPacket {
data: data, data: data,
packet_id: packet_id, packet_id: packet_id,
@ -99,7 +99,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
Ok(()) Ok(())
} }
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
self.packets.push(TestPacket { self.packets.push(TestPacket {
data: data, data: data,
packet_id: packet_id, packet_id: packet_id,
@ -108,7 +108,7 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
Ok(()) Ok(())
} }
fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
self.send(peer_id, packet_id, data) self.send(peer_id, packet_id, data)
} }

View File

@ -7,5 +7,5 @@ authors = ["Parity Technologies <admin@parity.io>"]
rlp = { path = "../rlp" } rlp = { path = "../rlp" }
kvdb = { path = "../kvdb" } kvdb = { path = "../kvdb" }
ethcore-bigint = { path = "../bigint" } ethcore-bigint = { path = "../bigint" }
error-chain = "0.11.0" error-chain = { version = "0.11", default-features = false }
rustc-hex = "1.0" rustc-hex = "1.0"

View File

@ -67,9 +67,9 @@ extern crate parking_lot;
mod service; mod service;
mod worker; mod worker;
use mio::{Token}; use std::{fmt, error};
use mio::deprecated::{EventLoop, NotifyError}; use mio::deprecated::{EventLoop, NotifyError};
use std::fmt; use mio::Token;
pub use worker::LOCAL_STACK_SIZE; pub use worker::LOCAL_STACK_SIZE;
@ -93,6 +93,12 @@ impl fmt::Display for IoError {
} }
} }
impl error::Error for IoError {
fn description(&self) -> &str {
"IO error"
}
}
impl From<::std::io::Error> for IoError { impl From<::std::io::Error> for IoError {
fn from(err: ::std::io::Error) -> IoError { fn from(err: ::std::io::Error) -> IoError {
IoError::StdIo(err) IoError::StdIo(err)

View File

@ -5,5 +5,5 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
elastic-array = "0.9" elastic-array = "0.9"
error-chain = "0.11.0" error-chain = { version = "0.11", default-features = false }
ethcore-bytes = { path = "../bytes" } ethcore-bytes = { path = "../bytes" }

View File

@ -8,7 +8,7 @@ log = "0.3"
macros = { path = "../macros" } macros = { path = "../macros" }
kvdb = { path = "../kvdb" } kvdb = { path = "../kvdb" }
kvdb-rocksdb = { path = "../kvdb-rocksdb" } kvdb-rocksdb = { path = "../kvdb-rocksdb" }
error-chain = "0.11.0" error-chain = { version = "0.11", default-features = false }
[dev-dependencies] [dev-dependencies]
tempdir = "0.3" tempdir = "0.3"

View File

@ -34,6 +34,7 @@ ipnetwork = "0.12.6"
hash = { path = "../hash" } hash = { path = "../hash" }
snappy = { path = "../snappy" } snappy = { path = "../snappy" }
serde_json = "1.0" serde_json = "1.0"
error-chain = { version = "0.11", default-features = false }
[dev-dependencies] [dev-dependencies]
ethcore-devtools = { path = "../../devtools" } ethcore-devtools = { path = "../../devtools" }

View File

@ -26,7 +26,6 @@ use bigint::hash::*;
use ethcore_bytes::*; use ethcore_bytes::*;
use rlp::*; use rlp::*;
use std::io::{self, Cursor, Read, Write}; use std::io::{self, Cursor, Read, Write};
use error::*;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
use handshake::Handshake; use handshake::Handshake;
use stats::NetworkStats; use stats::NetworkStats;
@ -37,6 +36,7 @@ use rcrypto::buffer::*;
use tiny_keccak::Keccak; use tiny_keccak::Keccak;
use bytes::{Buf, BufMut}; use bytes::{Buf, BufMut};
use crypto; use crypto;
use error::{Error, ErrorKind};
const ENCRYPTED_HEADER_LEN: usize = 32; const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000; const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;
@ -125,7 +125,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
} }
/// Writable IO handler. Called when the socket is ready to send. /// Writable IO handler. Called when the socket is ready to send.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, Error> where Message: Send + Clone + Sync + 'static {
{ {
let buf = match self.send_queue.front_mut() { let buf = match self.send_queue.front_mut() {
Some(buf) => buf, Some(buf) => buf,
@ -300,7 +300,7 @@ pub struct EncryptedConnection {
impl EncryptedConnection { impl EncryptedConnection {
/// Create an encrypted connection out of the handshake. /// Create an encrypted connection out of the handshake.
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, NetworkError> { pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, Error> {
let shared = crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)?; let shared = crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral)?;
let mut nonce_material = H512::new(); let mut nonce_material = H512::new();
if handshake.originated { if handshake.originated {
@ -353,11 +353,11 @@ impl EncryptedConnection {
} }
/// Send a packet /// Send a packet
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new(); let mut header = RlpStream::new();
let len = payload.len(); let len = payload.len();
if len > MAX_PAYLOAD_SIZE { if len > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket); bail!(ErrorKind::OversizedPacket);
} }
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1); header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1);
@ -383,16 +383,16 @@ impl EncryptedConnection {
} }
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload. /// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
fn read_header(&mut self, header: &[u8]) -> Result<(), NetworkError> { fn read_header(&mut self, header: &[u8]) -> Result<(), Error> {
if header.len() != ENCRYPTED_HEADER_LEN { if header.len() != ENCRYPTED_HEADER_LEN {
return Err(From::from(NetworkError::Auth)); return Err(ErrorKind::Auth.into());
} }
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &header[0..16]); EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &header[0..16]);
let mac = &header[16..]; let mac = &header[16..];
let mut expected = H256::new(); let mut expected = H256::new();
self.ingress_mac.clone().finalize(&mut expected); self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[0..16] { if mac != &expected[0..16] {
return Err(From::from(NetworkError::Auth)); return Err(ErrorKind::Auth.into());
} }
let mut hdec = H128::new(); let mut hdec = H128::new();
@ -413,11 +413,11 @@ impl EncryptedConnection {
} }
/// Decrypt and authenticate packet payload. /// Decrypt and authenticate packet payload.
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, NetworkError> { fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, Error> {
let padding = (16 - (self.payload_len % 16)) % 16; let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = self.payload_len + padding + 16; let full_length = self.payload_len + padding + 16;
if payload.len() != full_length { if payload.len() != full_length {
return Err(From::from(NetworkError::Auth)); return Err(ErrorKind::Auth.into());
} }
self.ingress_mac.update(&payload[0..payload.len() - 16]); self.ingress_mac.update(&payload[0..payload.len() - 16]);
EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &[0u8; 0]); EncryptedConnection::update_mac(&mut self.ingress_mac, &mut self.mac_encoder, &[0u8; 0]);
@ -425,7 +425,7 @@ impl EncryptedConnection {
let mut expected = H128::new(); let mut expected = H128::new();
self.ingress_mac.clone().finalize(&mut expected); self.ingress_mac.clone().finalize(&mut expected);
if mac != &expected[..] { if mac != &expected[..] {
return Err(From::from(NetworkError::Auth)); return Err(ErrorKind::Auth.into());
} }
let mut packet = vec![0u8; self.payload_len]; let mut packet = vec![0u8; self.payload_len];
@ -451,7 +451,7 @@ impl EncryptedConnection {
} }
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone + Sync + 'static { pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, Error> where Message: Send + Clone + Sync + 'static {
io.clear_timer(self.connection.token)?; io.clear_timer(self.connection.token)?;
if let EncryptedConnectionState::Header = self.read_state { if let EncryptedConnectionState::Header = self.read_state {
if let Some(data) = self.connection.readable()? { if let Some(data) = self.connection.readable()? {
@ -474,7 +474,7 @@ impl EncryptedConnection {
} }
/// Writable IO handler. Processes send queeue. /// Writable IO handler. Processes send queeue.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
self.connection.writable(io)?; self.connection.writable(io)?;
Ok(()) Ok(())
} }

View File

@ -27,7 +27,7 @@ use time;
use bigint::hash::*; use bigint::hash::*;
use rlp::*; use rlp::*;
use node_table::*; use node_table::*;
use error::NetworkError; use error::{Error, ErrorKind};
use io::{StreamToken, IoContext}; use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover}; use ethkey::{Secret, KeyPair, sign, recover};
use IpFilter; use IpFilter;
@ -362,15 +362,15 @@ impl Discovery {
res res
} }
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
// validate packet // validate packet
if packet.len() < 32 + 65 + 4 + 1 { if packet.len() < 32 + 65 + 4 + 1 {
return Err(NetworkError::BadProtocol); return Err(ErrorKind::BadProtocol.into());
} }
let hash_signed = keccak(&packet[32..]); let hash_signed = keccak(&packet[32..]);
if hash_signed[..] != packet[0..32] { if hash_signed[..] != packet[0..32] {
return Err(NetworkError::BadProtocol); return Err(ErrorKind::BadProtocol.into());
} }
let signed = &packet[(32 + 65)..]; let signed = &packet[(32 + 65)..];
@ -391,10 +391,10 @@ impl Discovery {
} }
} }
fn check_timestamp(&self, timestamp: u64) -> Result<(), NetworkError> { fn check_timestamp(&self, timestamp: u64) -> Result<(), Error> {
if self.check_timestamps && timestamp < time::get_time().sec as u64{ if self.check_timestamps && timestamp < time::get_time().sec as u64{
debug!(target: "discovery", "Expired packet"); debug!(target: "discovery", "Expired packet");
return Err(NetworkError::Expired); return Err(ErrorKind::Expired.into());
} }
Ok(()) Ok(())
} }
@ -403,7 +403,7 @@ impl Discovery {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
} }
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from); trace!(target: "discovery", "Got Ping from {:?}", &from);
let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?; let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?;
let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?; let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
@ -428,7 +428,7 @@ impl Discovery {
Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() })) Ok(Some(TableUpdates { added: added_map, removed: HashSet::new() }))
} }
fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_pong(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Pong from {:?}", &from); trace!(target: "discovery", "Got Pong from {:?}", &from);
// TODO: validate pong packet // TODO: validate pong packet
let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?; let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
@ -445,7 +445,7 @@ impl Discovery {
Ok(None) Ok(None)
} }
fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_find_node(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got FindNode from {:?}", &from); trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = rlp.val_at(0)?; let target: NodeId = rlp.val_at(0)?;
let timestamp: u64 = rlp.val_at(1)?; let timestamp: u64 = rlp.val_at(1)?;
@ -478,7 +478,7 @@ impl Discovery {
packets.collect() packets.collect()
} }
fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> { fn on_neighbours(&mut self, rlp: &UntrustedRlp, _node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
// TODO: validate packet // TODO: validate packet
let mut added = HashMap::new(); let mut added = HashMap::new();
trace!(target: "discovery", "Got {} Neighbours from {:?}", rlp.at(0)?.item_count()?, &from); trace!(target: "discovery", "Got {} Neighbours from {:?}", rlp.at(0)?.item_count()?, &from);
@ -536,12 +536,12 @@ impl Discovery {
self.start(); self.start();
} }
pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn register_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket");
Ok(()) Ok(())
} }
pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn update_registration<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
let registration = if !self.send_queue.is_empty() { let registration = if !self.send_queue.is_empty() {
Ready::readable() | Ready::writable() Ready::readable() | Ready::writable()
} else { } else {

View File

@ -14,12 +14,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{io, net, fmt};
use io::IoError; use io::IoError;
use rlp::*; use {rlp, ethkey, crypto, snappy};
use std::fmt;
use ethkey::Error as KeyError;
use crypto::Error as CryptoError;
use snappy;
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DisconnectReason pub enum DisconnectReason
@ -83,98 +80,80 @@ impl fmt::Display for DisconnectReason {
} }
} }
#[derive(Debug)] error_chain! {
/// Network error. foreign_links {
pub enum NetworkError { SocketIo(IoError) #[doc = "Socket IO error."];
/// Authentication error. Io(io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."];
Auth, AddressParse(net::AddrParseError) #[doc = "Error concerning the network address parsing subsystem."];
/// Unrecognised protocol. Decompression(snappy::InvalidInput) #[doc = "Decompression error."];
BadProtocol,
/// Message expired.
Expired,
/// Peer not found.
PeerNotFound,
/// Peer is diconnected.
Disconnect(DisconnectReason),
/// Invalid NodeId
InvalidNodeId,
/// Socket IO error.
Io(IoError),
/// Error concerning the network address parsing subsystem.
AddressParse(::std::net::AddrParseError),
/// Error concerning the network address resolution subsystem.
AddressResolve(Option<::std::io::Error>),
/// Error concerning the Rust standard library's IO subsystem.
StdIo(::std::io::Error),
/// Packet size is over the protocol limit.
OversizedPacket,
/// Decompression error.
Decompression(snappy::InvalidInput),
} }
impl fmt::Display for NetworkError { errors {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { #[doc = "Error concerning the network address resolution subsystem."]
use self::NetworkError::*; AddressResolve(err: Option<io::Error>) {
description("Failed to resolve network address"),
display("Failed to resolve network address {}", err.as_ref().map_or("".to_string(), |e| e.to_string())),
}
let msg = match *self { #[doc = "Authentication failure"]
Auth => "Authentication failure".into(), Auth {
BadProtocol => "Bad protocol".into(), description("Authentication failure"),
Expired => "Expired message".into(), display("Authentication failure"),
PeerNotFound => "Peer not found".into(), }
Disconnect(ref reason) => format!("Peer disconnected: {}", reason),
Io(ref err) => format!("Socket I/O error: {}", err),
AddressParse(ref err) => format!("{}", err),
AddressResolve(Some(ref err)) => format!("{}", err),
AddressResolve(_) => "Failed to resolve network address.".into(),
StdIo(ref err) => format!("{}", err),
InvalidNodeId => "Invalid node id".into(),
OversizedPacket => "Packet is too large".into(),
Decompression(ref err) => format!("Error decompressing packet: {}", err),
};
f.write_fmt(format_args!("Network error ({})", msg)) #[doc = "Unrecognised protocol"]
BadProtocol {
description("Bad protocol"),
display("Bad protocol"),
}
#[doc = "Expired message"]
Expired {
description("Expired message"),
display("Expired message"),
}
#[doc = "Peer not found"]
PeerNotFound {
description("Peer not found"),
display("Peer not found"),
}
#[doc = "Peer is disconnected"]
Disconnect(reason: DisconnectReason) {
description("Peer disconnected"),
display("Peer disconnected: {}", reason),
}
#[doc = "Invalid node id"]
InvalidNodeId {
description("Invalid node id"),
display("Invalid node id"),
}
#[doc = "Packet size is over the protocol limit"]
OversizedPacket {
description("Packet is too large"),
display("Packet is too large"),
}
} }
} }
impl From<DecoderError> for NetworkError { impl From<rlp::DecoderError> for Error {
fn from(_err: DecoderError) -> NetworkError { fn from(_err: rlp::DecoderError) -> Self {
NetworkError::Auth ErrorKind::Auth.into()
} }
} }
impl From<::std::io::Error> for NetworkError { impl From<ethkey::Error> for Error {
fn from(err: ::std::io::Error) -> NetworkError { fn from(_err: ethkey::Error) -> Self {
NetworkError::StdIo(err) ErrorKind::Auth.into()
} }
} }
impl From<IoError> for NetworkError { impl From<crypto::Error> for Error {
fn from(err: IoError) -> NetworkError { fn from(_err: crypto::Error) -> Self {
NetworkError::Io(err) ErrorKind::Auth.into()
}
}
impl From<KeyError> for NetworkError {
fn from(_err: KeyError) -> Self {
NetworkError::Auth
}
}
impl From<CryptoError> for NetworkError {
fn from(_err: CryptoError) -> NetworkError {
NetworkError::Auth
}
}
impl From<snappy::InvalidInput> for NetworkError {
fn from(err: snappy::InvalidInput) -> NetworkError {
NetworkError::Decompression(err)
}
}
impl From<::std::net::AddrParseError> for NetworkError {
fn from(err: ::std::net::AddrParseError) -> NetworkError {
NetworkError::AddressParse(err)
} }
} }
@ -187,13 +166,13 @@ fn test_errors() {
} }
assert_eq!(DisconnectReason::Unknown, r); assert_eq!(DisconnectReason::Unknown, r);
match <NetworkError as From<DecoderError>>::from(DecoderError::RlpIsTooBig) { match *<Error as From<rlp::DecoderError>>::from(rlp::DecoderError::RlpIsTooBig).kind() {
NetworkError::Auth => {}, ErrorKind::Auth => {},
_ => panic!("Unexpeceted error"), _ => panic!("Unexpeceted error"),
} }
match <NetworkError as From<CryptoError>>::from(CryptoError::InvalidMessage) { match *<Error as From<crypto::Error>>::from(crypto::Error::InvalidMessage).kind() {
NetworkError::Auth => {}, ErrorKind::Auth => {},
_ => panic!("Unexpeceted error"), _ => panic!("Unexpeceted error"),
} }
} }

View File

@ -24,11 +24,11 @@ use rlp::*;
use connection::{Connection}; use connection::{Connection};
use host::{HostInfo}; use host::{HostInfo};
use node_table::NodeId; use node_table::NodeId;
use error::*;
use stats::NetworkStats; use stats::NetworkStats;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
use ethkey::{KeyPair, Public, Secret, recover, sign, Generator, Random}; use ethkey::{KeyPair, Public, Secret, recover, sign, Generator, Random};
use crypto::{ecdh, ecies}; use crypto::{ecdh, ecies};
use error::{Error, ErrorKind};
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
enum HandshakeState { enum HandshakeState {
@ -83,7 +83,7 @@ const ECIES_OVERHEAD: usize = 113;
impl Handshake { impl Handshake {
/// Create a new handshake object /// Create a new handshake object
pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, NetworkError> { pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, Error> {
Ok(Handshake { Ok(Handshake {
id: if let Some(id) = id { id.clone()} else { NodeId::new() }, id: if let Some(id) = id { id.clone()} else { NodeId::new() },
connection: Connection::new(token, socket, stats), connection: Connection::new(token, socket, stats),
@ -106,7 +106,7 @@ impl Handshake {
} }
/// Start a handhsake /// Start a handhsake
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone+ Sync + 'static { pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), Error> where Message: Send + Clone+ Sync + 'static {
self.originated = originated; self.originated = originated;
io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok(); io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok();
if originated { if originated {
@ -125,7 +125,7 @@ impl Handshake {
} }
/// Readable IO handler. Drives the state change. /// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
if !self.expired() { if !self.expired() {
while let Some(data) = self.connection.readable()? { while let Some(data) = self.connection.readable()? {
match self.state { match self.state {
@ -154,14 +154,14 @@ impl Handshake {
} }
/// Writabe IO handler. /// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
if !self.expired() { if !self.expired() {
self.connection.writable(io)?; self.connection.writable(io)?;
} }
Ok(()) Ok(())
} }
fn set_auth(&mut self, host_secret: &Secret, sig: &[u8], remote_public: &[u8], remote_nonce: &[u8], remote_version: u64) -> Result<(), NetworkError> { fn set_auth(&mut self, host_secret: &Secret, sig: &[u8], remote_public: &[u8], remote_nonce: &[u8], remote_version: u64) -> Result<(), Error> {
self.id.clone_from_slice(remote_public); self.id.clone_from_slice(remote_public);
self.remote_nonce.clone_from_slice(remote_nonce); self.remote_nonce.clone_from_slice(remote_nonce);
self.remote_version = remote_version; self.remote_version = remote_version;
@ -172,11 +172,11 @@ impl Handshake {
} }
/// Parse, validate and confirm auth message /// Parse, validate and confirm auth message
fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
if data.len() != V4_AUTH_PACKET_SIZE { if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target: "network", "Wrong auth packet size"); debug!(target: "network", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
self.auth_cipher = data.to_vec(); self.auth_cipher = data.to_vec();
match ecies::decrypt(secret, &[], data) { match ecies::decrypt(secret, &[], data) {
@ -193,7 +193,7 @@ impl Handshake {
let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2; let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2;
if total < V4_AUTH_PACKET_SIZE { if total < V4_AUTH_PACKET_SIZE {
debug!(target: "network", "Wrong EIP8 auth packet size"); debug!(target: "network", "Wrong EIP8 auth packet size");
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
let rest = total - data.len(); let rest = total - data.len();
self.state = HandshakeState::ReadingAuthEip8; self.state = HandshakeState::ReadingAuthEip8;
@ -203,7 +203,7 @@ impl Handshake {
Ok(()) Ok(())
} }
fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.auth_cipher.extend_from_slice(data); self.auth_cipher.extend_from_slice(data);
let auth = ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])?; let auth = ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])?;
@ -218,11 +218,11 @@ impl Handshake {
} }
/// Parse and validate ack message /// Parse and validate ack message
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> { fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), Error> {
trace!(target: "network", "Received handshake ack from {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Received handshake ack from {:?}", self.connection.remote_addr_str());
if data.len() != V4_ACK_PACKET_SIZE { if data.len() != V4_ACK_PACKET_SIZE {
debug!(target: "network", "Wrong ack packet size"); debug!(target: "network", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
self.ack_cipher = data.to_vec(); self.ack_cipher = data.to_vec();
match ecies::decrypt(secret, &[], data) { match ecies::decrypt(secret, &[], data) {
@ -236,7 +236,7 @@ impl Handshake {
let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2; let total = (((data[0] as u16) << 8 | (data[1] as u16)) as usize) + 2;
if total < V4_ACK_PACKET_SIZE { if total < V4_ACK_PACKET_SIZE {
debug!(target: "network", "Wrong EIP8 ack packet size"); debug!(target: "network", "Wrong EIP8 ack packet size");
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
let rest = total - data.len(); let rest = total - data.len();
self.state = HandshakeState::ReadingAckEip8; self.state = HandshakeState::ReadingAckEip8;
@ -246,7 +246,7 @@ impl Handshake {
Ok(()) Ok(())
} }
fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> { fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), Error> {
trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.ack_cipher.extend_from_slice(data); self.ack_cipher.extend_from_slice(data);
let ack = ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..])?; let ack = ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..])?;
@ -259,7 +259,7 @@ impl Handshake {
} }
/// Sends auth message /// Sends auth message
fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len(); let len = data.len();
@ -286,7 +286,7 @@ impl Handshake {
} }
/// Sends ack message /// Sends ack message
fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len(); let len = data.len();
@ -305,7 +305,7 @@ impl Handshake {
} }
/// Sends EIP8 ack message /// Sends EIP8 ack message
fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str()); trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
let mut rlp = RlpStream::new_list(3); let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public()); rlp.append(self.ecdhe.public());

View File

@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*; use std::ops::*;
use std::cmp::min; use std::cmp::min;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::io::{Read, Write, ErrorKind}; use std::io::{Read, Write, self};
use std::fs; use std::fs;
use ethkey::{KeyPair, Secret, Random, Generator}; use ethkey::{KeyPair, Secret, Random, Generator};
use hash::keccak; use hash::keccak;
@ -33,7 +33,6 @@ use bigint::hash::*;
use util::version; use util::version;
use rlp::*; use rlp::*;
use session::{Session, SessionInfo, SessionData}; use session::{Session, SessionInfo, SessionData};
use error::*;
use io::*; use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION, IpFilter}; use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION, IpFilter};
use node_table::*; use node_table::*;
@ -43,6 +42,7 @@ use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner; use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use connection_filter::{ConnectionFilter, ConnectionDirection}; use connection_filter::{ConnectionFilter, ConnectionDirection};
use error::{Error, ErrorKind, DisconnectReason};
type Slab<T> = ::slab::Slab<T, usize>; type Slab<T> = ::slab::Slab<T, usize>;
@ -248,12 +248,12 @@ impl<'s> NetworkContext<'s> {
} }
/// Send a packet over the network to another peer. /// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data) self.send_protocol(self.protocol, peer, packet_id, data)
} }
/// Send a packet over the network to another peer using specified protocol. /// Send a packet over the network to another peer using specified protocol.
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
let session = self.resolve_session(peer); let session = self.resolve_session(peer);
if let Some(session) = session { if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?; session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
@ -264,9 +264,9 @@ impl<'s> NetworkContext<'s> {
} }
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing. /// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context"); assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data)) self.session_id.map_or_else(|| Err(ErrorKind::Expired.into()), |id| self.send(id, packet_id, data))
} }
/// Get an IoChannel. /// Get an IoChannel.
@ -292,7 +292,7 @@ impl<'s> NetworkContext<'s> {
} }
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token. /// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), NetworkError> { pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer { self.io.message(NetworkIoMessage::AddTimer {
token: token, token: token,
delay: ms, delay: ms,
@ -386,7 +386,7 @@ pub struct Host {
impl Host { impl Host {
/// Create a new instance /// Create a new instance
pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>, filter: Option<Arc<ConnectionFilter>>) -> Result<Host, NetworkError> { pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>, filter: Option<Arc<ConnectionFilter>>) -> Result<Host, Error> {
let mut listen_address = match config.listen_address { let mut listen_address = match config.listen_address {
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)), None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr, Some(addr) => addr,
@ -468,7 +468,7 @@ impl Host {
} }
} }
pub fn add_reserved_node(&self, id: &str) -> Result<(), NetworkError> { pub fn add_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?; let n = Node::from_str(id)?;
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
@ -512,7 +512,7 @@ impl Host {
} }
} }
pub fn remove_reserved_node(&self, id: &str) -> Result<(), NetworkError> { pub fn remove_reserved_node(&self, id: &str) -> Result<(), Error> {
let n = Node::from_str(id)?; let n = Node::from_str(id)?;
self.reserved_nodes.write().remove(&n.id); self.reserved_nodes.write().remove(&n.id);
@ -533,7 +533,7 @@ impl Host {
format!("{}", Node::new(info.id().clone(), info.local_endpoint.clone())) format!("{}", Node::new(info.id().clone(), info.local_endpoint.clone()))
} }
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> { pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
self.stopping.store(true, AtomicOrdering::Release); self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new(); let mut to_kill = Vec::new();
for e in self.sessions.write().iter_mut() { for e in self.sessions.write().iter_mut() {
@ -563,7 +563,7 @@ impl Host {
peers peers
} }
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> { fn init_public_interface(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
if self.info.read().public_endpoint.is_some() { if self.info.read().public_endpoint.is_some() {
return Ok(()); return Ok(());
} }
@ -746,7 +746,7 @@ impl Host {
} }
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))] #[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), NetworkError> { fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
let nonce = self.info.write().next_nonce(); let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write(); let mut sessions = self.sessions.write();
@ -775,7 +775,7 @@ impl Host {
let socket = match self.tcp_listener.lock().accept() { let socket = match self.tcp_listener.lock().accept() {
Ok((sock, _addr)) => sock, Ok((sock, _addr)) => sock,
Err(e) => { Err(e) => {
if e.kind() != ErrorKind::WouldBlock { if e.kind() != io::ErrorKind::WouldBlock {
debug!(target: "network", "Error accepting connection: {:?}", e); debug!(target: "network", "Error accepting connection: {:?}", e);
} }
break break
@ -821,7 +821,7 @@ impl Host {
match session_result { match session_result {
Err(e) => { Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol) = e { if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
if let Some(id) = s.id() { if let Some(id) = s.id() {
if !self.reserved_nodes.read().contains(id) { if !self.reserved_nodes.read().contains(id) {
self.nodes.write().mark_as_useless(id); self.nodes.write().mark_as_useless(id);

View File

@ -56,6 +56,7 @@
//TODO: use Poll from mio //TODO: use Poll from mio
#![allow(deprecated)] #![allow(deprecated)]
#![recursion_limit="128"]
extern crate ethcore_io as io; extern crate ethcore_io as io;
extern crate ethcore_util as util; extern crate ethcore_util as util;
@ -83,6 +84,9 @@ extern crate hash;
extern crate serde_json; extern crate serde_json;
extern crate snappy; extern crate snappy;
#[macro_use]
extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -109,7 +113,7 @@ mod tests;
pub use host::{HostInfo, PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration}; pub use host::{HostInfo, PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration};
pub use service::NetworkService; pub use service::NetworkService;
pub use error::NetworkError; pub use error::{Error, ErrorKind};
pub use stats::NetworkStats; pub use stats::NetworkStats;
pub use session::SessionInfo; pub use session::SessionInfo;
pub use connection_filter::{ConnectionFilter, ConnectionDirection}; pub use connection_filter::{ConnectionFilter, ConnectionDirection};

View File

@ -28,7 +28,7 @@ use std::io::{Read, Write};
use bigint::hash::*; use bigint::hash::*;
use rlp::*; use rlp::*;
use time::Tm; use time::Tm;
use NetworkError; use error::{Error, ErrorKind};
use {AllowIP, IpFilter}; use {AllowIP, IpFilter};
use discovery::{TableUpdates, NodeEntry}; use discovery::{TableUpdates, NodeEntry};
use ip_utils::*; use ip_utils::*;
@ -117,18 +117,18 @@ impl NodeEndpoint {
} }
impl FromStr for NodeEndpoint { impl FromStr for NodeEndpoint {
type Err = NetworkError; type Err = Error;
/// Create endpoint from string. Performs name resolution if given a host name. /// Create endpoint from string. Performs name resolution if given a host name.
fn from_str(s: &str) -> Result<NodeEndpoint, NetworkError> { fn from_str(s: &str) -> Result<NodeEndpoint, Error> {
let address = s.to_socket_addrs().map(|mut i| i.next()); let address = s.to_socket_addrs().map(|mut i| i.next());
match address { match address {
Ok(Some(a)) => Ok(NodeEndpoint { Ok(Some(a)) => Ok(NodeEndpoint {
address: a, address: a,
udp_port: a.port() udp_port: a.port()
}), }),
Ok(_) => Err(NetworkError::AddressResolve(None)), Ok(_) => Err(ErrorKind::AddressResolve(None).into()),
Err(e) => Err(NetworkError::AddressResolve(Some(e))) Err(e) => Err(ErrorKind::AddressResolve(Some(e)).into())
} }
} }
} }
@ -171,10 +171,10 @@ impl Display for Node {
} }
impl FromStr for Node { impl FromStr for Node {
type Err = NetworkError; type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
let (id, endpoint) = if s.len() > 136 && &s[0..8] == "enode://" && &s[136..137] == "@" { let (id, endpoint) = if s.len() > 136 && &s[0..8] == "enode://" && &s[136..137] == "@" {
(s[8..136].parse().map_err(|_| NetworkError::InvalidNodeId)?, NodeEndpoint::from_str(&s[137..])?) (s[8..136].parse().map_err(|_| ErrorKind::InvalidNodeId)?, NodeEndpoint::from_str(&s[137..])?)
} }
else { else {
(NodeId::new(), NodeEndpoint::from_str(s)?) (NodeId::new(), NodeEndpoint::from_str(s)?)
@ -363,7 +363,7 @@ impl Drop for NodeTable {
} }
/// Check if node url is valid /// Check if node url is valid
pub fn validate_node_url(url: &str) -> Option<NetworkError> { pub fn validate_node_url(url: &str) -> Option<Error> {
use std::str::FromStr; use std::str::FromStr;
match Node::from_str(url) { match Node::from_str(url) {
Ok(_) => None, Ok(_) => None,

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use {NetworkProtocolHandler, NetworkConfiguration, NonReservedPeerMode}; use {NetworkProtocolHandler, NetworkConfiguration, NonReservedPeerMode};
use error::NetworkError; use error::Error;
use host::{Host, NetworkContext, NetworkIoMessage, PeerId, ProtocolId}; use host::{Host, NetworkContext, NetworkIoMessage, PeerId, ProtocolId};
use stats::NetworkStats; use stats::NetworkStats;
use io::*; use io::*;
@ -54,7 +54,7 @@ pub struct NetworkService {
impl NetworkService { impl NetworkService {
/// Starts IO event loop /// Starts IO event loop
pub fn new(config: NetworkConfiguration, filter: Option<Arc<ConnectionFilter>>) -> Result<NetworkService, NetworkError> { pub fn new(config: NetworkConfiguration, filter: Option<Arc<ConnectionFilter>>) -> Result<NetworkService, Error> {
let host_handler = Arc::new(HostHandler { public_url: RwLock::new(None) }); let host_handler = Arc::new(HostHandler { public_url: RwLock::new(None) });
let io_service = IoService::<NetworkIoMessage>::start()?; let io_service = IoService::<NetworkIoMessage>::start()?;
@ -72,7 +72,7 @@ impl NetworkService {
} }
/// Regiter a new protocol handler with the event loop. /// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), NetworkError> { pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), Error> {
self.io_service.send_message(NetworkIoMessage::AddHandler { self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler, handler: handler,
protocol: protocol, protocol: protocol,
@ -115,7 +115,7 @@ impl NetworkService {
} }
/// Start network IO /// Start network IO
pub fn start(&self) -> Result<(), NetworkError> { pub fn start(&self) -> Result<(), Error> {
let mut host = self.host.write(); let mut host = self.host.write();
if host.is_none() { if host.is_none() {
let h = Arc::new(Host::new(self.config.clone(), self.stats.clone(), self.filter.clone())?); let h = Arc::new(Host::new(self.config.clone(), self.stats.clone(), self.filter.clone())?);
@ -131,7 +131,7 @@ impl NetworkService {
} }
/// Stop network IO /// Stop network IO
pub fn stop(&self) -> Result<(), NetworkError> { pub fn stop(&self) -> Result<(), Error> {
let mut host = self.host.write(); let mut host = self.host.write();
if let Some(ref host) = *host { if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
@ -147,7 +147,7 @@ impl NetworkService {
} }
/// Try to add a reserved peer. /// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), NetworkError> { pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let host = self.host.read(); let host = self.host.read();
if let Some(ref host) = *host { if let Some(ref host) = *host {
host.add_reserved_node(peer) host.add_reserved_node(peer)
@ -157,7 +157,7 @@ impl NetworkService {
} }
/// Try to remove a reserved peer. /// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), NetworkError> { pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let host = self.host.read(); let host = self.host.read();
if let Some(ref host) = *host { if let Some(ref host) = *host {
host.remove_reserved_node(peer) host.remove_reserved_node(peer)

View File

@ -28,7 +28,7 @@ use rlp::*;
use connection::{EncryptedConnection, Packet, Connection, MAX_PAYLOAD_SIZE}; use connection::{EncryptedConnection, Packet, Connection, MAX_PAYLOAD_SIZE};
use handshake::Handshake; use handshake::Handshake;
use io::{IoContext, StreamToken}; use io::{IoContext, StreamToken};
use error::{NetworkError, DisconnectReason}; use error::{Error, ErrorKind, DisconnectReason};
use host::*; use host::*;
use node_table::NodeId; use node_table::NodeId;
use stats::NetworkStats; use stats::NetworkStats;
@ -178,7 +178,7 @@ impl Session {
/// Create a new session out of comepleted handshake. This clones the handshake connection object /// Create a new session out of comepleted handshake. This clones the handshake connection object
/// and leaves the handhsake in limbo to be deregistered from the event loop. /// and leaves the handhsake in limbo to be deregistered from the event loop.
pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>, pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>,
nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, NetworkError> nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, Error>
where Message: Send + Clone + Sync + 'static { where Message: Send + Clone + Sync + 'static {
let originated = id.is_some(); let originated = id.is_some();
let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake"); let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake");
@ -206,7 +206,7 @@ impl Session {
}) })
} }
fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone { fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
let connection = if let State::Handshake(ref mut h) = self.state { let connection = if let State::Handshake(ref mut h) = self.state {
self.info.id = Some(h.id.clone()); self.info.id = Some(h.id.clone());
self.info.remote_address = h.connection.remote_addr_str(); self.info.remote_address = h.connection.remote_addr_str();
@ -260,7 +260,7 @@ impl Session {
} }
/// Readable IO handler. Returns packet data if available. /// Readable IO handler. Returns packet data if available.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, NetworkError> where Message: Send + Sync + Clone { pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, Error> where Message: Send + Sync + Clone {
if self.expired() { if self.expired() {
return Ok(SessionData::None) return Ok(SessionData::None)
} }
@ -291,7 +291,7 @@ impl Session {
} }
/// Writable IO handler. Sends pending packets. /// Writable IO handler. Sends pending packets.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone { pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
match self.state { match self.state {
State::Handshake(ref mut h) => h.writable(io), State::Handshake(ref mut h) => h.writable(io),
State::Session(ref mut s) => s.writable(io), State::Session(ref mut s) => s.writable(io),
@ -309,7 +309,7 @@ impl Session {
} }
/// Register the session socket with the event loop /// Register the session socket with the event loop
pub fn register_socket<Host:Handler<Timeout = Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn register_socket<Host:Handler<Timeout = Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
if self.expired() { if self.expired() {
return Ok(()); return Ok(());
} }
@ -318,26 +318,26 @@ impl Session {
} }
/// Update registration with the event loop. Should be called at the end of the IO handler. /// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
self.connection().update_socket(reg, event_loop)?; self.connection().update_socket(reg, event_loop)?;
Ok(()) Ok(())
} }
/// Delete registration /// Delete registration
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), NetworkError> { pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
self.connection().deregister_socket(event_loop)?; self.connection().deregister_socket(event_loop)?;
Ok(()) Ok(())
} }
/// Send a protocol packet to peer. /// Send a protocol packet to peer.
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: Option<[u8; 3]>, packet_id: u8, data: &[u8]) -> Result<(), NetworkError> pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: Option<[u8; 3]>, packet_id: u8, data: &[u8]) -> Result<(), Error>
where Message: Send + Sync + Clone { where Message: Send + Sync + Clone {
if protocol.is_some() && (self.info.capabilities.is_empty() || !self.had_hello) { if protocol.is_some() && (self.info.capabilities.is_empty() || !self.had_hello) {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {:?}, packet: {}", self.token(), protocol.as_ref().map(|p| str::from_utf8(&p[..]).unwrap_or("??")), packet_id); debug!(target: "network", "Sending to unconfirmed session {}, protocol: {:?}, packet: {}", self.token(), protocol.as_ref().map(|p| str::from_utf8(&p[..]).unwrap_or("??")), packet_id);
return Err(From::from(NetworkError::BadProtocol)); bail!(ErrorKind::BadProtocol);
} }
if self.expired() { if self.expired() {
return Err(From::from(NetworkError::Expired)); return Err(ErrorKind::Expired.into());
} }
let mut i = 0usize; let mut i = 0usize;
let pid = match protocol { let pid = match protocol {
@ -359,7 +359,7 @@ impl Session {
let mut payload = data; // create a reference with local lifetime let mut payload = data; // create a reference with local lifetime
if self.compression { if self.compression {
if payload.len() > MAX_PAYLOAD_SIZE { if payload.len() > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket); bail!(ErrorKind::OversizedPacket);
} }
let len = snappy::compress_into(&payload, &mut compressed); let len = snappy::compress_into(&payload, &mut compressed);
trace!(target: "network", "compressed {} to {}", payload.len(), len); trace!(target: "network", "compressed {} to {}", payload.len(), len);
@ -406,19 +406,19 @@ impl Session {
} }
} }
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError> fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, Error>
where Message: Send + Sync + Clone { where Message: Send + Sync + Clone {
if packet.data.len() < 2 { if packet.data.len() < 2 {
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
let packet_id = packet.data[0]; let packet_id = packet.data[0];
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello { if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(From::from(NetworkError::BadProtocol)); return Err(ErrorKind::BadProtocol.into());
} }
let data = if self.compression { let data = if self.compression {
let compressed = &packet.data[1..]; let compressed = &packet.data[1..];
if snappy::decompressed_len(&compressed)? > MAX_PAYLOAD_SIZE { if snappy::decompressed_len(&compressed)? > MAX_PAYLOAD_SIZE {
return Err(NetworkError::OversizedPacket); bail!(ErrorKind::OversizedPacket);
} }
snappy::decompress(&compressed)? snappy::decompress(&compressed)?
} else { } else {
@ -436,7 +436,7 @@ impl Session {
if self.had_hello { if self.had_hello {
debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason)); debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
} }
Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason)))) Err(ErrorKind::Disconnect(DisconnectReason::from_u8(reason)).into())
} }
PACKET_PING => { PACKET_PING => {
self.send_pong(io)?; self.send_pong(io)?;
@ -484,7 +484,7 @@ impl Session {
} }
} }
fn write_hello<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone { fn write_hello<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), Error> where Message: Send + Sync + Clone {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append_raw(&[PACKET_HELLO as u8], 0); rlp.append_raw(&[PACKET_HELLO as u8], 0);
rlp.begin_list(5) rlp.begin_list(5)
@ -496,7 +496,7 @@ impl Session {
self.send(io, &rlp.drain()) self.send(io, &rlp.drain())
} }
fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), NetworkError> fn read_hello<Message>(&mut self, io: &IoContext<Message>, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error>
where Message: Send + Sync + Clone { where Message: Send + Sync + Clone {
let protocol = rlp.val_at::<u32>(0)?; let protocol = rlp.val_at::<u32>(0)?;
let client_version = rlp.val_at::<String>(1)?; let client_version = rlp.val_at::<String>(1)?;
@ -558,29 +558,29 @@ impl Session {
} }
/// Senf ping packet /// Senf ping packet
pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone { pub fn send_ping<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Sync + Clone {
self.send_packet(io, None, PACKET_PING, &EMPTY_LIST_RLP)?; self.send_packet(io, None, PACKET_PING, &EMPTY_LIST_RLP)?;
self.ping_time_ns = time::precise_time_ns(); self.ping_time_ns = time::precise_time_ns();
self.pong_time_ns = None; self.pong_time_ns = None;
Ok(()) Ok(())
} }
fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Sync + Clone { fn send_pong<Message>(&mut self, io: &IoContext<Message>) -> Result<(), Error> where Message: Send + Sync + Clone {
self.send_packet(io, None, PACKET_PONG, &EMPTY_LIST_RLP) self.send_packet(io, None, PACKET_PONG, &EMPTY_LIST_RLP)
} }
/// Disconnect this session /// Disconnect this session
pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> NetworkError where Message: Send + Sync + Clone { pub fn disconnect<Message>(&mut self, io: &IoContext<Message>, reason: DisconnectReason) -> Error where Message: Send + Sync + Clone {
if let State::Session(_) = self.state { if let State::Session(_) = self.state {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.begin_list(1); rlp.begin_list(1);
rlp.append(&(reason as u32)); rlp.append(&(reason as u32));
self.send_packet(io, None, PACKET_DISCONNECT, &rlp.drain()).ok(); self.send_packet(io, None, PACKET_DISCONNECT, &rlp.drain()).ok();
} }
NetworkError::Disconnect(reason) ErrorKind::Disconnect(reason).into()
} }
fn send<Message>(&mut self, io: &IoContext<Message>, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Sync + Clone { fn send<Message>(&mut self, io: &IoContext<Message>, data: &[u8]) -> Result<(), Error> where Message: Send + Sync + Clone {
match self.state { match self.state {
State::Handshake(_) => { State::Handshake(_) => {
warn!(target:"network", "Unexpected send request"); warn!(target:"network", "Unexpected send request");

View File

@ -5,3 +5,4 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
libc = "0.2.7" libc = "0.2.7"
rocksdb = { git = "https://github.com/paritytech/rust-rocksdb" }

View File

@ -24,6 +24,7 @@ const SNAPPY_OK: c_int = 0;
const SNAPPY_INVALID_INPUT: c_int = 1; const SNAPPY_INVALID_INPUT: c_int = 1;
const SNAPPY_BUFFER_TOO_SMALL: c_int = 2; const SNAPPY_BUFFER_TOO_SMALL: c_int = 2;
#[link(name = "snappy", kind = "static")]
extern { extern {
fn snappy_compress( fn snappy_compress(
input: *const c_char, input: *const c_char,

View File

@ -23,7 +23,7 @@ use std::time::{Duration, SystemTime};
use std::sync::Arc; use std::sync::Arc;
use bigint::hash::{H256, H512}; use bigint::hash::{H256, H512};
use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, ProtocolId, TimerToken}; use network::{self, HostInfo, NetworkContext, NodeId, PeerId, ProtocolId, TimerToken};
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use rlp::{DecoderError, RlpStream, UntrustedRlp}; use rlp::{DecoderError, RlpStream, UntrustedRlp};
@ -79,7 +79,7 @@ pub trait MessageHandler: Send + Sync {
#[derive(Debug)] #[derive(Debug)]
enum Error { enum Error {
Decoder(DecoderError), Decoder(DecoderError),
Network(NetworkError), Network(network::Error),
Message(MessageError), Message(MessageError),
UnknownPeer(PeerId), UnknownPeer(PeerId),
UnexpectedMessage, UnexpectedMessage,
@ -92,8 +92,8 @@ impl From<DecoderError> for Error {
} }
} }
impl From<NetworkError> for Error { impl From<network::Error> for Error {
fn from(err: NetworkError) -> Self { fn from(err: network::Error) -> Self {
Error::Network(err) Error::Network(err)
} }
} }