Node Discovery v4 ENR Extension (EIP-868) (#11540)

This commit is contained in:
Artem Vorotnikov 2020-04-04 11:52:22 +03:00 committed by GitHub
parent 47637538e4
commit e047bb4bb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 295 additions and 79 deletions

38
Cargo.lock generated
View File

@ -263,6 +263,12 @@ dependencies = [
"byteorder",
]
[[package]]
name = "base64"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3"
[[package]]
name = "basic-authority"
version = "0.1.0"
@ -440,6 +446,12 @@ dependencies = [
"rustc-hex 2.1.0",
]
[[package]]
name = "bs58"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b170cd256a3f9fa6b9edae3e44a7dfdfc77e8124dbc3e2612d75f9c3e2396dae"
[[package]]
name = "bstr"
version = "0.2.12"
@ -1011,6 +1023,23 @@ dependencies = [
"vm",
]
[[package]]
name = "enr"
version = "0.1.0-alpha.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486a4699cf13c63af330d7b5ba854cb062b427a218cbbb09fdc7082b76fb54e7"
dependencies = [
"base64 0.12.0",
"bs58",
"hex",
"log",
"rand 0.7.3",
"rlp",
"secp256k1",
"tiny-keccak 2.0.1",
"zeroize",
]
[[package]]
name = "enum_primitive"
version = "0.1.1"
@ -1463,6 +1492,8 @@ dependencies = [
"ansi_term",
"assert_matches",
"bytes",
"derive_more",
"enr",
"env_logger 0.5.13",
"ethcore-io",
"ethcore-network",
@ -1483,6 +1514,7 @@ dependencies = [
"parking_lot 0.10.0",
"rand 0.7.3",
"rlp",
"secp256k1",
"serde",
"serde_json",
"slab 0.2.0",
@ -2090,6 +2122,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "hex-literal"
version = "0.2.1"

View File

@ -10,6 +10,8 @@ edition = "2018"
[dependencies]
ansi_term = "0.11"
bytes = "0.4"
derive_more = "0.99"
enr = { version = "0.1.0-alpha.5", default-features = false, features = ["rust-secp256k1"] }
ethcore-io = { path = "../io", features = ["mio"] }
ethereum-types = "0.8.0"
igd = "0.10.0"
@ -28,6 +30,7 @@ parity-snappy = "0.1"
parking_lot = "0.10.0"
rand = "0.7"
rlp = "0.4.0"
secp256k1 = "0.17"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
slab = "0.2"

View File

@ -31,6 +31,7 @@ use parity_crypto::publickey::{KeyPair, recover, Secret, sign};
use network::Error;
use network::IpFilter;
use crate::node_record::*;
use crate::node_table::*;
use crate::PROTOCOL_VERSION;
@ -45,6 +46,8 @@ const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4;
const PACKET_ENR_REQUEST: u8 = 5;
const PACKET_ENR_RESPONSE: u8 = 6;
const PING_TIMEOUT: Duration = Duration::from_millis(500);
const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2);
@ -155,6 +158,7 @@ pub struct Discovery {
id_hash: H256,
secret: Secret,
public_endpoint: NodeEndpoint,
enr: Enr,
discovery_initiated: bool,
discovery_round: Option<u16>,
discovery_id: NodeId,
@ -180,11 +184,12 @@ pub struct TableUpdates {
}
impl Discovery {
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
pub fn new(key: &KeyPair, public: NodeEndpoint, enr: Enr, ip_filter: IpFilter) -> Discovery {
Discovery {
id: *key.public(),
id_hash: keccak(key.public()),
secret: key.secret().clone(),
enr,
public_endpoint: public,
discovery_initiated: false,
discovery_round: None,
@ -372,6 +377,7 @@ impl Discovery {
self.public_endpoint.to_rlp_list(&mut rlp);
node.endpoint.to_rlp_list(&mut rlp);
append_expiration(&mut rlp);
rlp.append(&self.enr.seq());
let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?;
self.in_flight_pings.insert(node.id, PingRequest {
@ -484,6 +490,11 @@ impl Discovery {
PACKET_PONG => self.on_pong(&rlp, node_id, from),
PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from),
PACKET_ENR_REQUEST => self.on_enr_request(&rlp, node_id, from, hash_signed.as_bytes()),
PACKET_ENR_RESPONSE => {
debug!(target: "discovery", "ENR response handling is not implemented");
Ok(None)
}
_ => {
debug!(target: "discovery", "Unknown UDP packet: {}", packet_id);
Ok(None)
@ -522,7 +533,12 @@ impl Discovery {
let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
let timestamp: u64 = rlp.val_at(3)?;
self.check_timestamp(timestamp)?;
let mut response = RlpStream::new_list(3);
let enr_seq = rlp.val_at::<u64>(4).ok();
let mut response = RlpStream::new_list(3 + if enr_seq.is_some() {
1
} else {
0
});
let pong_to = NodeEndpoint {
address: from,
udp_port: ping_from.udp_port
@ -537,6 +553,9 @@ impl Discovery {
response.append(&echo_hash);
append_expiration(&mut response);
if enr_seq.is_some() {
response.append(&self.enr.seq());
}
self.send_packet(PACKET_PONG, from, response.drain())?;
let entry = NodeEntry { id: node_id, endpoint: pong_to };
@ -556,6 +575,7 @@ impl Discovery {
let echo_hash: H256 = rlp.val_at(1)?;
let timestamp: u64 = rlp.val_at(2)?;
self.check_timestamp(timestamp)?;
// let enr_seq = rlp.val_at::<u64>(3).ok();
let expected_node = match self.in_flight_pings.entry(node_id) {
Entry::Occupied(entry) if entry.get().echo_hash != echo_hash => {
@ -733,6 +753,32 @@ impl Discovery {
Ok(None)
}
fn on_enr_request(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, request_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
let timestamp = rlp.val_at::<u64>(0)?;
self.check_timestamp(timestamp)?;
let node = NodeEntry {
id: node_id.clone(),
endpoint: NodeEndpoint {
address: from,
udp_port: from.port()
}
};
match self.check_validity(&node) {
NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves
NodeValidity::ValidNode(_) => {
let mut response = RlpStream::new_list(2);
response.append(&request_hash);
response.append(&self.enr);
self.send_packet(PACKET_ENR_RESPONSE, from, response.drain())?;
}
// Make sure the request source is actually there and responds to pings before actually responding
invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(node_id, invalidity_reason))
}
Ok(None)
}
fn check_expired(&mut self, time: Instant) {
let mut nodes_to_expire = Vec::new();
self.in_flight_pings.retain(|node_id, ping_request| {
@ -895,7 +941,8 @@ mod tests {
fn ping_queue() {
let key = Random.generate();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 };
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
let mut discovery = Discovery::new(&key, ep.clone(), enr, IpFilter::default());
for i in 1..(MAX_NODES_PING+1) {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
@ -919,7 +966,8 @@ mod tests {
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 41000 + i),
udp_port: 41000 + i,
};
Discovery::new(&key, ep, IpFilter::default())
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
Discovery::new(&key, ep, enr, IpFilter::default())
})
.collect::<Vec<_>>();
@ -966,7 +1014,8 @@ mod tests {
fn removes_expired() {
let key = Random.generate();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
let discovery = Discovery::new(&key, ep.clone(), enr, IpFilter::default());
let mut discovery = Discovery { request_backoff: &[], ..discovery };
@ -1058,7 +1107,8 @@ mod tests {
let key = Random.generate();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
let mut discovery = Discovery::new(&key, ep.clone(), enr, IpFilter::default());
for _ in 0..(16 + 10) {
let entry = BucketEntry::new(NodeEntry { id: NodeId::zero(), endpoint: ep.clone() });
@ -1115,7 +1165,8 @@ mod tests {
let key = Secret::from_str(secret_hex)
.and_then(|secret| KeyPair::from_secret(secret))
.unwrap();
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
let mut discovery = Discovery::new(&key, ep.clone(), enr, IpFilter::default());
discovery.init_node_list(node_entries.clone());
@ -1160,7 +1211,8 @@ mod tests {
fn packets() {
let key = Random.generate();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 };
let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default());
let enr = EnrManager::new(key.secret().clone(), 0).unwrap().with_node_endpoint(&ep).into_enr();
let mut discovery = Discovery::new(&key, ep.clone(), enr, IpFilter::default());
discovery.check_timestamps = false;
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();
@ -1229,8 +1281,10 @@ mod tests {
let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 };
let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 };
let ep3 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40346").unwrap(), udp_port: 40345 };
let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default());
let enr1 = EnrManager::new(key1.secret().clone(), 0).unwrap().with_node_endpoint(&ep1).into_enr();
let enr2 = EnrManager::new(key2.secret().clone(), 0).unwrap().with_node_endpoint(&ep2).into_enr();
let mut discovery1 = Discovery::new(&key1, ep1.clone(), enr1, IpFilter::default());
let mut discovery2 = Discovery::new(&key2, ep2.clone(), enr2, IpFilter::default());
discovery1.ping(&NodeEntry { id: discovery2.id, endpoint: ep2.clone() }, PingReason::Default).unwrap();
let ping_data = discovery1.dequeue_send().unwrap();

View File

@ -16,11 +16,10 @@
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io::{self, Read, Write};
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::*;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
@ -34,7 +33,6 @@ use mio::{
Token,
udp::UdpSocket
};
use parity_path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock};
use rlp::{Encodable, RlpStream};
@ -50,7 +48,9 @@ use crate::{
connection::PAYLOAD_SOFT_LIMIT,
discovery::{Discovery, MAX_DATAGRAM_SIZE, NodeEntry, TableUpdates},
ip_utils::{map_external_address, select_public_address},
node_record::*,
node_table::*,
persistence::{save, load},
PROTOCOL_VERSION,
session::{Session, SessionData}
};
@ -218,6 +218,8 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair,
/// Node record.
enr: EnrManager,
/// Current network configuration
config: NetworkConfiguration,
/// Connection nonce.
@ -285,19 +287,30 @@ impl Host {
Some(addr) => addr,
};
let mut key_created = false;
let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone())?
} else {
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
config.config_path.clone().and_then(|ref p| load(Path::new(&p)))
.map_or_else(|| {
key_created = true;
let key = Random.generate();
if let Some(path) = config.config_path.clone() {
save_key(Path::new(&path), key.secret());
save(Path::new(&path), key.secret());
}
key
},
|s| KeyPair::from_secret(s).expect("Error creating node secret key"))
};
let mut enr = None;
if !key_created {
if let Some(path) = &config.config_path {
if let Some(data) = load(Path::new(&path)) {
enr = EnrManager::load(keys.secret().clone(), data);
}
}
}
let enr = enr.unwrap_or_else(|| EnrManager::new(keys.secret().clone(), 0).expect("keys.secret() is a valid secp256k1 secret; Enr does not fail given valid secp256k1 secret; qed"));
let path = config.net_config_path.clone();
// Setup the server socket
let tcp_listener = TcpListener::bind(&listen_address)?;
@ -313,6 +326,7 @@ impl Host {
let mut host = Host {
info: RwLock::new(HostInfo {
keys,
enr,
config,
nonce: H256::random(),
protocol_version: PROTOCOL_VERSION,
@ -475,7 +489,11 @@ impl Host {
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};
self.info.write().public_endpoint = Some(public_endpoint.clone());
{
let mut info = self.info.write();
info.public_endpoint = Some(public_endpoint.clone());
info.enr.set_node_endpoint(&public_endpoint);
}
if let Some(url) = self.external_url() {
io.message(NetworkIoMessage::NetworkStarted(url)).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
@ -485,7 +503,7 @@ impl Host {
let discovery = {
let info = self.info.read();
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint, allow_ips))
Some(Discovery::new(&info.keys, public_endpoint, info.enr.as_enr().clone(), allow_ips))
} else { None }
};
@ -1218,67 +1236,6 @@ impl IoHandler<NetworkIoMessage> for Host {
}
}
fn save_key(path: &Path, key: &Secret) {
let mut path_buf = PathBuf::from(path);
if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
warn!("Error creating key directory: {:?}", e);
return;
};
path_buf.push("key");
let path = path_buf.as_path();
let mut file = match fs::File::create(&path) {
Ok(file) => file,
Err(e) => {
warn!("Error creating key file: {:?}", e);
return;
}
};
if let Err(e) = restrict_permissions_owner(path, true, false) {
warn!(target: "network", "Failed to modify permissions of the file ({})", e);
}
if let Err(e) = file.write(&key.to_hex().into_bytes()) {
warn!("Error writing key file: {:?}", e);
}
}
fn load_key(path: &Path) -> Option<Secret> {
let mut path_buf = PathBuf::from(path);
path_buf.push("key");
let mut file = match fs::File::open(path_buf.as_path()) {
Ok(file) => file,
Err(e) => {
debug!("Error opening key file: {:?}", e);
return None;
}
};
let mut buf = String::new();
match file.read_to_string(&mut buf) {
Ok(_) => {},
Err(e) => {
warn!("Error reading key file: {:?}", e);
return None;
}
}
match Secret::from_str(&buf) {
Ok(key) => Some(key),
Err(e) => {
warn!("Error parsing key file: {:?}", e);
None
}
}
}
#[test]
fn key_save_load() {
use tempfile::TempDir;
let tempdir = TempDir::new().unwrap();
let key = H256::random().into();
save_key(tempdir.path(), &key);
let r = load_key(tempdir.path());
assert_eq!(key, r.unwrap());
}
#[test]
fn host_client_url() {
let mut config = NetworkConfiguration::new_local();

View File

@ -71,7 +71,9 @@ mod handshake;
mod session;
mod discovery;
mod service;
mod node_record;
mod node_table;
mod ip_utils;
mod persistence;
const PROTOCOL_VERSION: u32 = 5;

View File

@ -0,0 +1,71 @@
use log::*;
use parity_crypto::publickey::Secret;
use crate::{persistence::DiskEntity, node_table::NodeEndpoint};
pub type Enr = enr::Enr<secp256k1::SecretKey>;
const ENR_VERSION: &str = "v4";
pub struct EnrManager {
secret: secp256k1::SecretKey,
inner: Enr,
}
impl EnrManager {
pub fn new(key: Secret, seq: u64) -> Option<Self> {
let secret = key.to_secp256k1_secret().ok()?;
let mut b = enr::EnrBuilder::new(ENR_VERSION);
b.seq(seq);
let inner = b.build(&secret).ok()?;
Some(Self { secret, inner })
}
pub fn load(key: Secret, inner: Enr) -> Option<Self> {
let secret = key.to_secp256k1_secret().ok()?;
let public = secp256k1::PublicKey::from_secret_key(&secp256k1::Secp256k1::new(), &secret);
if inner.public_key() != public {
warn!("ENR does not match the provided key");
return None;
}
Some(Self { secret, inner })
}
#[cfg(test)]
pub fn with_node_endpoint(mut self, endpoint: &NodeEndpoint) -> Self {
self.set_node_endpoint(endpoint);
self
}
pub fn set_node_endpoint(&mut self, endpoint: &NodeEndpoint) {
const ENR_PROOF: &str = "Not enough data to go over the limit; qed";
let seq = self.inner.seq();
self.inner.set_tcp_socket(endpoint.address, &self.secret).expect(ENR_PROOF);
self.inner.set_udp(endpoint.udp_port, &self.secret).expect(ENR_PROOF);
// We just wrap here, unlikely to be a problem in our lifetimes unless the user sets seq high enough on purpose.
self.inner.set_seq(seq.wrapping_add(1), &self.secret).expect(ENR_PROOF);
}
pub fn as_enr(&self) -> &Enr {
&self.inner
}
#[cfg(test)]
pub fn into_enr(self) -> Enr {
self.inner
}
}
impl DiskEntity for Enr {
const FILENAME: &'static str = "enr";
const DESCRIPTION: &'static str = "Ethereum Node Record";
fn to_repr(&self) -> String {
self.to_base64()
}
fn from_repr(s: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(s.parse()?)
}
}

View File

@ -0,0 +1,91 @@
//! This module is a utility for when you need to persist some small amount of data on disk,
//! e.g. a secret or a snippet of user configuration. Implement `DiskEntity` for your type
//! and call `save` to persist it to disk or `load` to retrieve it again.
use log::*;
use parity_crypto::publickey::Secret;
use parity_path::restrict_permissions_owner;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
/// An entity that can be persisted on disk.
pub trait DiskEntity: Sized {
const FILENAME: &'static str;
/// Description of what kind of data that is stored in the file
const DESCRIPTION: &'static str;
/// Convert to string representation that will be written to disk.
fn to_repr(&self) -> String;
/// Convert from string representation loaded from disk.
fn from_repr(s: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>;
}
impl DiskEntity for Secret {
const FILENAME: &'static str = "key";
const DESCRIPTION: &'static str = "node key";
fn to_repr(&self) -> String {
self.to_hex()
}
fn from_repr(s: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(s.parse()?)
}
}
/// Persist item to disk. It does not perform synchronization and should not be called from multiple threads simultaneously.
pub(crate) fn save<E: DiskEntity>(path: &Path, entity: &E) {
let mut path_buf = PathBuf::from(path);
if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
warn!("Error creating {} directory: {:?}", E::DESCRIPTION, e);
return;
};
path_buf.push(E::FILENAME);
let path = path_buf.as_path();
let mut file = match fs::File::create(&path) {
Ok(file) => file,
Err(e) => {
warn!("Error creating {}: {:?}", E::DESCRIPTION, e);
return;
}
};
if let Err(e) = restrict_permissions_owner(path, true, false) {
warn!("Failed to modify permissions of the file ({})", e);
}
if let Err(e) = file.write(&entity.to_repr().into_bytes()) {
warn!("Failed to persist {} to disk: {:?}", E::DESCRIPTION, e);
}
}
/// Load item from disk. It does not modify data on disk and is thread-safe to call.
pub(crate) fn load<E>(path: &Path) -> Option<E>
where
E: DiskEntity,
{
let mut path_buf = PathBuf::from(path);
path_buf.push(E::FILENAME);
let buf = std::fs::read_to_string(path_buf).map_err(|e| warn!("Error reading {}: {:?}", E::DESCRIPTION, e)).ok()?;
let data = E::from_repr(&buf).map_err(|e| warn!("Error parsing {}: {:?}", E::DESCRIPTION, e)).ok()?;
Some(data)
}
#[cfg(test)]
mod tests {
#[test]
fn key_save_load() {
use super::*;
use ethereum_types::H256;
use tempfile::TempDir;
let tempdir = TempDir::new().unwrap();
let key = Secret::from(H256::random());
save(tempdir.path(), &key);
let r = load(tempdir.path());
assert_eq!(key, r.unwrap());
}
}