Extended network options (#2845)

* More network configuration options

* Filter UDP requests

* Fixed tests

* Fixed test warning
This commit is contained in:
Arkadiy Paronyan
2016-10-24 18:25:27 +02:00
committed by Gav Wood
parent 7f210b05bb
commit 1a5bae8ef1
14 changed files with 204 additions and 46 deletions

View File

@@ -29,6 +29,7 @@ use node_table::*;
use error::NetworkError;
use io::{StreamToken, IoContext};
use ethkey::{Secret, KeyPair, sign, recover};
use AllowIP;
use PROTOCOL_VERSION;
@@ -95,6 +96,7 @@ pub struct Discovery {
send_queue: VecDeque<Datagramm>,
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
allow_ips: AllowIP,
}
pub struct TableUpdates {
@@ -103,7 +105,7 @@ pub struct TableUpdates {
}
impl Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken) -> Discovery {
pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, allow_ips: AllowIP) -> Discovery {
let socket = UdpSocket::bound(&listen).expect("Error binding UDP socket");
Discovery {
id: key.public().clone(),
@@ -118,14 +120,17 @@ impl Discovery {
send_queue: VecDeque::new(),
check_timestamps: true,
adding_nodes: Vec::new(),
allow_ips: allow_ips,
}
}
/// Add a new node to discovery table. Pings the node.
pub fn add_node(&mut self, e: NodeEntry) {
let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
if e.endpoint.is_allowed(self.allow_ips) {
let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
}
}
/// Add a list of nodes. Pings a few nodes each round
@@ -137,7 +142,9 @@ impl Discovery {
/// Add a list of known nodes to the table.
pub fn init_node_list(&mut self, mut nodes: Vec<NodeEntry>) {
for n in nodes.drain(..) {
self.update_node(n);
if n.endpoint.is_allowed(self.allow_ips) {
self.update_node(n);
}
}
}
@@ -394,10 +401,11 @@ impl Discovery {
try!(self.check_timestamp(timestamp));
let mut added_map = HashMap::new();
let entry = NodeEntry { id: node.clone(), endpoint: source.clone() };
if !entry.endpoint.is_valid() || !entry.endpoint.is_global() {
if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Got bad address: {:?}", entry);
}
else {
} else if !entry.endpoint.is_allowed(self.allow_ips) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
} else {
self.update_node(entry.clone());
added_map.insert(node.clone(), entry);
}
@@ -470,6 +478,10 @@ impl Discovery {
debug!(target: "discovery", "Bad address: {:?}", endpoint);
continue;
}
if !endpoint.is_allowed(self.allow_ips) {
debug!(target: "discovery", "Address not allowed: {:?}", endpoint);
continue;
}
let node_id: NodeId = try!(r.val_at(3));
if node_id == self.id {
continue;
@@ -539,6 +551,7 @@ mod tests {
use std::str::FromStr;
use rustc_serialize::hex::FromHex;
use ethkey::{Random, Generator};
use AllowIP;
#[test]
fn find_node() {
@@ -563,8 +576,8 @@ mod tests {
let key2 = Random.generate().unwrap();
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0);
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0);
let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, AllowIP::All);
let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, AllowIP::All);
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap();
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap();
@@ -596,7 +609,7 @@ mod tests {
fn removes_expired() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0);
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, AllowIP::All);
for _ in 0..1200 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
}
@@ -624,7 +637,7 @@ mod tests {
fn packets() {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0);
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, AllowIP::All);
discovery.check_timestamps = false;
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();

View File

@@ -34,7 +34,7 @@ use rlp::*;
use session::{Session, SessionInfo, SessionData};
use error::*;
use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION};
use {NetworkProtocolHandler, NonReservedPeerMode, AllowIP, PROTOCOL_VERSION};
use node_table::*;
use stats::NetworkStats;
use discovery::{Discovery, TableUpdates, NodeEntry};
@@ -45,8 +45,7 @@ use parking_lot::{Mutex, RwLock};
type Slab<T> = ::slab::Slab<T, usize>;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAX_HANDSHAKES: usize = 1024;
// Tokens
const TCP_ACCEPT: usize = SYS_TIMER + 1;
@@ -89,12 +88,18 @@ pub struct NetworkConfiguration {
pub use_secret: Option<Secret>,
/// Minimum number of connected peers to maintain
pub min_peers: u32,
/// Maximum allowd number of peers
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with <key> protocol get additional <value> connection slots.
pub reserved_protocols: HashMap<ProtocolId, u32>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub allow_ips: AllowIP,
}
impl Default for NetworkConfiguration {
@@ -118,6 +123,9 @@ impl NetworkConfiguration {
use_secret: None,
min_peers: 25,
max_peers: 50,
max_handshakes: 64,
reserved_protocols: HashMap::new(),
allow_ips: AllowIP::All,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
@@ -364,7 +372,7 @@ pub struct Host {
impl Host {
/// Create a new instance
pub fn new(config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host, NetworkError> {
pub fn new(mut config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host, NetworkError> {
trace!(target: "host", "Creating new Host object");
let mut listen_address = match config.listen_address {
@@ -394,6 +402,7 @@ impl Host {
let boot_nodes = config.boot_nodes.clone();
let reserved_nodes = config.reserved_nodes.clone();
config.max_handshakes = min(config.max_handshakes, MAX_HANDSHAKES as u32);
let mut host = Host {
info: RwLock::new(HostInfo {
@@ -532,6 +541,7 @@ impl Host {
}
let local_endpoint = self.info.read().local_endpoint.clone();
let public_address = self.info.read().config.public_address.clone();
let allow_ips = self.info.read().config.allow_ips;
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(local_endpoint.address.port());
@@ -563,7 +573,7 @@ impl Host {
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
let mut udp_addr = local_endpoint.address.clone();
udp_addr.set_port(local_endpoint.udp_port);
Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY))
Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY, allow_ips))
} else { None }
};
@@ -618,14 +628,14 @@ impl Host {
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
let (min_peers, mut pin) = {
let (min_peers, mut pin, max_handshakes, allow_ips) = {
let info = self.info.read();
if info.capabilities.is_empty() {
return;
}
let config = &info.config;
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny)
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.allow_ips)
};
let session_count = self.session_count();
@@ -642,22 +652,22 @@ impl Host {
let handshake_count = self.handshake_count();
// allow 16 slots for incoming connections
let handshake_limit = MAX_HANDSHAKES - 16;
if handshake_count >= handshake_limit {
if handshake_count >= max_handshakes {
return;
}
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().nodes()
self.nodes.read().nodes(allow_ips)
} else {
Vec::new()
});
let max_handshakes_per_round = max_handshakes / 2;
let mut started: usize = 0;
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
.take(min(max_handshakes_per_round, max_handshakes - handshake_count)) {
self.connect_peer(&id, io);
started += 1;
}
@@ -790,7 +800,14 @@ impl Host {
let session_count = self.session_count();
let (max_peers, reserved_only) = {
let info = self.info.read();
(info.config.max_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
let mut max_peers = info.config.max_peers;
for cap in s.info.capabilities.iter() {
if let Some(num) = info.config.reserved_protocols.get(&cap.protocol) {
max_peers += *num;
break;
}
}
(max_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
if session_count >= max_peers as usize || reserved_only {

View File

@@ -56,6 +56,22 @@ impl SocketAddrExt for Ipv6Addr {
}
}
impl SocketAddrExt for IpAddr {
fn is_unspecified_s(&self) -> bool {
match *self {
IpAddr::V4(ref ip) => ip.is_unspecified_s(),
IpAddr::V6(ref ip) => ip.is_unspecified_s(),
}
}
fn is_global_s(&self) -> bool {
match *self {
IpAddr::V4(ref ip) => ip.is_global_s(),
IpAddr::V6(ref ip) => ip.is_global_s(),
}
}
}
#[cfg(not(windows))]
mod getinterfaces {
use std::{mem, io, ptr};

View File

@@ -137,3 +137,15 @@ impl NonReservedPeerMode {
}
}
}
/// IP fiter
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
pub enum AllowIP {
/// Connect to any address
All,
/// Connect to private network only
Private,
/// Connect to public network only
Public,
}

View File

@@ -30,6 +30,7 @@ use util::UtilError;
use rlp::*;
use time::Tm;
use error::NetworkError;
use AllowIP;
use discovery::{TableUpdates, NodeEntry};
use ip_utils::*;
pub use rustc_serialize::json::Json;
@@ -53,9 +54,15 @@ impl NodeEndpoint {
SocketAddr::V6(a) => SocketAddr::V6(SocketAddrV6::new(a.ip().clone(), self.udp_port, a.flowinfo(), a.scope_id())),
}
}
}
impl NodeEndpoint {
pub fn is_allowed(&self, filter: AllowIP) -> bool {
match filter {
AllowIP::All => true,
AllowIP::Private => !self.address.ip().is_global_s(),
AllowIP::Public => self.address.ip().is_global_s(),
}
}
pub fn from_rlp(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let tcp_port = try!(rlp.val_at::<u16>(2));
let udp_port = try!(rlp.val_at::<u16>(1));
@@ -98,13 +105,6 @@ impl NodeEndpoint {
SocketAddr::V6(a) => !a.ip().is_unspecified_s()
}
}
pub fn is_global(&self) -> bool {
match self.address {
SocketAddr::V4(a) => a.ip().is_global_s(),
SocketAddr::V6(a) => a.ip().is_global_s()
}
}
}
impl FromStr for NodeEndpoint {
@@ -219,8 +219,8 @@ impl NodeTable {
}
/// Returns node ids sorted by number of failures
pub fn nodes(&self) -> Vec<NodeId> {
let mut refs: Vec<&Node> = self.nodes.values().filter(|n| !self.useless_nodes.contains(&n.id)).collect();
pub fn nodes(&self, filter: AllowIP) -> Vec<NodeId> {
let mut refs: Vec<&Node> = self.nodes.values().filter(|n| !self.useless_nodes.contains(&n.id) && n.endpoint.is_allowed(filter)).collect();
refs.sort_by(|a, b| a.failures.cmp(&b.failures));
refs.iter().map(|n| n.id.clone()).collect()
}
@@ -278,7 +278,7 @@ impl NodeTable {
let mut json = String::new();
json.push_str("{\n");
json.push_str("\"nodes\": [\n");
let node_ids = self.nodes();
let node_ids = self.nodes(AllowIP::All);
for i in 0 .. node_ids.len() {
let node = self.nodes.get(&node_ids[i]).unwrap();
json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","}))
@@ -361,6 +361,7 @@ mod tests {
use std::net::*;
use util::hash::*;
use devtools::*;
use AllowIP;
#[test]
fn endpoint_parse() {
@@ -406,7 +407,7 @@ mod tests {
table.note_failure(&id1);
table.note_failure(&id2);
let r = table.nodes();
let r = table.nodes(AllowIP::All);
assert_eq!(r[0][..], id3[..]);
assert_eq!(r[1][..], id2[..]);
assert_eq!(r[2][..], id1[..]);
@@ -428,7 +429,7 @@ mod tests {
{
let table = NodeTable::new(Some(temp_path.as_path().to_str().unwrap().to_owned()));
let r = table.nodes();
let r = table.nodes(AllowIP::All);
assert_eq!(r[0][..], id1[..]);
assert_eq!(r[1][..], id2[..]);
}