Node table limiting and cache for node filter (#10288)

* Fix nasty typo in NodeTable::update (add ;)

* Add limiting for NodeTable

* Add cache for NodeFilter

* Use expect instead of unwrap

* Move node in ordered_ids if it exists there in note_failure and note_success + fix expect msg

* Add comment

* Improve code style

* DRY in note_failure and note_success

* Fix nodes ordering

* Simplify match expression

* Add tests for get_index_to_insert

* Remove get_mut method from NodeTable, Add get method to NodeTable

* Fix table_last_contact_order for macos failing because of lost nanosecond precision
This commit is contained in:
Vladyslav Lupashevskyi 2019-04-05 14:30:31 +03:00 committed by Talha Cross
parent 10e1787ad1
commit 8132d38b50
5 changed files with 200 additions and 49 deletions

View File

@ -37,13 +37,17 @@ extern crate tempdir;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use std::collections::{HashMap, VecDeque};
use std::sync::Weak; use std::sync::Weak;
use ethcore::client::{BlockChainClient, BlockId}; use ethcore::client::{BlockChainClient, BlockId, ChainNotify, NewBlocks};
use ethereum_types::{H256, Address}; use ethereum_types::{H256, Address};
use ethabi::FunctionOutputDecoder; use ethabi::FunctionOutputDecoder;
use network::{ConnectionFilter, ConnectionDirection}; use network::{ConnectionFilter, ConnectionDirection};
use devp2p::NodeId; use devp2p::NodeId;
use devp2p::MAX_NODES_IN_TABLE;
use parking_lot::RwLock;
use_contract!(peer_set, "res/peer_set.json"); use_contract!(peer_set, "res/peer_set.json");
@ -51,14 +55,27 @@ use_contract!(peer_set, "res/peer_set.json");
pub struct NodeFilter { pub struct NodeFilter {
client: Weak<BlockChainClient>, client: Weak<BlockChainClient>,
contract_address: Address, contract_address: Address,
cache: RwLock<Cache>
} }
struct Cache {
cache: HashMap<NodeId, bool>,
order: VecDeque<NodeId>
}
// Increase cache size due to possible reserved peers, which do not count in the node table size
pub const CACHE_SIZE: usize = MAX_NODES_IN_TABLE + 1024;
impl NodeFilter { impl NodeFilter {
/// Create a new instance. Accepts a contract address. /// Create a new instance. Accepts a contract address.
pub fn new(client: Weak<BlockChainClient>, contract_address: Address) -> NodeFilter { pub fn new(client: Weak<BlockChainClient>, contract_address: Address) -> NodeFilter {
NodeFilter { NodeFilter {
client, client,
contract_address, contract_address,
cache: RwLock::new(Cache{
cache: HashMap::with_capacity(CACHE_SIZE),
order: VecDeque::with_capacity(CACHE_SIZE)
})
} }
} }
} }
@ -70,6 +87,10 @@ impl ConnectionFilter for NodeFilter {
None => return false, None => return false,
}; };
if let Some(allowed) = self.cache.read().cache.get(connecting_id) {
return *allowed;
}
let address = self.contract_address; let address = self.contract_address;
let own_low = H256::from_slice(&own_id[0..32]); let own_low = H256::from_slice(&own_id[0..32]);
let own_high = H256::from_slice(&own_id[32..64]); let own_high = H256::from_slice(&own_id[32..64]);
@ -83,11 +104,26 @@ impl ConnectionFilter for NodeFilter {
debug!("Error callling peer set contract: {:?}", e); debug!("Error callling peer set contract: {:?}", e);
false false
}); });
let mut cache = self.cache.write();
if cache.cache.len() == CACHE_SIZE {
let poped = cache.order.pop_front().unwrap();
cache.cache.remove(&poped).is_none();
};
if cache.cache.insert(*connecting_id, allowed).is_none() {
cache.order.push_back(*connecting_id);
}
allowed allowed
} }
} }
impl ChainNotify for NodeFilter {
fn new_blocks(&self, _new_blocks: NewBlocks) {
let mut cache = self.cache.write();
cache.cache.clear();
cache.order.clear();
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};

View File

@ -583,7 +583,9 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
let private_tx_provider = private_tx_service.provider(); let private_tx_provider = private_tx_service.provider();
let connection_filter = connection_filter_address.map(|a| Arc::new(NodeFilter::new(Arc::downgrade(&client) as Weak<BlockChainClient>, a))); let connection_filter = connection_filter_address.map(|a| Arc::new(NodeFilter::new(Arc::downgrade(&client) as Weak<BlockChainClient>, a)));
let snapshot_service = service.snapshot_service(); let snapshot_service = service.snapshot_service();
if let Some(filter) = connection_filter.clone() {
service.add_notify(filter.clone());
}
// initialize the local node information store. // initialize the local node information store.
let store = { let store = {
let db = service.db(); let db = service.db();

View File

@ -616,8 +616,8 @@ impl Host {
let socket = { let socket = {
let address = { let address = {
let mut nodes = self.nodes.write(); let mut nodes = self.nodes.read();
if let Some(node) = nodes.get_mut(id) { if let Some(node) = nodes.get(id) {
node.endpoint.address node.endpoint.address
} else { } else {
debug!(target: "network", "Connection to expired node aborted"); debug!(target: "network", "Connection to expired node aborted");

View File

@ -113,6 +113,6 @@ pub use service::NetworkService;
pub use host::NetworkContext; pub use host::NetworkContext;
pub use io::TimerToken; pub use io::TimerToken;
pub use node_table::{validate_node_url, NodeId}; pub use node_table::{validate_node_url, NodeId, MAX_NODES_IN_TABLE};
const PROTOCOL_VERSION: u32 = 5; const PROTOCOL_VERSION: u32 = 5;

View File

@ -23,6 +23,7 @@ use serde_json;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display, Formatter}; use std::fmt::{self, Display, Formatter};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::iter::FromIterator;
use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr};
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
@ -235,22 +236,27 @@ impl Hash for Node {
} }
} }
const MAX_NODES: usize = 1024; pub const MAX_NODES_IN_TABLE: usize = 4096;
const MAX_NODES_IN_FILE: usize = 1024;
const NODES_FILE: &str = "nodes.json"; const NODES_FILE: &str = "nodes.json";
/// Node table backed by disk file. /// Node table backed by disk file.
pub struct NodeTable { pub struct NodeTable {
nodes: HashMap<NodeId, Node>, nodes: HashMap<NodeId, Node>,
ordered_ids: Vec<NodeId>,
useless_nodes: HashSet<NodeId>, useless_nodes: HashSet<NodeId>,
path: Option<String>, path: Option<String>,
} }
impl NodeTable { impl NodeTable {
pub fn new(path: Option<String>) -> NodeTable { pub fn new(path: Option<String>) -> NodeTable {
let nodes = NodeTable::load(path.clone());
let ordered_ids = NodeTable::make_ordered_entries(&nodes).iter().map(|m| m.id).collect();
NodeTable { NodeTable {
path: path.clone(), path,
nodes: NodeTable::load(path), nodes,
useless_nodes: HashSet::new(), useless_nodes: HashSet::new(),
ordered_ids
} }
} }
@ -258,24 +264,72 @@ impl NodeTable {
pub fn add_node(&mut self, mut node: Node) { pub fn add_node(&mut self, mut node: Node) {
// preserve node last_contact // preserve node last_contact
node.last_contact = self.nodes.get(&node.id).and_then(|n| n.last_contact); node.last_contact = self.nodes.get(&node.id).and_then(|n| n.last_contact);
self.nodes.insert(node.id, node); let id = node.id;
if self.ordered_ids.len() == MAX_NODES_IN_TABLE {
self.nodes.remove(&self.ordered_ids.pop().expect("ordered_ids is not empty; qed"));
};
let index = self.get_index_to_insert(node.last_contact);
if self.nodes.insert(node.id, node).is_none() {
self.ordered_ids.insert(index, id);
};
} }
/// Returns a list of ordered nodes according to their most recent contact /// Get index in the ordered entries vector to insert node based on its last contact value
/// and filtering useless nodes. The algorithm for creating the sorted nodes fn get_index_to_insert(&self, last_contact: Option<NodeContact>) -> usize {
/// is: let len = self.ordered_ids.len();
let mut index = len;
match last_contact {
Some(NodeContact::Success(last_contact_time)) => {
if let Some(i) = self.ordered_ids.iter().position(|&item| {
match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact {
Some(NodeContact::Success(last)) => last < last_contact_time,
_ => true
}
}) { index = i; };
},
None => {
if let Some(i) = self.ordered_ids.iter().position(|&item| {
match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact {
Some(NodeContact::Success(_)) => false,
_ => true
}
}) { index = i; };
},
Some(NodeContact::Failure(last_contact_time)) => {
if let Some(i) = self.ordered_ids.iter().rev().position(|&item| {
match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact {
Some(NodeContact::Failure(last)) => last < last_contact_time,
_ => true
}
}) { index = len - i; };
}
};
index
}
/// Returns a list of ordered entries from table
fn ordered(&self) -> Vec<&Node> {
Vec::from_iter(
self.ordered_ids
.iter()
.filter(|id| !self.useless_nodes.contains(&id))
.map(|id| self.nodes.get(&id).expect("nodes and ordered_ids do not get out of sync; qed"))
)
}
/// Makes a list of ordered nodes according to their most recent contact.
/// The algorithm for creating the sorted nodes is:
/// - Contacts that aren't recent (older than 1 week) are discarded /// - Contacts that aren't recent (older than 1 week) are discarded
/// - (1) Nodes with a successful contact are ordered (most recent success first) /// - (1) Nodes with a successful contact are ordered (most recent success first)
/// - (2) Nodes with unknown contact (older than 1 week or new nodes) are randomly shuffled /// - (2) Nodes with unknown contact (older than 1 week or new nodes) are randomly shuffled
/// - (3) Nodes with a failed contact are ordered (oldest failure first) /// - (3) Nodes with a failed contact are ordered (oldest failure first)
/// - The final result is the concatenation of (1), (2) and (3) /// - The final result is the concatenation of (1), (2) and (3)
fn ordered_entries(&self) -> Vec<&Node> { fn make_ordered_entries(node_table: &HashMap<NodeId, Node>) -> Vec<&Node> {
let mut success = Vec::new(); let mut success = Vec::new();
let mut failures = Vec::new(); let mut failures = Vec::new();
let mut unknown = Vec::new(); let mut unknown = Vec::new();
let nodes = self.nodes.values() let nodes = node_table.values();
.filter(|n| !self.useless_nodes.contains(&n.id));
for node in nodes { for node in nodes {
// discard contact points older that aren't recent // discard contact points older that aren't recent
@ -316,7 +370,7 @@ impl NodeTable {
/// Returns node ids sorted by failure percentage, for nodes with the same failure percentage the absolute number of /// Returns node ids sorted by failure percentage, for nodes with the same failure percentage the absolute number of
/// failures is considered. /// failures is considered.
pub fn nodes(&self, filter: &IpFilter) -> Vec<NodeId> { pub fn nodes(&self, filter: &IpFilter) -> Vec<NodeId> {
self.ordered_entries().iter() self.ordered().iter()
.filter(|n| n.endpoint.is_allowed(&filter)) .filter(|n| n.endpoint.is_allowed(&filter))
.map(|n| n.id) .map(|n| n.id)
.collect() .collect()
@ -325,15 +379,15 @@ impl NodeTable {
/// Ordered list of all entries by failure percentage, for nodes with the same failure percentage the absolute /// Ordered list of all entries by failure percentage, for nodes with the same failure percentage the absolute
/// number of failures is considered. /// number of failures is considered.
pub fn entries(&self) -> Vec<NodeEntry> { pub fn entries(&self) -> Vec<NodeEntry> {
self.ordered_entries().iter().map(|n| NodeEntry { self.ordered().iter().map(|n| NodeEntry {
endpoint: n.endpoint.clone(), endpoint: n.endpoint.clone(),
id: n.id, id: n.id,
}).collect() }).collect()
} }
/// Get particular node /// Get particular node
pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> { pub fn get(&self, id: &NodeId) -> Option<&Node> {
self.nodes.get_mut(id) self.nodes.get(id)
} }
/// Check if a node exists in the table. /// Check if a node exists in the table.
@ -344,28 +398,49 @@ impl NodeTable {
/// Apply table changes coming from discovery /// Apply table changes coming from discovery
pub fn update(&mut self, mut update: TableUpdates, reserved: &HashSet<NodeId>) { pub fn update(&mut self, mut update: TableUpdates, reserved: &HashSet<NodeId>) {
for (_, node) in update.added.drain() { for (_, node) in update.added.drain() {
let entry = self.nodes.entry(node.id).or_insert_with(|| Node::new(node.id, node.endpoint.clone())); let mut add = false;
entry.endpoint = node.endpoint; {
} let entry = self.nodes.entry(node.id).or_insert_with(|| {
add = true;
Node::new(node.id, node.endpoint.clone())
});
entry.endpoint = node.endpoint;
}
if add {
if self.ordered_ids.len() == MAX_NODES_IN_TABLE {
self.nodes.remove(&self.ordered_ids.pop().expect("ordered_ids is not empty; qed"));
};
let index = self.get_index_to_insert(None);
self.ordered_ids.insert(index, node.id);
};
};
for r in update.removed { for r in update.removed {
if !reserved.contains(&r) { if !reserved.contains(&r) {
self.ordered_ids.iter().position(|&i| r == i).map(|p| self.ordered_ids.remove(p));
self.nodes.remove(&r); self.nodes.remove(&r);
} }
} }
} }
fn update_ordered_ids(&mut self, id: &NodeId, last_contact: Option<NodeContact>) {
if let Some(node) = self.nodes.get_mut(id) {
node.last_contact = last_contact;
}
if let Some(pos) = self.ordered_ids.iter().position(|i| id == i) {
self.ordered_ids.remove(pos);
let index = self.get_index_to_insert(last_contact);
self.ordered_ids.insert(index, *id);
}
}
/// Set last contact as failure for a node /// Set last contact as failure for a node
pub fn note_failure(&mut self, id: &NodeId) { pub fn note_failure(&mut self, id: &NodeId) {
if let Some(node) = self.nodes.get_mut(id) { self.update_ordered_ids(id, Some(NodeContact::failure()));
node.last_contact = Some(NodeContact::failure());
}
} }
/// Set last contact as success for a node /// Set last contact as success for a node
pub fn note_success(&mut self, id: &NodeId) { pub fn note_success(&mut self, id: &NodeId) {
if let Some(node) = self.nodes.get_mut(id) { self.update_ordered_ids(id, Some(NodeContact::success()));
node.last_contact = Some(NodeContact::success());
}
} }
/// Mark as useless, no further attempts to connect until next call to `clear_useless`. /// Mark as useless, no further attempts to connect until next call to `clear_useless`.
@ -392,7 +467,7 @@ impl NodeTable {
let node_ids = self.nodes(&IpFilter::default()); let node_ids = self.nodes(&IpFilter::default());
let nodes = node_ids.into_iter() let nodes = node_ids.into_iter()
.map(|id| self.nodes.get(&id).expect("self.nodes() only returns node IDs from self.nodes")) .map(|id| self.nodes.get(&id).expect("self.nodes() only returns node IDs from self.nodes"))
.take(MAX_NODES) .take(MAX_NODES_IN_FILE)
.map(Into::into) .map(Into::into)
.collect(); .collect();
let table = json::NodeTable { nodes }; let table = json::NodeTable { nodes };
@ -523,6 +598,8 @@ mod tests {
use super::*; use super::*;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use ethereum_types::H512; use ethereum_types::H512;
use std::thread::sleep;
use std::time::Duration;
use std::str::FromStr; use std::str::FromStr;
use tempdir::TempDir; use tempdir::TempDir;
use ipnetwork::IpNetwork; use ipnetwork::IpNetwork;
@ -604,49 +681,85 @@ mod tests {
let id6 = H512::from_str("f979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap(); let id6 = H512::from_str("f979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap();
let mut table = NodeTable::new(None); let mut table = NodeTable::new(None);
assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0);
assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 0);
assert_eq!(table.get_index_to_insert(None), 0);
// sleep 1 mcs is added because nanosecond precision was lost since mac os x high sierra update
// https://github.com/paritytech/parity-ethereum/issues/9632
table.add_node(node1); table.add_node(node1);
sleep(Duration::from_micros(1));
assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0);
assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 1);
assert_eq!(table.get_index_to_insert(None), 0);
table.add_node(node2); table.add_node(node2);
sleep(Duration::from_micros(1));
assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0);
assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 2);
assert_eq!(table.get_index_to_insert(None), 0);
table.add_node(node3); table.add_node(node3);
sleep(Duration::from_micros(1));
table.add_node(node4); table.add_node(node4);
sleep(Duration::from_micros(1));
table.add_node(node5); table.add_node(node5);
sleep(Duration::from_micros(1));
table.add_node(node6); table.add_node(node6);
sleep(Duration::from_micros(1));
// failures - nodes 1 & 2 // failures - nodes 1 & 2
table.note_failure(&id1); table.note_failure(&id1);
sleep(Duration::from_micros(1));
let time_in_between = SystemTime::now();
sleep(Duration::from_micros(1));
table.note_failure(&id2); table.note_failure(&id2);
sleep(Duration::from_micros(1));
// success - nodes 3 & 4 assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0);
assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 6);
assert_eq!(table.get_index_to_insert(Some(NodeContact::Failure(time_in_between))), 5);
assert_eq!(table.get_index_to_insert(Some(NodeContact::Failure(time::UNIX_EPOCH))), 4);
assert_eq!(table.get_index_to_insert(None), 0);
// success - nodes 3,4,5 (5 - the oldest)
table.note_success(&id5);
sleep(Duration::from_micros(1));
table.note_success(&id3); table.note_success(&id3);
table.note_success(&id4); sleep(Duration::from_micros(1));
// success - node 5 (old contact) assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time::UNIX_EPOCH))), 2);
table.get_mut(&id5).unwrap().last_contact = Some(NodeContact::Success(time::UNIX_EPOCH)); assert_eq!(table.get_index_to_insert(None), 2);
let time_in_between = SystemTime::now();
sleep(Duration::from_micros(1));
table.note_success(&id4);
sleep(Duration::from_micros(1));
assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0);
assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time_in_between))), 1);
assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time::UNIX_EPOCH))), 3);
assert_eq!(table.get_index_to_insert(None), 3);
// unknown - node 6 // unknown - node 6
// nodes are also ordered according to their addition time // nodes are also ordered according to their addition time
//
// nanosecond precision lost since mac os x high sierra update so let's not compare their order
// https://github.com/paritytech/parity-ethereum/issues/9632
let r = table.nodes(&IpFilter::default()); let r = table.nodes(&IpFilter::default());
// most recent success assert_eq!(r[0][..], id4[..]); // most recent success
assert!( assert_eq!(r[1][..], id3[..]);
(r[0] == id4 && r[1] == id3) ||
(r[0] == id3 && r[1] == id4)
);
// unknown (old contacts and new nodes), randomly shuffled // unknown (old contacts and new nodes), randomly shuffled
assert!( assert!(
(r[2] == id5 && r[3] == id6) || r[2][..] == id5[..] && r[3][..] == id6[..] ||
(r[2] == id6 && r[3] == id5) r[2][..] == id6[..] && r[3][..] == id5[..]
); );
// oldest failure assert_eq!(r[4][..], id1[..]); // oldest failure
assert!( assert_eq!(r[5][..], id2[..]);
(r[4] == id1 && r[5] == id2) ||
(r[4] == id2 && r[5] == id1)
);
} }
#[test] #[test]