Merge pull request #1349 from ethcore/revert-1347-reserved-peers

Revert "Reserved peers, reserved-only flag"
This commit is contained in:
Robert Habermeier 2016-06-20 14:10:30 +02:00 committed by GitHub
commit b05c218338
5 changed files with 25 additions and 75 deletions

View File

@ -67,10 +67,6 @@ Networking Options:
--no-discovery Disable new peer discovery. --no-discovery Disable new peer discovery.
--node-key KEY Specify node secret key, either as 64-character hex --node-key KEY Specify node secret key, either as 64-character hex
string or input to SHA3 operation. string or input to SHA3 operation.
--reserved-peers FILE Provide a file containing enodes, one per line.
These nodes will always have a reserved slot on top
of the normal maximum peers.
--reserved-only Connect only to reserved nodes.
API and Console Options: API and Console Options:
--jsonrpc-off Disable the JSON-RPC API server. --jsonrpc-off Disable the JSON-RPC API server.
@ -242,8 +238,6 @@ pub struct Args {
pub flag_no_discovery: bool, pub flag_no_discovery: bool,
pub flag_nat: String, pub flag_nat: String,
pub flag_node_key: Option<String>, pub flag_node_key: Option<String>,
pub flag_reserved_peers: Option<String>,
pub flag_reserved_only: bool,
pub flag_cache_pref_size: usize, pub flag_cache_pref_size: usize,
pub flag_cache_max_size: usize, pub flag_cache_max_size: usize,
pub flag_queue_max_size: usize, pub flag_queue_max_size: usize,

View File

@ -153,26 +153,6 @@ impl Configuration {
} }
} }
pub fn init_reserved_nodes(&self) -> Vec<String> {
use std::fs::File;
use std::io::BufRead;
if let Some(ref path) = self.args.flag_reserved_peers {
let mut buffer = String::new();
let mut node_file = File::open(path).unwrap_or_else(|e| {
die!("Error opening reserved nodes file: {}", e);
});
node_file.read_to_string(&mut buffer).expect("Error reading reserved node file");
buffer.lines().map(|s| {
Self::normalize_enode(s).unwrap_or_else(|| {
die!("{}: Invalid node address format given for a reserved node.", s);
})
}).collect()
} else {
Vec::new()
}
}
pub fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) { pub fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) {
let port = self.net_port(); let port = self.net_port();
let listen_address = Some(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), port)); let listen_address = Some(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), port));
@ -199,8 +179,6 @@ impl Configuration {
let mut net_path = PathBuf::from(&self.path()); let mut net_path = PathBuf::from(&self.path());
net_path.push("network"); net_path.push("network");
ret.config_path = Some(net_path.to_str().unwrap().to_owned()); ret.config_path = Some(net_path.to_str().unwrap().to_owned());
ret.reserved_nodes = self.init_reserved_nodes();
ret.reserved_only = self.args.flag_reserved_only;
ret ret
} }

View File

@ -315,11 +315,10 @@ fn execute_export(conf: Configuration) {
udp_port: None, udp_port: None,
nat_enabled: false, nat_enabled: false,
discovery_enabled: false, discovery_enabled: false,
reserved_only: true, pin: true,
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
use_secret: None, use_secret: None,
ideal_peers: 0, ideal_peers: 0,
reserved_nodes: Vec::new(),
}; };
let client_config = conf.client_config(&spec); let client_config = conf.client_config(&spec);
@ -387,11 +386,10 @@ fn execute_import(conf: Configuration) {
udp_port: None, udp_port: None,
nat_enabled: false, nat_enabled: false,
discovery_enabled: false, discovery_enabled: false,
reserved_only: true, pin: true,
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
use_secret: None, use_secret: None,
ideal_peers: 0, ideal_peers: 0,
reserved_nodes: Vec::new(),
}; };
let client_config = conf.client_config(&spec); let client_config = conf.client_config(&spec);

View File

@ -74,4 +74,5 @@ impl<M> EthcoreSet for EthcoreSetClient<M> where M: MinerService + 'static {
to_value(&true) to_value(&true)
}) })
} }
} }

View File

