diff --git a/ethcore/node-filter/src/lib.rs b/ethcore/node-filter/src/lib.rs index 816bb84a8..92abb8974 100644 --- a/ethcore/node-filter/src/lib.rs +++ b/ethcore/node-filter/src/lib.rs @@ -37,13 +37,17 @@ extern crate tempdir; #[macro_use] extern crate log; +use std::collections::{HashMap, VecDeque}; use std::sync::Weak; -use ethcore::client::{BlockChainClient, BlockId}; +use ethcore::client::{BlockChainClient, BlockId, ChainNotify, NewBlocks}; + use ethereum_types::{H256, Address}; use ethabi::FunctionOutputDecoder; use network::{ConnectionFilter, ConnectionDirection}; use devp2p::NodeId; +use devp2p::MAX_NODES_IN_TABLE; +use parking_lot::RwLock; use_contract!(peer_set, "res/peer_set.json"); @@ -51,14 +55,27 @@ use_contract!(peer_set, "res/peer_set.json"); pub struct NodeFilter { client: Weak, contract_address: Address, + cache: RwLock } +struct Cache { + cache: HashMap, + order: VecDeque +} + +// Increase cache size due to possible reserved peers, which do not count in the node table size +pub const CACHE_SIZE: usize = MAX_NODES_IN_TABLE + 1024; + impl NodeFilter { /// Create a new instance. Accepts a contract address. pub fn new(client: Weak, contract_address: Address) -> NodeFilter { NodeFilter { client, contract_address, + cache: RwLock::new(Cache{ + cache: HashMap::with_capacity(CACHE_SIZE), + order: VecDeque::with_capacity(CACHE_SIZE) + }) } } } @@ -70,6 +87,10 @@ impl ConnectionFilter for NodeFilter { None => return false, }; + if let Some(allowed) = self.cache.read().cache.get(connecting_id) { + return *allowed; + } + let address = self.contract_address; let own_low = H256::from_slice(&own_id[0..32]); let own_high = H256::from_slice(&own_id[32..64]); @@ -83,11 +104,26 @@ impl ConnectionFilter for NodeFilter { debug!("Error callling peer set contract: {:?}", e); false }); - + let mut cache = self.cache.write(); + if cache.cache.len() == CACHE_SIZE { + let poped = cache.order.pop_front().unwrap(); + cache.cache.remove(&poped).is_none(); + }; + if cache.cache.insert(*connecting_id, allowed).is_none() { + cache.order.push_back(*connecting_id); + } allowed } } +impl ChainNotify for NodeFilter { + fn new_blocks(&self, _new_blocks: NewBlocks) { + let mut cache = self.cache.write(); + cache.cache.clear(); + cache.order.clear(); + } +} + #[cfg(test)] mod test { use std::sync::{Arc, Weak}; diff --git a/parity/run.rs b/parity/run.rs index b6f8c53a0..a4c24108c 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -583,7 +583,9 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: let private_tx_provider = private_tx_service.provider(); let connection_filter = connection_filter_address.map(|a| Arc::new(NodeFilter::new(Arc::downgrade(&client) as Weak, a))); let snapshot_service = service.snapshot_service(); - + if let Some(filter) = connection_filter.clone() { + service.add_notify(filter.clone()); + } // initialize the local node information store. let store = { let db = service.db(); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index f067260ce..3dc107d2b 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -616,8 +616,8 @@ impl Host { let socket = { let address = { - let mut nodes = self.nodes.write(); - if let Some(node) = nodes.get_mut(id) { + let mut nodes = self.nodes.read(); + if let Some(node) = nodes.get(id) { node.endpoint.address } else { debug!(target: "network", "Connection to expired node aborted"); diff --git a/util/network-devp2p/src/lib.rs b/util/network-devp2p/src/lib.rs index 6082049e8..3f1999bc6 100644 --- a/util/network-devp2p/src/lib.rs +++ b/util/network-devp2p/src/lib.rs @@ -113,6 +113,6 @@ pub use service::NetworkService; pub use host::NetworkContext; pub use io::TimerToken; -pub use node_table::{validate_node_url, NodeId}; +pub use node_table::{validate_node_url, NodeId, MAX_NODES_IN_TABLE}; const PROTOCOL_VERSION: u32 = 5; diff --git a/util/network-devp2p/src/node_table.rs b/util/network-devp2p/src/node_table.rs index 3cee93fd9..db5189008 100644 --- a/util/network-devp2p/src/node_table.rs +++ b/util/network-devp2p/src/node_table.rs @@ -23,6 +23,7 @@ use serde_json; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display, Formatter}; use std::hash::{Hash, Hasher}; +use std::iter::FromIterator; use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::path::PathBuf; use std::str::FromStr; @@ -235,22 +236,27 @@ impl Hash for Node { } } -const MAX_NODES: usize = 1024; +pub const MAX_NODES_IN_TABLE: usize = 4096; +const MAX_NODES_IN_FILE: usize = 1024; const NODES_FILE: &str = "nodes.json"; /// Node table backed by disk file. pub struct NodeTable { nodes: HashMap, + ordered_ids: Vec, useless_nodes: HashSet, path: Option, } impl NodeTable { pub fn new(path: Option) -> NodeTable { + let nodes = NodeTable::load(path.clone()); + let ordered_ids = NodeTable::make_ordered_entries(&nodes).iter().map(|m| m.id).collect(); NodeTable { - path: path.clone(), - nodes: NodeTable::load(path), + path, + nodes, useless_nodes: HashSet::new(), + ordered_ids } } @@ -258,24 +264,72 @@ impl NodeTable { pub fn add_node(&mut self, mut node: Node) { // preserve node last_contact node.last_contact = self.nodes.get(&node.id).and_then(|n| n.last_contact); - self.nodes.insert(node.id, node); + let id = node.id; + if self.ordered_ids.len() == MAX_NODES_IN_TABLE { + self.nodes.remove(&self.ordered_ids.pop().expect("ordered_ids is not empty; qed")); + }; + let index = self.get_index_to_insert(node.last_contact); + if self.nodes.insert(node.id, node).is_none() { + self.ordered_ids.insert(index, id); + }; } - /// Returns a list of ordered nodes according to their most recent contact - /// and filtering useless nodes. The algorithm for creating the sorted nodes - /// is: + /// Get index in the ordered entries vector to insert node based on its last contact value + fn get_index_to_insert(&self, last_contact: Option) -> usize { + let len = self.ordered_ids.len(); + let mut index = len; + match last_contact { + Some(NodeContact::Success(last_contact_time)) => { + if let Some(i) = self.ordered_ids.iter().position(|&item| { + match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact { + Some(NodeContact::Success(last)) => last < last_contact_time, + _ => true + } + }) { index = i; }; + }, + None => { + if let Some(i) = self.ordered_ids.iter().position(|&item| { + match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact { + Some(NodeContact::Success(_)) => false, + _ => true + } + }) { index = i; }; + }, + Some(NodeContact::Failure(last_contact_time)) => { + if let Some(i) = self.ordered_ids.iter().rev().position(|&item| { + match self.nodes.get(&item).expect("nodes and ordered_ids do not get out of sync; qed").last_contact { + Some(NodeContact::Failure(last)) => last < last_contact_time, + _ => true + } + }) { index = len - i; }; + } + }; + index + } + + /// Returns a list of ordered entries from table + fn ordered(&self) -> Vec<&Node> { + Vec::from_iter( + self.ordered_ids + .iter() + .filter(|id| !self.useless_nodes.contains(&id)) + .map(|id| self.nodes.get(&id).expect("nodes and ordered_ids do not get out of sync; qed")) + ) + } + + /// Makes a list of ordered nodes according to their most recent contact. + /// The algorithm for creating the sorted nodes is: /// - Contacts that aren't recent (older than 1 week) are discarded /// - (1) Nodes with a successful contact are ordered (most recent success first) /// - (2) Nodes with unknown contact (older than 1 week or new nodes) are randomly shuffled /// - (3) Nodes with a failed contact are ordered (oldest failure first) /// - The final result is the concatenation of (1), (2) and (3) - fn ordered_entries(&self) -> Vec<&Node> { + fn make_ordered_entries(node_table: &HashMap) -> Vec<&Node> { let mut success = Vec::new(); let mut failures = Vec::new(); let mut unknown = Vec::new(); - let nodes = self.nodes.values() - .filter(|n| !self.useless_nodes.contains(&n.id)); + let nodes = node_table.values(); for node in nodes { // discard contact points older that aren't recent @@ -316,7 +370,7 @@ impl NodeTable { /// Returns node ids sorted by failure percentage, for nodes with the same failure percentage the absolute number of /// failures is considered. pub fn nodes(&self, filter: &IpFilter) -> Vec { - self.ordered_entries().iter() + self.ordered().iter() .filter(|n| n.endpoint.is_allowed(&filter)) .map(|n| n.id) .collect() @@ -325,15 +379,15 @@ impl NodeTable { /// Ordered list of all entries by failure percentage, for nodes with the same failure percentage the absolute /// number of failures is considered. pub fn entries(&self) -> Vec { - self.ordered_entries().iter().map(|n| NodeEntry { + self.ordered().iter().map(|n| NodeEntry { endpoint: n.endpoint.clone(), id: n.id, }).collect() } /// Get particular node - pub fn get_mut(&mut self, id: &NodeId) -> Option<&mut Node> { - self.nodes.get_mut(id) + pub fn get(&self, id: &NodeId) -> Option<&Node> { + self.nodes.get(id) } /// Check if a node exists in the table. @@ -344,28 +398,49 @@ impl NodeTable { /// Apply table changes coming from discovery pub fn update(&mut self, mut update: TableUpdates, reserved: &HashSet) { for (_, node) in update.added.drain() { - let entry = self.nodes.entry(node.id).or_insert_with(|| Node::new(node.id, node.endpoint.clone())); - entry.endpoint = node.endpoint; - } + let mut add = false; + { + let entry = self.nodes.entry(node.id).or_insert_with(|| { + add = true; + Node::new(node.id, node.endpoint.clone()) + }); + entry.endpoint = node.endpoint; + } + if add { + if self.ordered_ids.len() == MAX_NODES_IN_TABLE { + self.nodes.remove(&self.ordered_ids.pop().expect("ordered_ids is not empty; qed")); + }; + let index = self.get_index_to_insert(None); + self.ordered_ids.insert(index, node.id); + }; + }; for r in update.removed { if !reserved.contains(&r) { + self.ordered_ids.iter().position(|&i| r == i).map(|p| self.ordered_ids.remove(p)); self.nodes.remove(&r); } } } + fn update_ordered_ids(&mut self, id: &NodeId, last_contact: Option) { + if let Some(node) = self.nodes.get_mut(id) { + node.last_contact = last_contact; + } + if let Some(pos) = self.ordered_ids.iter().position(|i| id == i) { + self.ordered_ids.remove(pos); + let index = self.get_index_to_insert(last_contact); + self.ordered_ids.insert(index, *id); + } + } + /// Set last contact as failure for a node pub fn note_failure(&mut self, id: &NodeId) { - if let Some(node) = self.nodes.get_mut(id) { - node.last_contact = Some(NodeContact::failure()); - } + self.update_ordered_ids(id, Some(NodeContact::failure())); } /// Set last contact as success for a node pub fn note_success(&mut self, id: &NodeId) { - if let Some(node) = self.nodes.get_mut(id) { - node.last_contact = Some(NodeContact::success()); - } + self.update_ordered_ids(id, Some(NodeContact::success())); } /// Mark as useless, no further attempts to connect until next call to `clear_useless`. @@ -392,7 +467,7 @@ impl NodeTable { let node_ids = self.nodes(&IpFilter::default()); let nodes = node_ids.into_iter() .map(|id| self.nodes.get(&id).expect("self.nodes() only returns node IDs from self.nodes")) - .take(MAX_NODES) + .take(MAX_NODES_IN_FILE) .map(Into::into) .collect(); let table = json::NodeTable { nodes }; @@ -523,6 +598,8 @@ mod tests { use super::*; use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; use ethereum_types::H512; + use std::thread::sleep; + use std::time::Duration; use std::str::FromStr; use tempdir::TempDir; use ipnetwork::IpNetwork; @@ -604,49 +681,85 @@ mod tests { let id6 = H512::from_str("f979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c").unwrap(); let mut table = NodeTable::new(None); + assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0); + assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 0); + assert_eq!(table.get_index_to_insert(None), 0); + + // sleep 1 mcs is added because nanosecond precision was lost since mac os x high sierra update + // https://github.com/paritytech/parity-ethereum/issues/9632 + table.add_node(node1); + sleep(Duration::from_micros(1)); + + assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0); + assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 1); + assert_eq!(table.get_index_to_insert(None), 0); + table.add_node(node2); + sleep(Duration::from_micros(1)); + + assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0); + assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 2); + assert_eq!(table.get_index_to_insert(None), 0); + table.add_node(node3); + sleep(Duration::from_micros(1)); table.add_node(node4); + sleep(Duration::from_micros(1)); table.add_node(node5); + sleep(Duration::from_micros(1)); table.add_node(node6); + sleep(Duration::from_micros(1)); // failures - nodes 1 & 2 table.note_failure(&id1); + sleep(Duration::from_micros(1)); + let time_in_between = SystemTime::now(); + sleep(Duration::from_micros(1)); table.note_failure(&id2); + sleep(Duration::from_micros(1)); - // success - nodes 3 & 4 + assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0); + assert_eq!(table.get_index_to_insert(Some(NodeContact::failure())), 6); + assert_eq!(table.get_index_to_insert(Some(NodeContact::Failure(time_in_between))), 5); + assert_eq!(table.get_index_to_insert(Some(NodeContact::Failure(time::UNIX_EPOCH))), 4); + assert_eq!(table.get_index_to_insert(None), 0); + + // success - nodes 3,4,5 (5 - the oldest) + table.note_success(&id5); + sleep(Duration::from_micros(1)); table.note_success(&id3); - table.note_success(&id4); + sleep(Duration::from_micros(1)); - // success - node 5 (old contact) - table.get_mut(&id5).unwrap().last_contact = Some(NodeContact::Success(time::UNIX_EPOCH)); + assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time::UNIX_EPOCH))), 2); + assert_eq!(table.get_index_to_insert(None), 2); + + let time_in_between = SystemTime::now(); + sleep(Duration::from_micros(1)); + table.note_success(&id4); + sleep(Duration::from_micros(1)); + + assert_eq!(table.get_index_to_insert(Some(NodeContact::success())), 0); + assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time_in_between))), 1); + assert_eq!(table.get_index_to_insert(Some(NodeContact::Success(time::UNIX_EPOCH))), 3); + assert_eq!(table.get_index_to_insert(None), 3); // unknown - node 6 // nodes are also ordered according to their addition time - // - // nanosecond precision lost since mac os x high sierra update so let's not compare their order - // https://github.com/paritytech/parity-ethereum/issues/9632 let r = table.nodes(&IpFilter::default()); - // most recent success - assert!( - (r[0] == id4 && r[1] == id3) || - (r[0] == id3 && r[1] == id4) - ); + assert_eq!(r[0][..], id4[..]); // most recent success + assert_eq!(r[1][..], id3[..]); // unknown (old contacts and new nodes), randomly shuffled assert!( - (r[2] == id5 && r[3] == id6) || - (r[2] == id6 && r[3] == id5) + r[2][..] == id5[..] && r[3][..] == id6[..] || + r[2][..] == id6[..] && r[3][..] == id5[..] ); - // oldest failure - assert!( - (r[4] == id1 && r[5] == id2) || - (r[4] == id2 && r[5] == id1) - ); + assert_eq!(r[4][..], id1[..]); // oldest failure + assert_eq!(r[5][..], id2[..]); } #[test]