From 2ea45134ab33f8cf87b2ba614b0e0d422e784ab5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 19 Jun 2016 20:57:54 +0200 Subject: [PATCH] prefer pinned nodes when establishing peer connections. --- util/src/network/host.rs | 55 +++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/util/src/network/host.rs b/util/src/network/host.rs index c6810abb7..25b1629f8 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -97,6 +97,7 @@ impl NetworkConfiguration { boot_nodes: Vec::new(), use_secret: None, ideal_peers: 25, + reserved_nodes: Vec::new(), } } @@ -218,7 +219,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { let session = self.resolve_session(peer); if let Some(session) = session { - try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data)); + try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data)); } else { trace!(target: "network", "Send: Peer no longer exist") } @@ -367,6 +368,9 @@ impl Host where Message: Send + Sync + Clone { let udp_port = config.udp_port.unwrap_or(listen_address.port()); let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; + let boot_nodes = config.boot_nodes.clone(); + let reserved_nodes = config.reserved_nodes.clone(); + let mut host = Host:: { info: RwLock::new(HostInfo { keys: keys, @@ -391,21 +395,26 @@ impl Host where Message: Send + Sync + Clone { stopping: AtomicBool::new(false), }; - let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); for n in boot_nodes { - host.add_node(&n); + // don't pin boot nodes. + host.add_node(&n, false); + } + + for n in reserved_nodes { + host.add_node(&n, true); } Ok(host) } - pub fn add_node(&mut self, id: &str) { + pub fn add_node(&mut self, id: &str, pin: bool) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - self.pinned_nodes.push(n.id.clone()); + if pin { self.pinned_nodes.push(n.id.clone()) } + self.nodes.write().unwrap().add_node(n); - if let Some(ref mut discovery) = *self.discovery.lock().unwrap().deref_mut() { + if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { discovery.add_node(entry); } } @@ -484,7 +493,7 @@ impl Host where Message: Send + Sync + Clone { for n in self.nodes.read().unwrap().unordered_entries() { discovery.add_node(n.clone()); } - *self.discovery.lock().unwrap().deref_mut() = Some(discovery); + *self.discovery.lock().unwrap() = Some(discovery); io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); @@ -531,14 +540,17 @@ impl Host where Message: Send + Sync + Clone { } fn connect_peers(&self, io: &IoContext>) { - if self.info.read().unwrap().deref().capabilities.is_empty() { + if self.info.read().unwrap().capabilities.is_empty() { return; } - let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; - let pin = { self.info.read().unwrap().deref().config.pin }; + let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; + let pin = { self.info.read().unwrap().config.pin }; let session_count = self.session_count(); - if session_count >= ideal_peers as usize { - return; + if session_count >= ideal_peers as usize + self.pinned_nodes.len() { + // check if all pinned nodes are connected. + if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + return; + } } let handshake_count = self.handshake_count(); @@ -548,9 +560,16 @@ impl Host where Message: Send + Sync + Clone { return; } - let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() }; + // iterate over all nodes, reserved ones coming first. + // if we are pinned to only reserved nodes, ignore all others. + let nodes = self.pinned_nodes.iter().cloned().chain(if !pin { + self.nodes.read().unwrap().nodes() + } else { + Vec::new() + }); + let mut started: usize = 0; - for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id)) + for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id)) .take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { self.connect_peer(&id, io); started += 1; @@ -680,7 +699,7 @@ impl Host where Message: Send + Sync + Clone { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; + let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; if session_count >= ideal_peers as usize { s.disconnect(io, DisconnectReason::TooManyPeers); return; @@ -902,7 +921,7 @@ impl IoHandler> for Host where Messa } => { let handler_token = { let mut timer_counter = self.timer_counter.write().unwrap(); - let counter = timer_counter.deref_mut(); + let counter = &mut *timer_counter; let handler_token = *counter; *counter += 1; handler_token @@ -946,7 +965,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -974,7 +993,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } }