prefer pinned nodes when establishing peer connections.
This commit is contained in:
parent
a4dacca262
commit
2ea45134ab
@ -97,6 +97,7 @@ impl NetworkConfiguration {
|
||||
boot_nodes: Vec::new(),
|
||||
use_secret: None,
|
||||
ideal_peers: 25,
|
||||
reserved_nodes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,7 +219,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
||||
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||
let session = self.resolve_session(peer);
|
||||
if let Some(session) = session {
|
||||
try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
||||
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
||||
} else {
|
||||
trace!(target: "network", "Send: Peer no longer exist")
|
||||
}
|
||||
@ -367,6 +368,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
let udp_port = config.udp_port.unwrap_or(listen_address.port());
|
||||
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
|
||||
|
||||
let boot_nodes = config.boot_nodes.clone();
|
||||
let reserved_nodes = config.reserved_nodes.clone();
|
||||
|
||||
let mut host = Host::<Message> {
|
||||
info: RwLock::new(HostInfo {
|
||||
keys: keys,
|
||||
@ -391,21 +395,26 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
stopping: AtomicBool::new(false),
|
||||
};
|
||||
|
||||
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
||||
for n in boot_nodes {
|
||||
host.add_node(&n);
|
||||
// don't pin boot nodes.
|
||||
host.add_node(&n, false);
|
||||
}
|
||||
|
||||
for n in reserved_nodes {
|
||||
host.add_node(&n, true);
|
||||
}
|
||||
Ok(host)
|
||||
}
|
||||
|
||||
pub fn add_node(&mut self, id: &str) {
|
||||
pub fn add_node(&mut self, id: &str, pin: bool) {
|
||||
match Node::from_str(id) {
|
||||
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
|
||||
Ok(n) => {
|
||||
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
||||
self.pinned_nodes.push(n.id.clone());
|
||||
if pin { self.pinned_nodes.push(n.id.clone()) }
|
||||
|
||||
self.nodes.write().unwrap().add_node(n);
|
||||
if let Some(ref mut discovery) = *self.discovery.lock().unwrap().deref_mut() {
|
||||
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
|
||||
discovery.add_node(entry);
|
||||
}
|
||||
}
|
||||
@ -484,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
for n in self.nodes.read().unwrap().unordered_entries() {
|
||||
discovery.add_node(n.clone());
|
||||
}
|
||||
*self.discovery.lock().unwrap().deref_mut() = Some(discovery);
|
||||
*self.discovery.lock().unwrap() = Some(discovery);
|
||||
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
||||
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
||||
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
||||
@ -531,15 +540,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
}
|
||||
|
||||
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||
if self.info.read().unwrap().deref().capabilities.is_empty() {
|
||||
if self.info.read().unwrap().capabilities.is_empty() {
|
||||
return;
|
||||
}
|
||||
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
|
||||
let pin = { self.info.read().unwrap().deref().config.pin };
|
||||
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
|
||||
let pin = { self.info.read().unwrap().config.pin };
|
||||
let session_count = self.session_count();
|
||||
if session_count >= ideal_peers as usize {
|
||||
if session_count >= ideal_peers as usize + self.pinned_nodes.len() {
|
||||
// check if all pinned nodes are connected.
|
||||
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let handshake_count = self.handshake_count();
|
||||
// allow 16 slots for incoming connections
|
||||
@ -548,9 +560,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
return;
|
||||
}
|
||||
|
||||
let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
|
||||
// iterate over all nodes, reserved ones coming first.
|
||||
// if we are pinned to only reserved nodes, ignore all others.
|
||||
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin {
|
||||
self.nodes.read().unwrap().nodes()
|
||||
} else {
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
let mut started: usize = 0;
|
||||
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
|
||||
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
|
||||
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
|
||||
self.connect_peer(&id, io);
|
||||
started += 1;
|
||||
@ -680,7 +699,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
||||
if !s.info.originated {
|
||||
let session_count = self.session_count();
|
||||
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
|
||||
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
|
||||
if session_count >= ideal_peers as usize {
|
||||
s.disconnect(io, DisconnectReason::TooManyPeers);
|
||||
return;
|
||||
@ -902,7 +921,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
||||
} => {
|
||||
let handler_token = {
|
||||
let mut timer_counter = self.timer_counter.write().unwrap();
|
||||
let counter = timer_counter.deref_mut();
|
||||
let counter = &mut *timer_counter;
|
||||
let handler_token = *counter;
|
||||
*counter += 1;
|
||||
handler_token
|
||||
@ -946,7 +965,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
||||
}
|
||||
}
|
||||
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
||||
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||
_ => warn!("Unexpected stream registration")
|
||||
}
|
||||
}
|
||||
@ -974,7 +993,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
||||
}
|
||||
}
|
||||
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
||||
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||
_ => warn!("Unexpected stream update")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user