Backports for Beta 2.3.3 (#10333)
* version: bump beta to 2.3.3 * import rpc transactions sequentially (#10051) * import rpc transactions sequentially * use impl trait in argument position, renamed ProspectiveDispatcher to WithPostSign * grouped imports * integrates PostSign with ProspectiveSigner * fix spaces, removed unnecessary type cast and duplicate polling * clean up code style * Apply suggestions from code review * Fix Windows build (#10284) * Don't run the CPP example on CI (#10285) * Don't run the CPP example on CI * Add comment * CI optimizations (#10297) * CI optimizations * fix stripping * new dockerfile * no need n submodule upd * review * moved dockerfile * it becomes large * onchain update depends on s3 * fix dependency * fix cache status * fix cache status * new cache status * fix publish job (#10317) * fix publish job * dashes and colonels * Add Statetest support for Constantinople Fix (#10323) * Update Ethereum tests repo to v6.0.0-beta.3 tag * Add spec for St.Peter's / ConstantinopleFix statetests * Properly handle check_epoch_end_signal errors (#10015) * Make check_epoch_end_signal to only use immutable data * Move check_epoch_end_signals out of commit_block * Make check_epoch_end_signals possible to fail * Actually return the error from check_epoch_end_signals * Remove a clone * Fix import error * cargo: fix compilation * fix(add helper for timestamp overflows) (#10330) * fix(add helper timestamp overflows) * fix(simplify code) * fix(make helper private) * Remove CallContract and RegistryInfo re-exports from `ethcore/client` (#10205) * Remove re-export of `CallContract` and `RegistryInfo` from `ethcore/client` * Remove CallContract and RegistryInfo re-exports again This was missed while fixing merge conflicts * fix(docker): fix not receives SIGINT (#10059) * fix(docker): fix not receives SIGINT * fix: update with reviews * update with review * update * update * snap: official image / test (#10168) * official image / test * fix / test * bit more necromancy * fix paths * add source bin/df /test * add source bin/df /test2 * something w paths /test * something w paths /test * add source-type /test * show paths /test * copy plugin /test * plugin -> nil * install rhash * no questions while installing rhash * publish snap only for release * Don't add discovery initiators to the node table (#10305) * Don't add discovery initiators to the node table * Use enums for tracking state of the nodes in discovery * Dont try to ping ourselves * Fix minor nits * Update timeouts when observing an outdated node * Extracted update_bucket_record from update_node * Fixed typo * Fix two final nits from @todr * Extract CallContract and RegistryInfo traits into their own crate (#10178) * Create call-contract crate * Add license * First attempt at using extracted CallContract trait * Remove unneeded `extern crate` calls * Move RegistryInfo trait into call-contract crate * Move service-transaction-checker from ethcore to ethcore-miner * Update Cargo.lock file * Re-export call_contract * Merge CallContract and RegistryInfo imports * Remove commented code * Add documentation to call_contract crate * Add TODO for removal of re-exports * Update call-contract crate description Co-Authored-By: HCastano <HCastano@users.noreply.github.com> * Rename call-contract crate to ethcore-call-contract * Remove CallContract and RegistryInfo re-exports from `ethcore/client` (#10205) * Remove re-export of `CallContract` and `RegistryInfo` from `ethcore/client` * Remove CallContract and RegistryInfo re-exports again This was missed while fixing merge conflicts * fixed: types::transaction::SignedTransaction; (#10229) * fix daemonize dependency * fix build * change docker image based on debian instead of ubuntu due to the chan… (#10336) * change docker image based on debian instead of ubuntu due to the changes of the build container * role back docker build image and docker deploy image to ubuntu:xenial based (#10338) * perform stripping during build (#10208) * perform stripping during build (#10208) * perform stripping during build * var RUSTFLAGS
This commit is contained in:
@@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::default::Default;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use lru_cache::LruCache;
|
||||
use hash::keccak;
|
||||
use ethereum_types::{H256, H520};
|
||||
use rlp::{Rlp, RlpStream};
|
||||
@@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [
|
||||
|
||||
const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);
|
||||
|
||||
const OBSERVED_NODES_MAX_SIZE: usize = 10_000;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct NodeEntry {
|
||||
pub id: NodeId,
|
||||
@@ -95,7 +98,27 @@ struct FindNodeRequest {
|
||||
#[derive(Clone, Copy)]
|
||||
enum PingReason {
|
||||
Default,
|
||||
FromDiscoveryRequest(NodeId)
|
||||
FromDiscoveryRequest(NodeId, NodeValidity),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
enum NodeCategory {
|
||||
Bucket,
|
||||
Observed
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
enum NodeValidity {
|
||||
Ourselves,
|
||||
ValidNode(NodeCategory),
|
||||
ExpiredNode(NodeCategory),
|
||||
UnknownNode
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum BucketError {
|
||||
Ourselves,
|
||||
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
|
||||
}
|
||||
|
||||
struct PingRequest {
|
||||
@@ -145,6 +168,12 @@ pub struct Discovery<'a> {
|
||||
discovery_id: NodeId,
|
||||
discovery_nodes: HashSet<NodeId>,
|
||||
node_buckets: Vec<NodeBucket>,
|
||||
|
||||
// Sometimes we don't want to add nodes to the NodeTable, but still want to
|
||||
// keep track of them to avoid excessive pinging (happens when an unknown node sends
|
||||
// a discovery request to us -- the node might be on a different net).
|
||||
other_observed_nodes: LruCache<NodeId, (NodeEndpoint, Instant)>,
|
||||
|
||||
in_flight_pings: HashMap<NodeId, PingRequest>,
|
||||
in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>,
|
||||
send_queue: VecDeque<Datagram>,
|
||||
@@ -171,6 +200,7 @@ impl<'a> Discovery<'a> {
|
||||
discovery_id: NodeId::new(),
|
||||
discovery_nodes: HashSet::new(),
|
||||
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
|
||||
other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE),
|
||||
in_flight_pings: HashMap::new(),
|
||||
in_flight_find_nodes: HashMap::new(),
|
||||
send_queue: VecDeque::new(),
|
||||
@@ -200,41 +230,53 @@ impl<'a> Discovery<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
|
||||
trace!(target: "discovery", "Inserting {:?}", &e);
|
||||
fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> {
|
||||
let id_hash = keccak(e.id);
|
||||
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
|
||||
Some(dist) => dist,
|
||||
None => {
|
||||
debug!(target: "discovery", "Attempted to update own entry: {:?}", e);
|
||||
return None;
|
||||
return Err(BucketError::Ourselves);
|
||||
}
|
||||
};
|
||||
let bucket = &mut self.node_buckets[dist];
|
||||
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
|
||||
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
|
||||
entry.address = e;
|
||||
entry.last_seen = Instant::now();
|
||||
entry.backoff_until = Instant::now();
|
||||
entry.fail_count = 0;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
let mut added_map = HashMap::new();
|
||||
let ping = {
|
||||
let bucket = &mut self.node_buckets[dist];
|
||||
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
|
||||
node.address = e.clone();
|
||||
node.last_seen = Instant::now();
|
||||
node.backoff_until = Instant::now();
|
||||
node.fail_count = 0;
|
||||
true
|
||||
} else { false };
|
||||
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
|
||||
trace!(target: "discovery", "Inserting {:?}", &e);
|
||||
|
||||
if !updated {
|
||||
added_map.insert(e.id, e.clone());
|
||||
bucket.nodes.push_front(BucketEntry::new(e));
|
||||
match self.update_bucket_record(e) {
|
||||
Ok(()) => None,
|
||||
Err(BucketError::Ourselves) => None,
|
||||
Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance))
|
||||
}.map(|(node_entry, bucket_distance)| {
|
||||
trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance);
|
||||
|
||||
let mut added = HashMap::with_capacity(1);
|
||||
added.insert(node_entry.id, node_entry.clone());
|
||||
|
||||
let node_to_ping = {
|
||||
let bucket = &mut self.node_buckets[bucket_distance];
|
||||
bucket.nodes.push_front(BucketEntry::new(node_entry));
|
||||
if bucket.nodes.len() > BUCKET_SIZE {
|
||||
select_bucket_ping(bucket.nodes.iter())
|
||||
} else { None }
|
||||
} else { None }
|
||||
};
|
||||
if let Some(node) = ping {
|
||||
self.try_ping(node, PingReason::Default);
|
||||
}
|
||||
Some(TableUpdates { added: added_map, removed: HashSet::new() })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(node) = node_to_ping {
|
||||
self.try_ping(node, PingReason::Default);
|
||||
};
|
||||
TableUpdates{added, removed: HashSet::new()}
|
||||
})
|
||||
}
|
||||
|
||||
/// Starts the discovery process at round 0
|
||||
@@ -541,10 +583,28 @@ impl<'a> Discovery<'a> {
|
||||
};
|
||||
|
||||
if let Some((node, ping_reason)) = expected_node {
|
||||
if let PingReason::FromDiscoveryRequest(target) = ping_reason {
|
||||
if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason {
|
||||
self.respond_with_discovery(target, &node)?;
|
||||
// kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net --
|
||||
// but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea.
|
||||
// So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging
|
||||
match validity {
|
||||
NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => {
|
||||
trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node);
|
||||
self.update_bucket_record(node).unwrap_or_else(|error| {
|
||||
debug!(target: "discovery", "Error occured when processing ping from a bucket node: {:?}", &error);
|
||||
});
|
||||
},
|
||||
NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> {
|
||||
trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node);
|
||||
self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now()));
|
||||
},
|
||||
NodeValidity::Ourselves => (),
|
||||
}
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(self.update_node(node))
|
||||
}
|
||||
Ok(self.update_node(node))
|
||||
} else {
|
||||
debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
|
||||
Ok(None)
|
||||
@@ -565,31 +625,41 @@ impl<'a> Discovery<'a> {
|
||||
}
|
||||
};
|
||||
|
||||
if self.is_a_valid_known_node(&node) {
|
||||
self.respond_with_discovery(target, &node)?;
|
||||
} else {
|
||||
match self.check_validity(&node) {
|
||||
NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves
|
||||
NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?,
|
||||
// Make sure the request source is actually there and responds to pings before actually responding
|
||||
self.try_ping(node, PingReason::FromDiscoveryRequest(target));
|
||||
invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason))
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool {
|
||||
fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity {
|
||||
let id_hash = keccak(node.id);
|
||||
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
|
||||
Some(dist) => dist,
|
||||
None => {
|
||||
debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node);
|
||||
return false;
|
||||
return NodeValidity::Ourselves;
|
||||
}
|
||||
};
|
||||
|
||||
let bucket = &self.node_buckets[dist];
|
||||
if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) {
|
||||
debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node);
|
||||
(known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)
|
||||
match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
|
||||
(true, true) => NodeValidity::ValidNode(NodeCategory::Bucket),
|
||||
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket),
|
||||
_ => NodeValidity::UnknownNode
|
||||
}
|
||||
} else {
|
||||
false
|
||||
self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| {
|
||||
match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
|
||||
(true, true) => NodeValidity::ValidNode(NodeCategory::Observed),
|
||||
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed),
|
||||
_ => NodeValidity::UnknownNode
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user