@ -65,16 +65,14 @@ pub struct NetworkConfiguration {
pub nat_enabled: bool, pub nat_enabled: bool,
/// Enable discovery /// Enable discovery
pub discovery_enabled: bool, pub discovery_enabled: bool,
/// Pin to reserved nodes only /// Pin to boot nodes only
pub reserved_only: bool, pub pin: bool,
/// List of initial node addresses /// List of initial node addresses
pub boot_nodes: Vec<String>, pub boot_nodes: Vec<String>,
/// Use provided node key instead of default /// Use provided node key instead of default
pub use_secret: Option<Secret>, pub use_secret: Option<Secret>,
/// Number of connected peers to maintain /// Number of connected peers to maintain
pub ideal_peers: u32, pub ideal_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
} }
impl Default for NetworkConfiguration { impl Default for NetworkConfiguration {
@ -93,11 +91,10 @@ impl NetworkConfiguration {
udp_port: None, udp_port: None,
nat_enabled: true, nat_enabled: true,
discovery_enabled: true, discovery_enabled: true,
reserved_only: false, pin: false,
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
use_secret: None, use_secret: None,
ideal_peers: 25, ideal_peers: 25,
reserved_nodes: Vec::new(),
} }
} }
@ -219,7 +216,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> { pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = self.resolve_session(peer); let session = self.resolve_session(peer);
if let Some(session) = session { if let Some(session) = session {
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data)); try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data));
} else { } else {
trace!(target: "network", "Send: Peer no longer exist") trace!(target: "network", "Send: Peer no longer exist")
} }
@ -368,9 +365,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let udp_port = config.udp_port.unwrap_or(listen_address.port()); let udp_port = config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
let boot_nodes = config.boot_nodes.clone();
let reserved_nodes = config.reserved_nodes.clone();
let mut host = Host::<Message> { let mut host = Host::<Message> {
info: RwLock::new(HostInfo { info: RwLock::new(HostInfo {
keys: keys, keys: keys,
@ -395,26 +389,21 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
stopping: AtomicBool::new(false), stopping: AtomicBool::new(false),
}; };
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
for n in boot_nodes { for n in boot_nodes {
// don't pin boot nodes. host.add_node(&n);
host.add_node(&n, false);
}
for n in reserved_nodes {
host.add_node(&n, true);
} }
Ok(host) Ok(host)
} }
pub fn add_node(&mut self, id: &str, pin: bool) { pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => { Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
if pin { self.pinned_nodes.push(n.id.clone()) } self.pinned_nodes.push(n.id.clone());
self.nodes.write().unwrap().add_node(n); self.nodes.write().unwrap().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { if let Some(ref mut discovery) = *self.discovery.lock().unwrap().deref_mut() {
discovery.add_node(entry); discovery.add_node(entry);
} }
} }
@ -483,7 +472,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery. // Initialize discovery.
let discovery = { let discovery = {
let info = self.info.read().unwrap(); let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.reserved_only { if info.config.discovery_enabled && !info.config.pin {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None } } else { None }
}; };
@ -493,7 +482,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
for n in self.nodes.read().unwrap().unordered_entries() { for n in self.nodes.read().unwrap().unordered_entries() {
discovery.add_node(n.clone()); discovery.add_node(n.clone());
} }
*self.discovery.lock().unwrap() = Some(discovery); *self.discovery.lock().unwrap().deref_mut() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
@ -540,18 +529,15 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) { fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
if self.info.read().unwrap().capabilities.is_empty() { if self.info.read().unwrap().deref().capabilities.is_empty() {
return; return;
} }
let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
let pin = { self.info.read().unwrap().config.reserved_only }; let pin = { self.info.read().unwrap().deref().config.pin };
let session_count = self.session_count(); let session_count = self.session_count();
if session_count >= ideal_peers as usize + self.pinned_nodes.len() { if session_count >= ideal_peers as usize {
// check if all pinned nodes are connected.
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return; return;
} }
}
let handshake_count = self.handshake_count(); let handshake_count = self.handshake_count();
// allow 16 slots for incoming connections // allow 16 slots for incoming connections
@ -560,16 +546,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
return; return;
} }
// iterate over all nodes, reserved ones coming first. let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
// if we are pinned to only reserved nodes, ignore all others.
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes()
} else {
Vec::new()
});
let mut started: usize = 0; let mut started: usize = 0;
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id)) for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { .take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
self.connect_peer(&id, io); self.connect_peer(&id, io);
started += 1; started += 1;
@ -699,7 +678,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated { if !s.info.originated {
let session_count = self.session_count(); let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
if session_count >= ideal_peers as usize { if session_count >= ideal_peers as usize {
s.disconnect(io, DisconnectReason::TooManyPeers); s.disconnect(io, DisconnectReason::TooManyPeers);
return; return;
@ -921,7 +900,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} => { } => {
let handler_token = { let handler_token = {
let mut timer_counter = self.timer_counter.write().unwrap(); let mut timer_counter = self.timer_counter.write().unwrap();
let counter = &mut *timer_counter; let counter = timer_counter.deref_mut();
let handler_token = *counter; let handler_token = *counter;
*counter += 1; *counter += 1;
handler_token handler_token
@ -965,7 +944,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} }
} }
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration") _ => warn!("Unexpected stream registration")
} }
} }
@ -993,7 +972,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} }
} }
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update") _ => warn!("Unexpected stream update")
} }
} }