Node table sorting according to last contact data (#8541)
* network-devp2p: sort nodes in node table using last contact data
* network-devp2p: rename node contact types in node table json output
* network-devp2p: fix node table tests
* network-devp2p: note node failure when failed to establish connection
* network-devp2p: handle UselessPeer error
* network-devp2p: note failure when marking node as useless
This commit is contained in:
parent
528497b86a
commit
a7a46f4253
@ -105,10 +105,13 @@ pub struct NetworkContext<'s> {
|
||||
|
||||
impl<'s> NetworkContext<'s> {
|
||||
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
|
||||
fn new(io: &'s IoContext<NetworkIoMessage>,
|
||||
fn new(
|
||||
io: &'s IoContext<NetworkIoMessage>,
|
||||
protocol: ProtocolId,
|
||||
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
|
||||
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s> {
|
||||
session: Option<SharedSession>,
|
||||
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
||||
reserved_peers: &'s HashSet<NodeId>,
|
||||
) -> NetworkContext<'s> {
|
||||
let id = session.as_ref().map(|s| s.lock().token());
|
||||
NetworkContext {
|
||||
io: io,
|
||||
@ -585,10 +588,8 @@ impl Host {
|
||||
let address = {
|
||||
let mut nodes = self.nodes.write();
|
||||
if let Some(node) = nodes.get_mut(id) {
|
||||
node.attempts += 1;
|
||||
node.endpoint.address
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
debug!(target: "network", "Connection to expired node aborted");
|
||||
return;
|
||||
}
|
||||
@ -600,6 +601,7 @@ impl Host {
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target: "network", "{}: Can't connect to address {:?}: {:?}", id, address, e);
|
||||
self.nodes.write().note_failure(&id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -685,10 +687,12 @@ impl Host {
|
||||
Err(e) => {
|
||||
let s = session.lock();
|
||||
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
|
||||
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
|
||||
if let ErrorKind::Disconnect(DisconnectReason::UselessPeer) = *e.kind() {
|
||||
if let Some(id) = s.id() {
|
||||
if !self.reserved_nodes.read().contains(id) {
|
||||
self.nodes.write().mark_as_useless(id);
|
||||
let mut nodes = self.nodes.write();
|
||||
nodes.note_failure(&id);
|
||||
nodes.mark_as_useless(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -754,6 +758,10 @@ impl Host {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Note connection success
|
||||
self.nodes.write().note_success(&id);
|
||||
|
||||
for (p, _) in self.handlers.read().iter() {
|
||||
if s.have_capability(*p) {
|
||||
ready_data.push(*p);
|
||||
@ -1024,7 +1032,9 @@ impl IoHandler<NetworkIoMessage> for Host {
|
||||
if let Some(session) = session {
|
||||
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
|
||||
if let Some(id) = session.lock().id() {
|
||||
self.nodes.write().mark_as_useless(id)
|
||||
let mut nodes = self.nodes.write();
|
||||
nodes.note_failure(&id);
|
||||
nodes.mark_as_useless(id);
|
||||
}
|
||||
}
|
||||
trace!(target: "network", "Disabling peer {}", peer);
|
||||
|
@ -21,6 +21,8 @@ use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr,
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::{fs, mem, slice};
|
||||
use std::time::{self, Duration, SystemTime};
|
||||
use rand::{self, Rng};
|
||||
use ethereum_types::H512;
|
||||
use rlp::{Rlp, RlpStream, DecoderError};
|
||||
use network::{Error, ErrorKind, AllowIP, IpFilter};
|
||||
@ -128,40 +130,64 @@ impl FromStr for NodeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Copy, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||
pub enum PeerType {
|
||||
_Required,
|
||||
Optional
|
||||
}
|
||||
|
||||
/// A type for representing an interaction (contact) with a node at a given time
|
||||
/// that was either a success or a failure.
#[derive(Clone, Copy, Debug)]
|
||||
pub enum NodeContact {
|
||||
/// The contact succeeded; carries the time of the contact.
Success(SystemTime),
|
||||
/// The contact failed; carries the time of the contact.
Failure(SystemTime),
|
||||
}
|
||||
|
||||
impl NodeContact {
|
||||
/// Creates a successful contact stamped with the current system time.
fn success() -> NodeContact {
|
||||
NodeContact::Success(SystemTime::now())
|
||||
}
|
||||
|
||||
/// Creates a failed contact stamped with the current system time.
fn failure() -> NodeContact {
|
||||
NodeContact::Failure(SystemTime::now())
|
||||
}
|
||||
|
||||
/// Returns the time of the contact, regardless of whether it was a success
/// or a failure.
fn time(&self) -> SystemTime {
|
||||
match *self {
|
||||
NodeContact::Success(t) | NodeContact::Failure(t) => t
|
||||
}
|
||||
}
|
||||
|
||||
/// Filters an old contact, returning `Some(self)` only if it happened less
|
||||
/// than a week ago and `None` otherwise.
///
/// `None` is also returned when the contact time lies in the future relative
/// to the system clock, because `SystemTime::elapsed` fails in that case.
fn recent(&self) -> Option<&NodeContact> {
|
||||
let t = self.time();
|
||||
// `elapsed` returns `Err` when `t` is later than the current system time.
if let Ok(d) = t.elapsed() {
|
||||
// 60 * 60 * 24 * 7 seconds = one week
if d < Duration::from_secs(60 * 60 * 24 * 7) {
|
||||
return Some(self);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Node {
|
||||
pub id: NodeId,
|
||||
pub endpoint: NodeEndpoint,
|
||||
pub peer_type: PeerType,
|
||||
pub attempts: u32,
|
||||
pub failures: u32,
|
||||
pub last_contact: Option<NodeContact>,
|
||||
}
|
||||
|
||||
const DEFAULT_FAILURE_PERCENTAGE: usize = 50;
|
||||
|
||||
impl Node {
|
||||
pub fn new(id: NodeId, endpoint: NodeEndpoint) -> Node {
|
||||
Node {
|
||||
id: id,
|
||||
endpoint: endpoint,
|
||||
peer_type: PeerType::Optional,
|
||||
attempts: 0,
|
||||
failures: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the node's failure percentage (0..100) in buckets of 5%. If there are 0 connection attempts for this
|
||||
/// node the default failure percentage is returned (50%).
|
||||
pub fn failure_percentage(&self) -> usize {
|
||||
if self.attempts == 0 {
|
||||
DEFAULT_FAILURE_PERCENTAGE
|
||||
} else {
|
||||
(self.failures * 100 / self.attempts / 5 * 5) as usize
|
||||
last_contact: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -191,8 +217,7 @@ impl FromStr for Node {
|
||||
id: id,
|
||||
endpoint: endpoint,
|
||||
peer_type: PeerType::Optional,
|
||||
attempts: 0,
|
||||
failures: 0,
|
||||
last_contact: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -231,28 +256,61 @@ impl NodeTable {
|
||||
|
||||
/// Add a node to table
|
||||
pub fn add_node(&mut self, mut node: Node) {
|
||||
// preserve attempts and failure counter
|
||||
let (attempts, failures) =
|
||||
self.nodes.get(&node.id).map_or((0, 0), |n| (n.attempts, n.failures));
|
||||
|
||||
node.attempts = attempts;
|
||||
node.failures = failures;
|
||||
|
||||
// preserve node last_contact
|
||||
node.last_contact = self.nodes.get(&node.id).and_then(|n| n.last_contact);
|
||||
self.nodes.insert(node.id.clone(), node);
|
||||
}
|
||||
|
||||
/// Returns a list of ordered nodes according to their most recent contact
|
||||
/// and filtering useless nodes. The algorithm for creating the sorted nodes
|
||||
/// is:
|
||||
/// - Contacts that aren't recent (older than 1 week) are discarded
|
||||
/// - (1) Nodes with a successful contact are ordered (most recent success first)
|
||||
/// - (2) Nodes with unknown contact (older than 1 week or new nodes) are randomly shuffled
|
||||
/// - (3) Nodes with a failed contact are ordered (oldest failure first)
|
||||
/// - The final result is the concatenation of (1), (2) and (3)
|
||||
fn ordered_entries(&self) -> Vec<&Node> {
|
||||
let mut refs: Vec<&Node> = self.nodes.values()
|
||||
.filter(|n| !self.useless_nodes.contains(&n.id))
|
||||
.collect();
|
||||
let mut success = Vec::new();
|
||||
let mut failures = Vec::new();
|
||||
let mut unknown = Vec::new();
|
||||
|
||||
refs.sort_by(|a, b| {
|
||||
a.failure_percentage().cmp(&b.failure_percentage())
|
||||
.then_with(|| a.failures.cmp(&b.failures))
|
||||
.then_with(|| b.attempts.cmp(&a.attempts)) // we use reverse ordering for number of attempts
|
||||
let nodes = self.nodes.values()
|
||||
.filter(|n| !self.useless_nodes.contains(&n.id));
|
||||
|
||||
for node in nodes {
|
||||
// discard contact points that aren't recent
|
||||
match node.last_contact.as_ref().and_then(|c| c.recent()) {
|
||||
Some(&NodeContact::Success(_)) => {
|
||||
success.push(node);
|
||||
},
|
||||
Some(&NodeContact::Failure(_)) => {
|
||||
failures.push(node);
|
||||
},
|
||||
None => {
|
||||
unknown.push(node);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
success.sort_by(|a, b| {
|
||||
let a = a.last_contact.expect("vector only contains values with defined last_contact; qed");
|
||||
let b = b.last_contact.expect("vector only contains values with defined last_contact; qed");
|
||||
// inverse ordering, most recent successes come first
|
||||
b.time().cmp(&a.time())
|
||||
});
|
||||
|
||||
refs
|
||||
failures.sort_by(|a, b| {
|
||||
let a = a.last_contact.expect("vector only contains values with defined last_contact; qed");
|
||||
let b = b.last_contact.expect("vector only contains values with defined last_contact; qed");
|
||||
// normal ordering, most distant failures come first
|
||||
a.time().cmp(&b.time())
|
||||
});
|
||||
|
||||
rand::thread_rng().shuffle(&mut unknown);
|
||||
|
||||
success.append(&mut unknown);
|
||||
success.append(&mut failures);
|
||||
success
|
||||
}
|
||||
|
||||
/// Returns node ids sorted by failure percentage, for nodes with the same failure percentage the absolute number of
|
||||
@ -296,10 +354,17 @@ impl NodeTable {
|
||||
}
|
||||
}
|
||||
|
||||
/// Increase failure counte for a node
|
||||
/// Set last contact as failure for a node
|
||||
pub fn note_failure(&mut self, id: &NodeId) {
|
||||
if let Some(node) = self.nodes.get_mut(id) {
|
||||
node.failures += 1;
|
||||
node.last_contact = Some(NodeContact::failure());
|
||||
}
|
||||
}
|
||||
|
||||
/// Set last contact as success for a node, stamped with the current time.
/// Does nothing if `id` is not present in the table.
|
||||
pub fn note_success(&mut self, id: &NodeId) {
|
||||
if let Some(node) = self.nodes.get_mut(id) {
|
||||
node.last_contact = Some(NodeContact::success());
|
||||
}
|
||||
}
|
||||
|
||||
@ -396,19 +461,38 @@ mod json {
|
||||
pub nodes: Vec<Node>,
|
||||
}
|
||||
|
||||
/// Serializable form of a node contact. The variants are renamed to
/// `success` / `failure` in the serialized output, and the `u64` payload is
/// treated as whole seconds since the UNIX epoch by the conversions to and
/// from `super::NodeContact`.
#[derive(Serialize, Deserialize)]
|
||||
pub enum NodeContact {
|
||||
/// Successful contact, in seconds since the UNIX epoch.
#[serde(rename = "success")]
|
||||
Success(u64),
|
||||
/// Failed contact, in seconds since the UNIX epoch.
#[serde(rename = "failure")]
|
||||
Failure(u64),
|
||||
}
|
||||
|
||||
impl NodeContact {
|
||||
/// Converts the serialized contact into a `super::NodeContact`, keeping the
/// success/failure variant and interpreting the stored value as a number of
/// seconds after `time::UNIX_EPOCH`.
pub fn into_node_contact(self) -> super::NodeContact {
|
||||
match self {
|
||||
NodeContact::Success(s) => super::NodeContact::Success(
|
||||
time::UNIX_EPOCH + Duration::from_secs(s)
|
||||
),
|
||||
NodeContact::Failure(s) => super::NodeContact::Failure(
|
||||
time::UNIX_EPOCH + Duration::from_secs(s)
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Node {
|
||||
pub url: String,
|
||||
pub attempts: u32,
|
||||
pub failures: u32,
|
||||
pub last_contact: Option<NodeContact>,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub fn into_node(self) -> Option<super::Node> {
|
||||
match super::Node::from_str(&self.url) {
|
||||
Ok(mut node) => {
|
||||
node.attempts = self.attempts;
|
||||
node.failures = self.failures;
|
||||
node.last_contact = self.last_contact.map(|c| c.into_node_contact());
|
||||
Some(node)
|
||||
},
|
||||
_ => None,
|
||||
@ -418,10 +502,18 @@ mod json {
|
||||
|
||||
impl<'a> From<&'a super::Node> for Node {
|
||||
fn from(node: &'a super::Node) -> Self {
|
||||
let last_contact = node.last_contact.and_then(|c| {
|
||||
match c {
|
||||
super::NodeContact::Success(t) =>
|
||||
t.duration_since(time::UNIX_EPOCH).ok().map(|d| NodeContact::Success(d.as_secs())),
|
||||
super::NodeContact::Failure(t) =>
|
||||
t.duration_since(time::UNIX_EPOCH).ok().map(|d| NodeContact::Failure(d.as_secs())),
|
||||
}
|
||||
});
|
||||
|
||||
Node {
|
||||
url: format!("{}", node),
|
||||
attempts: node.attempts,
|
||||
failures: node.failures,
|
||||
last_contact
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -464,42 +556,54 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn table_failure_percentage_order() {
|
||||
fn table_last_contact_order() {
|
||||
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node3 = Node::from_str("enode://c979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node4 = Node::from_str("enode://d979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node5 = Node::from_str("enode://e979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node6 = Node::from_str("enode://f979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let id1 = H512::from_str("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id2 = H512::from_str("b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id3 = H512::from_str("c979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id4 = H512::from_str("d979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id5 = H512::from_str("e979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id6 = H512::from_str("f979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let mut table = NodeTable::new(None);
|
||||
|
||||
table.add_node(node1);
|
||||
table.add_node(node2);
|
||||
table.add_node(node3);
|
||||
table.add_node(node4);
|
||||
table.add_node(node5);
|
||||
table.add_node(node6);
|
||||
|
||||
// node 1 - failure percentage 100%
|
||||
table.get_mut(&id1).unwrap().attempts = 2;
|
||||
// failures - nodes 1 & 2
|
||||
table.note_failure(&id1);
|
||||
table.note_failure(&id1);
|
||||
|
||||
// node2 - failure percentage 33%
|
||||
table.get_mut(&id2).unwrap().attempts = 3;
|
||||
table.note_failure(&id2);
|
||||
|
||||
// node3 - failure percentage 0%
|
||||
table.get_mut(&id3).unwrap().attempts = 1;
|
||||
// success - nodes 3 & 4
|
||||
table.note_success(&id3);
|
||||
table.note_success(&id4);
|
||||
|
||||
// node4 - failure percentage 50% (default when no attempts)
|
||||
// success - node 5 (old contact)
|
||||
table.get_mut(&id5).unwrap().last_contact = Some(NodeContact::Success(time::UNIX_EPOCH));
|
||||
|
||||
// unknown - node 6
|
||||
|
||||
let r = table.nodes(IpFilter::default());
|
||||
|
||||
assert_eq!(r[0][..], id3[..]);
|
||||
assert_eq!(r[1][..], id2[..]);
|
||||
assert_eq!(r[2][..], id4[..]);
|
||||
assert_eq!(r[3][..], id1[..]);
|
||||
assert_eq!(r[0][..], id4[..]); // most recent success
|
||||
assert_eq!(r[1][..], id3[..]);
|
||||
|
||||
// unknown (old contacts and new nodes), randomly shuffled
|
||||
assert!(
|
||||
r[2][..] == id5[..] && r[3][..] == id6[..] ||
|
||||
r[2][..] == id6[..] && r[3][..] == id5[..]
|
||||
);
|
||||
|
||||
assert_eq!(r[4][..], id1[..]); // oldest failure
|
||||
assert_eq!(r[5][..], id2[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -507,23 +611,27 @@ mod tests {
|
||||
let tempdir = TempDir::new("").unwrap();
|
||||
let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let node3 = Node::from_str("enode://c979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@22.99.55.44:7770").unwrap();
|
||||
let id1 = H512::from_str("a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id2 = H512::from_str("b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
let id3 = H512::from_str("c979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
|
||||
|
||||
{
|
||||
let mut table = NodeTable::new(Some(tempdir.path().to_str().unwrap().to_owned()));
|
||||
table.add_node(node1);
|
||||
table.add_node(node2);
|
||||
table.add_node(node3);
|
||||
|
||||
table.get_mut(&id1).unwrap().attempts = 1;
|
||||
table.get_mut(&id2).unwrap().attempts = 1;
|
||||
table.note_failure(&id2);
|
||||
table.note_success(&id2);
|
||||
table.note_failure(&id3);
|
||||
}
|
||||
|
||||
{
|
||||
let table = NodeTable::new(Some(tempdir.path().to_str().unwrap().to_owned()));
|
||||
let r = table.nodes(IpFilter::default());
|
||||
assert_eq!(r[0][..], id1[..]);
|
||||
assert_eq!(r[1][..], id2[..]);
|
||||
assert_eq!(r[0][..], id2[..]); // latest success
|
||||
assert_eq!(r[1][..], id1[..]); // unknown
|
||||
assert_eq!(r[2][..], id3[..]); // oldest failure
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user