Abstract devp2p (#8048)

* Rename ethcore-network to ethcore-network-devp2p

* Fix typo

* Extract generic traits into util/network

* Simplify util/network

* Fix devp2p tests

* Remove old feature

* Fix RPC tests
This commit is contained in:
Pierre Krieger 2018-03-05 11:56:35 +01:00 committed by Rando
parent 47a02480c4
commit eeee90def5
25 changed files with 568 additions and 397 deletions

18
Cargo.lock generated
View File

@ -621,6 +621,20 @@ dependencies = [
[[package]]
name = "ethcore-network"
version = "1.11.0"
dependencies = [
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-io 1.11.0",
"ethcrypto 0.1.0",
"ethereum-types 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.3.0",
"ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.2.1",
"snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)",
]
[[package]]
name = "ethcore-network-devp2p"
version = "1.11.0"
dependencies = [
"ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -628,6 +642,7 @@ dependencies = [
"ethcore-bytes 0.1.0",
"ethcore-io 1.11.0",
"ethcore-logger 1.11.0",
"ethcore-network 1.11.0",
"ethcrypto 0.1.0",
"ethereum-types 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.3.0",
@ -849,6 +864,7 @@ dependencies = [
"ethcore-io 1.11.0",
"ethcore-light 1.11.0",
"ethcore-network 1.11.0",
"ethcore-network-devp2p 1.11.0",
"ethcore-transaction 0.1.0",
"ethereum-types 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.3.0",
@ -1712,7 +1728,7 @@ dependencies = [
"ethcore 1.11.0",
"ethcore-bytes 0.1.0",
"ethcore-io 1.11.0",
"ethcore-network 1.11.0",
"ethcore-network-devp2p 1.11.0",
"ethereum-types 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb-memorydb 0.1.0",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -47,7 +47,7 @@ pub trait IoContext {
}
impl<'a> IoContext for NetworkContext<'a> {
impl<T> IoContext for T where T: ?Sized + NetworkContext {
fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
if let Err(e) = self.send(peer, packet_id, packet_body) {
debug!(target: "pip", "Error sending packet to peer {}: {}", peer, e);

View File

@ -1089,23 +1089,23 @@ impl NetworkProtocolHandler for LightProtocol {
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.handle_packet(io, peer, packet_id, data);
self.handle_packet(&io, peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
self.on_connect(peer, io);
self.on_connect(peer, &io);
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
self.on_disconnect(*peer, io);
self.on_disconnect(*peer, &io);
}
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TIMEOUT => self.timeout_check(io),
TICK_TIMEOUT => self.tick_handlers(io),
PROPAGATE_TIMEOUT => self.propagate_transactions(io),
RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(io),
TIMEOUT => self.timeout_check(&io),
TICK_TIMEOUT => self.tick_handlers(&io),
PROPAGATE_TIMEOUT => self.propagate_transactions(&io),
RECALCULATE_COSTS_TIMEOUT => self.begin_new_cost_period(&io),
_ => warn!(target: "pip", "received timeout on unknown token {}", timer),
}
}

View File

@ -9,7 +9,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
ethcore = { path = ".."}
ethcore-bytes = { path = "../../util/bytes" }
ethcore-network = { path = "../../util/network" }
ethcore-network-devp2p = { path = "../../util/network-devp2p" }
ethereum-types = "0.2"
log = "0.3"
parking_lot = "0.5"

View File

@ -19,7 +19,7 @@
extern crate ethabi;
extern crate ethcore;
extern crate ethcore_bytes as bytes;
extern crate ethcore_network as network;
extern crate ethcore_network_devp2p as network;
extern crate ethereum_types;
extern crate lru_cache;
extern crate parking_lot;

View File

@ -10,6 +10,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
ethcore-bytes = { path = "../util/bytes" }
ethcore-network = { path = "../util/network" }
ethcore-network-devp2p = { path = "../util/network-devp2p" }
ethcore-io = { path = "../util/io" }
ethcore-light = { path = "../ethcore/light" }
ethcore-transaction = { path = "../ethcore/transaction" }

View File

@ -18,8 +18,9 @@ use std::sync::Arc;
use std::collections::{HashMap, BTreeMap};
use std::io;
use bytes::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, Error, ErrorKind, ConnectionFilter};
use devp2p::{NetworkService, ConnectionFilter};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, Error, ErrorKind};
use ethereum_types::{H256, H512, U256};
use io::{TimerToken};
use ethcore::ethstore::ethkey::Secret;
@ -393,7 +394,7 @@ impl ChainNotify for EthSync {
};
let chain_info = self.eth_handler.chain.chain_info();
light_proto.make_announcement(context, Announcement {
light_proto.make_announcement(&context, Announcement {
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
head_td: chain_info.total_difficulty,
@ -737,7 +738,7 @@ impl LightSync {
{
self.network.with_context_eval(
self.subprotocol_name,
move |ctx| self.proto.with_context(ctx, f),
move |ctx| self.proto.with_context(&ctx, f),
)
}
}

View File

@ -22,6 +22,7 @@
//!
extern crate ethcore_network as network;
extern crate ethcore_network_devp2p as devp2p;
extern crate ethcore_bytes as bytes;
extern crate ethcore_io as io;
extern crate ethcore_transaction as transaction;
@ -68,4 +69,5 @@ mod api;
pub use api::*;
pub use chain::{SyncStatus, SyncState};
pub use network::{validate_node_url, NonReservedPeerMode, Error, ErrorKind, ConnectionFilter, ConnectionDirection};
pub use devp2p::{validate_node_url, ConnectionFilter, ConnectionDirection};
pub use network::{NonReservedPeerMode, Error, ErrorKind};

View File

@ -61,19 +61,19 @@ pub trait SyncIo {
}
/// Wraps `NetworkContext` and the blockchain client
pub struct NetSyncIo<'s, 'h> where 'h: 's {
network: &'s NetworkContext<'h>,
pub struct NetSyncIo<'s> {
network: &'s NetworkContext,
chain: &'s BlockChainClient,
snapshot_service: &'s SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>,
}
impl<'s, 'h> NetSyncIo<'s, 'h> {
impl<'s> NetSyncIo<'s> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s NetworkContext<'h>,
pub fn new(network: &'s NetworkContext,
chain: &'s BlockChainClient,
snapshot_service: &'s SnapshotService,
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>) -> NetSyncIo<'s, 'h> {
chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>) -> NetSyncIo<'s> {
NetSyncIo {
network: network,
chain: chain,
@ -83,7 +83,7 @@ impl<'s, 'h> NetSyncIo<'s, 'h> {
}
}
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
impl<'s> SyncIo for NetSyncIo<'s> {
fn disable_peer(&mut self, peer_id: PeerId) {
self.network.disable_peer(peer_id);
}

View File

@ -0,0 +1,44 @@
[package]
description = "DevP2P implementation of the ethcore network library"
homepage = "http://parity.io"
license = "GPL-3.0"
name = "ethcore-network-devp2p"
version = "1.11.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
log = "0.3"
mio = "0.6.8"
bytes = "0.4"
rand = "0.4"
time = "0.1.34"
tiny-keccak = "1.3"
rust-crypto = "0.2.34"
slab = "0.2"
igd = "0.6"
libc = "0.2.7"
parking_lot = "0.5"
ansi_term = "0.10"
rustc-hex = "1.0"
ethcore-io = { path = "../io" }
ethcore-bytes = { path = "../bytes" }
ethcore-network = { path = "../network" }
ethereum-types = "0.2"
ethkey = { path = "../../ethkey" }
ethcrypto = { path = "../../ethcrypto" }
rlp = { path = "../rlp" }
path = { path = "../path" }
ethcore-logger = { path ="../../logger" }
ipnetwork = "0.12.6"
keccak-hash = { path = "../hash" }
snappy = { git = "https://github.com/paritytech/rust-snappy" }
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
error-chain = { version = "0.11", default-features = false }
[dev-dependencies]
tempdir = "0.3"
[features]
default = []

View File

@ -36,7 +36,7 @@ use rcrypto::buffer::*;
use tiny_keccak::Keccak;
use bytes::{Buf, BufMut};
use crypto;
use error::{Error, ErrorKind};
use network::{Error, ErrorKind};
const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;

View File

@ -27,10 +27,10 @@ use time;
use ethereum_types::{H256, H520};
use rlp::*;
use node_table::*;
use error::{Error, ErrorKind};
use network::{Error, ErrorKind};
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use IpFilter;
use network::IpFilter;
use PROTOCOL_VERSION;

View File

@ -22,13 +22,12 @@ use ethereum_types::{H256, H520};
use ethcore_bytes::Bytes;
use rlp::*;
use connection::{Connection};
use host::{HostInfo};
use node_table::NodeId;
use stats::NetworkStats;
use io::{IoContext, StreamToken};
use ethkey::{KeyPair, Public, Secret, recover, sign, Generator, Random};
use crypto::{ecdh, ecies};
use error::{Error, ErrorKind};
use network::{Error, ErrorKind, HostInfo};
#[derive(PartialEq, Eq, Debug)]
enum HandshakeState {

View File

@ -31,17 +31,20 @@ use mio::deprecated::{EventLoop};
use mio::tcp::*;
use ethereum_types::H256;
use rlp::*;
use session::{Session, SessionInfo, SessionData};
use session::{Session, SessionData};
use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION, IpFilter};
use PROTOCOL_VERSION;
use node_table::*;
use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId};
use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait};
use network::HostInfo as HostInfoTrait;
use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler};
use stats::NetworkStats;
use discovery::{Discovery, TableUpdates, NodeEntry};
use ip_utils::{map_external_address, select_public_address};
use path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use connection_filter::{ConnectionFilter, ConnectionDirection};
use error::{Error, ErrorKind, DisconnectReason};
type Slab<T> = ::slab::Slab<T, usize>;
@ -72,132 +75,6 @@ const DISCOVERY_ROUND_TIMEOUT: u64 = 300;
// for NODE_TABLE TimerToken
const NODE_TABLE_TIMEOUT: u64 = 300_000;
#[derive(Debug, PartialEq, Clone)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved
pub config_path: Option<String>,
/// Directory path to store network-specific configuration. None means nothing will be saved
pub net_config_path: Option<String>,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option<SocketAddr>,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option<SocketAddr>,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option<u16>,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
pub use_secret: Option<Secret>,
/// Minimum number of connected peers to maintain
pub min_peers: u32,
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with <key> protocol get additional <value> connection slots.
pub reserved_protocols: HashMap<ProtocolId, u32>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub ip_filter: IpFilter,
/// Client identifier
pub client_version: String,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
NetworkConfiguration::new()
}
}
impl NetworkConfiguration {
/// Create a new instance of default settings.
pub fn new() -> Self {
NetworkConfiguration {
config_path: None,
net_config_path: None,
listen_address: None,
public_address: None,
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
boot_nodes: Vec::new(),
use_secret: None,
min_peers: 25,
max_peers: 50,
max_handshakes: 64,
reserved_protocols: HashMap::new(),
ip_filter: IpFilter::default(),
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "Parity-network".into(),
}
}
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config
}
}
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = [u8; 3];
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
pub enum NetworkIoMessage {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc<NetworkProtocolHandler + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec<u8>,
/// Number of packet IDs reserved by the protocol.
packet_count: u8,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay in milliseconds.
delay: u64,
},
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(PeerId),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}
/// Local (temporary) peer session ID.
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo {
@ -248,14 +125,14 @@ impl<'s> NetworkContext<'s> {
_ => self.sessions.read().get(peer).cloned(),
}
}
}
/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
impl<'s> NetworkContextTrait for NetworkContext<'s> {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
/// Send a packet over the network to another peer using specified protocol.
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
@ -265,36 +142,30 @@ impl<'s> NetworkContext<'s> {
Ok(())
}
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
assert!(self.session.is_some(), "Respond called without network context");
self.session_id.map_or_else(|| Err(ErrorKind::Expired.into()), |id| self.send(id, packet_id, data))
}
/// Get an IoChannel.
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage> {
fn io_channel(&self) -> IoChannel<NetworkIoMessage> {
self.io.channel()
}
/// Disconnect a peer and prevent it from connecting again.
pub fn disable_peer(&self, peer: PeerId) {
fn disable_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Disconnect peer. Reconnect can be attempted later.
pub fn disconnect_peer(&self, peer: PeerId) {
fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Check if the session is still active.
pub fn is_expired(&self) -> bool {
fn is_expired(&self) -> bool {
self.session.as_ref().map_or(false, |s| s.lock().expired())
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
self.io.message(NetworkIoMessage::AddTimer {
token: token,
delay: ms,
@ -303,24 +174,20 @@ impl<'s> NetworkContext<'s> {
Ok(())
}
/// Returns peer identification string
pub fn peer_client_version(&self, peer: PeerId) -> String {
fn peer_client_version(&self, peer: PeerId) -> String {
self.resolve_session(peer).map_or("unknown".to_owned(), |s| s.lock().info.client_version.clone())
}
/// Returns information on p2p session
pub fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
self.resolve_session(peer).map(|s| s.lock().info.clone())
}
/// Returns max version for a given protocol.
pub fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
/// Returns this object's subprotocol name.
pub fn subprotocol_name(&self) -> ProtocolId { self.protocol }
fn subprotocol_name(&self) -> ProtocolId { self.protocol }
}
/// Shared host information
@ -341,24 +208,21 @@ pub struct HostInfo {
pub public_endpoint: Option<NodeEndpoint>,
}
impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId {
impl HostInfoTrait for HostInfo {
fn id(&self) -> &NodeId {
self.keys.public()
}
/// Returns secret key
pub fn secret(&self) -> &Secret {
fn secret(&self) -> &Secret {
self.keys.secret()
}
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 {
fn next_nonce(&mut self) -> H256 {
self.nonce = keccak(&self.nonce);
self.nonce
}
pub fn client_version(&self) -> &str {
fn client_version(&self) -> &str {
&self.config.client_version
}
}
@ -378,7 +242,7 @@ pub struct Host {
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery>>,
nodes: RwLock<NodeTable>,
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler>>>,
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler + Sync>>>,
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>,
@ -1006,14 +870,14 @@ impl Host {
self.nodes.write().update(node_changes, &*self.reserved_nodes.read());
}
pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: FnOnce(&NetworkContext) {
pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: FnOnce(&NetworkContextTrait) {
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context);
}
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: FnOnce(&NetworkContext) -> T {
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: FnOnce(&NetworkContextTrait) -> T {
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);

View File

@ -0,0 +1,118 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Network and general IO module.
//!
//! Example usage for creating a network service and adding an IO handler:
//!
//! ```rust
//! extern crate ethcore_network as net;
//! extern crate ethcore_network_devp2p as devp2p;
//! use net::*;
//! use devp2p::NetworkService;
//! use std::sync::Arc;
//!
//! struct MyHandler;
//!
//! impl NetworkProtocolHandler for MyHandler {
//! fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
//! io.register_timer(0, 1000);
//! }
//!
//! fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
//! println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
//! }
//!
//! fn connected(&self, io: &NetworkContext, peer: &PeerId) {
//! println!("Connected {}", peer);
//! }
//!
//! fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
//! println!("Disconnected {}", peer);
//! }
//! }
//!
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
//! service.start().expect("Error starting service");
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
//!
//! // Wait for quit condition
//! // ...
//! // Drop the service
//! }
//! ```
//TODO: use Poll from mio
#![allow(deprecated)]
extern crate ethcore_io as io;
extern crate ethcore_bytes;
extern crate ethereum_types;
extern crate parking_lot;
extern crate mio;
extern crate tiny_keccak;
extern crate crypto as rcrypto;
extern crate rand;
extern crate time;
extern crate ansi_term; //TODO: remove this
extern crate rustc_hex;
extern crate igd;
extern crate libc;
extern crate slab;
extern crate ethkey;
extern crate ethcrypto as crypto;
extern crate rlp;
extern crate bytes;
extern crate path;
extern crate ethcore_logger;
extern crate ethcore_network as network;
extern crate ipnetwork;
extern crate keccak_hash as hash;
extern crate serde;
extern crate serde_json;
extern crate snappy;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
extern crate tempdir;
mod host;
mod connection;
mod handshake;
mod session;
mod discovery;
mod service;
mod node_table;
mod stats;
mod ip_utils;
mod connection_filter;
pub use service::NetworkService;
pub use stats::NetworkStats;
pub use connection_filter::{ConnectionFilter, ConnectionDirection};
pub use host::NetworkContext;
pub use io::TimerToken;
pub use node_table::{validate_node_url, NodeId};
const PROTOCOL_VERSION: u32 = 5;

View File

@ -23,8 +23,7 @@ use std::str::FromStr;
use std::{fs, mem, slice};
use ethereum_types::H512;
use rlp::*;
use error::{Error, ErrorKind};
use {AllowIP, IpFilter};
use network::{Error, ErrorKind, AllowIP, IpFilter};
use discovery::{TableUpdates, NodeEntry};
use ip_utils::*;
use serde_json;

View File

@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use {NetworkProtocolHandler, NetworkConfiguration, NonReservedPeerMode};
use error::Error;
use host::{Host, NetworkContext, NetworkIoMessage, PeerId, ProtocolId};
use network::{Error, NetworkConfiguration, NetworkProtocolHandler, NonReservedPeerMode};
use network::{NetworkContext, PeerId, ProtocolId, NetworkIoMessage};
use host::Host;
use stats::NetworkStats;
use io::*;
use parking_lot::RwLock;

View File

@ -16,7 +16,6 @@
use std::{str, io};
use std::net::SocketAddr;
use std::cmp::Ordering;
use std::sync::*;
use std::collections::HashMap;
@ -28,7 +27,8 @@ use rlp::*;
use connection::{EncryptedConnection, Packet, Connection, MAX_PAYLOAD_SIZE};
use handshake::Handshake;
use io::{IoContext, StreamToken};
use error::{Error, ErrorKind, DisconnectReason};
use network::{Error, ErrorKind, DisconnectReason, SessionInfo, ProtocolId, PeerCapabilityInfo};
use network::{SessionCapabilityInfo, HostInfo as HostInfoTrait};
use host::*;
use node_table::NodeId;
use stats::NetworkStats;
@ -90,81 +90,6 @@ pub enum SessionData {
Continue,
}
/// Shared session information
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// Peer public key
pub id: Option<NodeId>,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
/// Session protocol capabilities
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
pub peer_capabilities: Vec<PeerCapabilityInfo>,
/// Peer ping delay in milliseconds
pub ping_ms: Option<u64>,
/// True if this session was originated by us.
pub originated: bool,
/// Remote endpoint address of the session
pub remote_address: String,
/// Local endpoint address of the session
pub local_address: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let p: Vec<u8> = rlp.val_at(0)?;
if p.len() != 3 {
return Err(DecoderError::Custom("Invalid subprotocol string length. Should be 3"));
}
let mut p2: ProtocolId = [0u8; 3];
p2.clone_from_slice(&p);
Ok(PeerCapabilityInfo {
protocol: p2,
version: rlp.val_at(1)?
})
}
}
impl ToString for PeerCapabilityInfo {
fn to_string(&self) -> String {
format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
pub protocol: [u8; 3],
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
}
impl PartialOrd for SessionCapabilityInfo {
fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SessionCapabilityInfo {
fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering {
// By protocol id first
if self.protocol != b.protocol {
return self.protocol.cmp(&b.protocol);
}
// By version
self.version.cmp(&b.version)
}
}
const PACKET_HELLO: u8 = 0x80;
const PACKET_DISCONNECT: u8 = 0x01;
const PACKET_PING: u8 = 0x02;

View File

@ -21,6 +21,7 @@ extern crate ethcore_bytes;
extern crate ethcore_io as io;
extern crate ethcore_logger;
extern crate ethcore_network;
extern crate ethcore_network_devp2p;
extern crate ethkey;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
@ -30,6 +31,7 @@ use std::time::*;
use parking_lot::Mutex;
use ethcore_bytes::Bytes;
use ethcore_network::*;
use ethcore_network_devp2p::NetworkService;
use ethkey::{Random, Generator};
use io::TimerToken;

View File

@ -7,37 +7,11 @@ version = "1.11.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
log = "0.3"
mio = "0.6.8"
bytes = "0.4"
rand = "0.4"
time = "0.1.34"
tiny-keccak = "1.3"
rust-crypto = "0.2.34"
slab = "0.2"
igd = "0.6"
libc = "0.2.7"
parking_lot = "0.5"
ansi_term = "0.10"
rustc-hex = "1.0"
ethcore-io = { path = "../io" }
ethcore-bytes = { path = "../bytes" }
ethereum-types = "0.2"
ethkey = { path = "../../ethkey" }
ethcrypto = { path = "../../ethcrypto" }
rlp = { path = "../rlp" }
path = { path = "../path" }
ethcore-logger = { path ="../../logger" }
ipnetwork = "0.12.6"
keccak-hash = { path = "../hash" }
snappy = { git = "https://github.com/paritytech/rust-snappy" }
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
error-chain = { version = "0.11", default-features = false }
[dev-dependencies]
tempdir = "0.3"
[features]
default = []

View File

@ -1,4 +1,4 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
@ -14,111 +14,338 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Network and general IO module.
//!
//! Example usage for craeting a network service and adding an IO handler:
//!
//! ```rust
//! extern crate ethcore_network as net;
//! use net::*;
//! use std::sync::Arc;
//!
//! struct MyHandler;
//!
//! impl NetworkProtocolHandler for MyHandler {
//! fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
//! io.register_timer(0, 1000);
//! }
//!
//! fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
//! println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
//! }
//!
//! fn connected(&self, io: &NetworkContext, peer: &PeerId) {
//! println!("Connected {}", peer);
//! }
//!
//! fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
//! println!("Disconnected {}", peer);
//! }
//! }
//!
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
//! service.start().expect("Error starting service");
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
//!
//! // Wait for quit condition
//! // ...
//! // Drop the service
//! }
//! ```
//TODO: use Poll from mio
#![allow(deprecated)]
#![recursion_limit="128"]
extern crate ethcore_io as io;
extern crate ethcore_bytes;
extern crate ethereum_types;
extern crate parking_lot;
extern crate mio;
extern crate tiny_keccak;
extern crate crypto as rcrypto;
extern crate rand;
extern crate time;
extern crate ansi_term; //TODO: remove this
extern crate rustc_hex;
extern crate igd;
extern crate libc;
extern crate slab;
extern crate ethkey;
extern crate ethcrypto as crypto;
extern crate ethereum_types;
extern crate ethkey;
extern crate rlp;
extern crate bytes;
extern crate path;
extern crate ethcore_logger;
extern crate ipnetwork;
extern crate keccak_hash as hash;
extern crate serde;
extern crate serde_json;
extern crate snappy;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
extern crate tempdir;
mod host;
mod connection;
mod handshake;
mod session;
mod discovery;
mod service;
mod error;
mod node_table;
mod stats;
mod ip_utils;
mod connection_filter;
pub use host::{HostInfo, PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration};
pub use service::NetworkService;
pub use error::{Error, ErrorKind};
pub use stats::NetworkStats;
pub use session::SessionInfo;
pub use connection_filter::{ConnectionFilter, ConnectionDirection};
pub use io::TimerToken;
pub use node_table::{validate_node_url, NodeId};
use ipnetwork::{IpNetwork, IpNetworkError};
use std::str::FromStr;
pub use error::{Error, ErrorKind, DisconnectReason};
const PROTOCOL_VERSION: u32 = 5;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::str::{self, FromStr};
use std::sync::Arc;
use ipnetwork::{IpNetwork, IpNetworkError};
use io::IoChannel;
use ethkey::Secret;
use ethereum_types::{H256, H512};
use rlp::{Decodable, DecoderError, UntrustedRlp};
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = [u8; 3];
/// Node public key
pub type NodeId = H512;
/// Local (temporary) peer session ID.
pub type PeerId = usize;
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
pub enum NetworkIoMessage {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc<NetworkProtocolHandler + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec<u8>,
/// Number of packet IDs reserved by the protocol.
packet_count: u8,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay in milliseconds.
delay: u64,
},
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(PeerId),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}
/// Shared session information
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// Peer public key
pub id: Option<NodeId>,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
/// Session protocol capabilities
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
pub peer_capabilities: Vec<PeerCapabilityInfo>,
/// Peer ping delay in milliseconds
pub ping_ms: Option<u64>,
/// True if this session was originated by us.
pub originated: bool,
/// Remote endpoint address of the session
pub remote_address: String,
/// Local endpoint address of the session
pub local_address: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let p: Vec<u8> = rlp.val_at(0)?;
if p.len() != 3 {
return Err(DecoderError::Custom("Invalid subprotocol string length. Should be 3"));
}
let mut p2: ProtocolId = [0u8; 3];
p2.clone_from_slice(&p);
Ok(PeerCapabilityInfo {
protocol: p2,
version: rlp.val_at(1)?
})
}
}
impl ToString for PeerCapabilityInfo {
fn to_string(&self) -> String {
format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
pub protocol: [u8; 3],
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
}
impl PartialOrd for SessionCapabilityInfo {
fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SessionCapabilityInfo {
fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering {
// By protocol id first
if self.protocol != b.protocol {
return self.protocol.cmp(&b.protocol);
}
// By version
self.version.cmp(&b.version)
}
}
/// Network service configuration
#[derive(Debug, PartialEq, Clone)]
pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved
pub config_path: Option<String>,
/// Directory path to store network-specific configuration. None means nothing will be saved
pub net_config_path: Option<String>,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option<SocketAddr>,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option<SocketAddr>,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option<u16>,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
pub use_secret: Option<Secret>,
/// Minimum number of connected peers to maintain
pub min_peers: u32,
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with <key> protocol get additional <value> connection slots.
pub reserved_protocols: HashMap<ProtocolId, u32>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub ip_filter: IpFilter,
/// Client identifier
pub client_version: String,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
NetworkConfiguration::new()
}
}
impl NetworkConfiguration {
/// Create a new instance of default settings.
pub fn new() -> Self {
NetworkConfiguration {
config_path: None,
net_config_path: None,
listen_address: None,
public_address: None,
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
boot_nodes: Vec::new(),
use_secret: None,
min_peers: 25,
max_peers: 50,
max_handshakes: 64,
reserved_protocols: HashMap::new(),
ip_filter: IpFilter::default(),
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "Parity-network".into(),
}
}
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub trait NetworkContext {
/// Send a packet over the network to another peer.
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Send a packet over the network to another peer using specified protocol.
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
/// Get an IoChannel.
fn io_channel(&self) -> IoChannel<NetworkIoMessage>;
/// Disconnect a peer and prevent it from connecting again.
fn disable_peer(&self, peer: PeerId);
/// Disconnect peer. Reconnect can be attempted later.
fn disconnect_peer(&self, peer: PeerId);
/// Check if the session is still active.
fn is_expired(&self) -> bool;
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error>;
/// Returns peer identification string
fn peer_client_version(&self, peer: PeerId) -> String;
/// Returns information on p2p session
fn session_info(&self, peer: PeerId) -> Option<SessionInfo>;
/// Returns max version for a given protocol.
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8>;
/// Returns this object's subprotocol name.
fn subprotocol_name(&self) -> ProtocolId;
}
impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
(**self).send(peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
(**self).send_protocol(protocol, peer, packet_id, data)
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
(**self).respond(packet_id, data)
}
fn io_channel(&self) -> IoChannel<NetworkIoMessage> {
(**self).io_channel()
}
fn disable_peer(&self, peer: PeerId) {
(**self).disable_peer(peer)
}
fn disconnect_peer(&self, peer: PeerId) {
(**self).disconnect_peer(peer)
}
fn is_expired(&self) -> bool {
(**self).is_expired()
}
fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), Error> {
(**self).register_timer(token, ms)
}
fn peer_client_version(&self, peer: PeerId) -> String {
(**self).peer_client_version(peer)
}
fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
(**self).session_info(peer)
}
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
(**self).protocol_version(protocol, peer)
}
fn subprotocol_name(&self) -> ProtocolId {
(**self).subprotocol_name()
}
}
pub trait HostInfo {
/// Returns public key
fn id(&self) -> &NodeId;
/// Returns secret key
fn secret(&self) -> &Secret;
/// Increments and returns connection nonce.
fn next_nonce(&mut self) -> H256;
/// Returns the client version.
fn client_version(&self) -> &str;
}
/// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop.
@ -157,7 +384,6 @@ impl NonReservedPeerMode {
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "ipc", binary)]
pub struct IpFilter {
pub predefined: AllowIP,
pub custom_allow: Vec<IpNetwork>,

View File

@ -392,7 +392,7 @@ pub trait Context {
fn send(&self, PeerId, u8, Vec<u8>);
}
impl<'a> Context for NetworkContext<'a> {
impl<T> Context for T where T: ?Sized + NetworkContext {
fn disconnect_peer(&self, peer: PeerId) {
NetworkContext::disconnect_peer(self, peer);
}
@ -437,7 +437,7 @@ impl<T> Network<T> {
}
/// Post a message to the whisper network to be relayed.
pub fn post_message<C: Context>(&self, message: Message, context: &C) -> bool
pub fn post_message<C: ?Sized + Context>(&self, message: Message, context: &C) -> bool
where T: MessageHandler
{
let ok = self.messages.write().insert(message);
@ -452,7 +452,7 @@ impl<T> Network<T> {
}
impl<T: MessageHandler> Network<T> {
fn rally<C: Context>(&self, io: &C) {
fn rally<C: ?Sized + Context>(&self, io: &C) {
// cannot be greater than 16MB (protocol limitation)
const MAX_MESSAGES_PACKET_SIZE: usize = 8 * 1024 * 1024;
@ -627,7 +627,7 @@ impl<T: MessageHandler> Network<T> {
Ok(())
}
fn on_connect<C: Context>(&self, io: &C, peer: &PeerId) {
fn on_connect<C: ?Sized + Context>(&self, io: &C, peer: &PeerId) {
trace!(target: "whisper", "Connecting peer {}", peer);
let node_key = match io.node_key(*peer) {
@ -660,7 +660,7 @@ impl<T: MessageHandler> Network<T> {
io.send(*peer, packet::STATUS, ::rlp::EMPTY_LIST_RLP.to_vec());
}
fn on_packet<C: Context>(&self, io: &C, peer: &PeerId, packet_id: u8, data: &[u8]) {
fn on_packet<C: ?Sized + Context>(&self, io: &C, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
let res = match packet_id {
packet::STATUS => self.on_status(peer, rlp),
@ -708,7 +708,7 @@ impl<T: MessageHandler> ::network::NetworkProtocolHandler for Network<T> {
// rally with each peer and handle timeouts.
match timer {
RALLY_TOKEN => self.rally(io),
other => debug!(target: "whisper", "Timout triggered on unknown token {}", other),
other => debug!(target: "whisper", "Timeout triggered on unknown token {}", other),
}
}
}