Merge pull request #1350 from ethcore/revert-1349-revert-1347-reserved-peers
Reopen "reserved peers and reserved-only flag"
This commit is contained in:
commit
5f7bdc028d
@ -67,6 +67,10 @@ Networking Options:
|
|||||||
--no-discovery Disable new peer discovery.
|
--no-discovery Disable new peer discovery.
|
||||||
--node-key KEY Specify node secret key, either as 64-character hex
|
--node-key KEY Specify node secret key, either as 64-character hex
|
||||||
string or input to SHA3 operation.
|
string or input to SHA3 operation.
|
||||||
|
--reserved-peers FILE Provide a file containing enodes, one per line.
|
||||||
|
These nodes will always have a reserved slot on top
|
||||||
|
of the normal maximum peers.
|
||||||
|
--reserved-only Connect only to reserved nodes.
|
||||||
|
|
||||||
API and Console Options:
|
API and Console Options:
|
||||||
--jsonrpc-off Disable the JSON-RPC API server.
|
--jsonrpc-off Disable the JSON-RPC API server.
|
||||||
@ -239,6 +243,8 @@ pub struct Args {
|
|||||||
pub flag_no_discovery: bool,
|
pub flag_no_discovery: bool,
|
||||||
pub flag_nat: String,
|
pub flag_nat: String,
|
||||||
pub flag_node_key: Option<String>,
|
pub flag_node_key: Option<String>,
|
||||||
|
pub flag_reserved_peers: Option<String>,
|
||||||
|
pub flag_reserved_only: bool,
|
||||||
pub flag_cache_pref_size: usize,
|
pub flag_cache_pref_size: usize,
|
||||||
pub flag_cache_max_size: usize,
|
pub flag_cache_max_size: usize,
|
||||||
pub flag_queue_max_size: usize,
|
pub flag_queue_max_size: usize,
|
||||||
|
@ -153,6 +153,26 @@ impl Configuration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn init_reserved_nodes(&self) -> Vec<String> {
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::BufRead;
|
||||||
|
|
||||||
|
if let Some(ref path) = self.args.flag_reserved_peers {
|
||||||
|
let mut buffer = String::new();
|
||||||
|
let mut node_file = File::open(path).unwrap_or_else(|e| {
|
||||||
|
die!("Error opening reserved nodes file: {}", e);
|
||||||
|
});
|
||||||
|
node_file.read_to_string(&mut buffer).expect("Error reading reserved node file");
|
||||||
|
buffer.lines().map(|s| {
|
||||||
|
Self::normalize_enode(s).unwrap_or_else(|| {
|
||||||
|
die!("{}: Invalid node address format given for a reserved node.", s);
|
||||||
|
})
|
||||||
|
}).collect()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) {
|
pub fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) {
|
||||||
let port = self.net_port();
|
let port = self.net_port();
|
||||||
let listen_address = Some(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), port));
|
let listen_address = Some(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), port));
|
||||||
@ -179,6 +199,8 @@ impl Configuration {
|
|||||||
let mut net_path = PathBuf::from(&self.path());
|
let mut net_path = PathBuf::from(&self.path());
|
||||||
net_path.push("network");
|
net_path.push("network");
|
||||||
ret.config_path = Some(net_path.to_str().unwrap().to_owned());
|
ret.config_path = Some(net_path.to_str().unwrap().to_owned());
|
||||||
|
ret.reserved_nodes = self.init_reserved_nodes();
|
||||||
|
ret.reserved_only = self.args.flag_reserved_only;
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,10 +315,11 @@ fn execute_export(conf: Configuration) {
|
|||||||
udp_port: None,
|
udp_port: None,
|
||||||
nat_enabled: false,
|
nat_enabled: false,
|
||||||
discovery_enabled: false,
|
discovery_enabled: false,
|
||||||
pin: true,
|
reserved_only: true,
|
||||||
boot_nodes: Vec::new(),
|
boot_nodes: Vec::new(),
|
||||||
use_secret: None,
|
use_secret: None,
|
||||||
ideal_peers: 0,
|
ideal_peers: 0,
|
||||||
|
reserved_nodes: Vec::new(),
|
||||||
};
|
};
|
||||||
let client_config = conf.client_config(&spec);
|
let client_config = conf.client_config(&spec);
|
||||||
|
|
||||||
@ -386,10 +387,11 @@ fn execute_import(conf: Configuration) {
|
|||||||
udp_port: None,
|
udp_port: None,
|
||||||
nat_enabled: false,
|
nat_enabled: false,
|
||||||
discovery_enabled: false,
|
discovery_enabled: false,
|
||||||
pin: true,
|
reserved_only: true,
|
||||||
boot_nodes: Vec::new(),
|
boot_nodes: Vec::new(),
|
||||||
use_secret: None,
|
use_secret: None,
|
||||||
ideal_peers: 0,
|
ideal_peers: 0,
|
||||||
|
reserved_nodes: Vec::new(),
|
||||||
};
|
};
|
||||||
let client_config = conf.client_config(&spec);
|
let client_config = conf.client_config(&spec);
|
||||||
|
|
||||||
|
@ -74,5 +74,4 @@ impl<M> EthcoreSet for EthcoreSetClient<M> where M: MinerService + 'static {
|
|||||||
to_value(&true)
|
to_value(&true)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -65,14 +65,16 @@ pub struct NetworkConfiguration {
|
|||||||
pub nat_enabled: bool,
|
pub nat_enabled: bool,
|
||||||
/// Enable discovery
|
/// Enable discovery
|
||||||
pub discovery_enabled: bool,
|
pub discovery_enabled: bool,
|
||||||
/// Pin to boot nodes only
|
/// Pin to reserved nodes only
|
||||||
pub pin: bool,
|
pub reserved_only: bool,
|
||||||
/// List of initial node addresses
|
/// List of initial node addresses
|
||||||
pub boot_nodes: Vec<String>,
|
pub boot_nodes: Vec<String>,
|
||||||
/// Use provided node key instead of default
|
/// Use provided node key instead of default
|
||||||
pub use_secret: Option<Secret>,
|
pub use_secret: Option<Secret>,
|
||||||
/// Number of connected peers to maintain
|
/// Number of connected peers to maintain
|
||||||
pub ideal_peers: u32,
|
pub ideal_peers: u32,
|
||||||
|
/// List of reserved node addresses.
|
||||||
|
pub reserved_nodes: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for NetworkConfiguration {
|
impl Default for NetworkConfiguration {
|
||||||
@ -91,10 +93,11 @@ impl NetworkConfiguration {
|
|||||||
udp_port: None,
|
udp_port: None,
|
||||||
nat_enabled: true,
|
nat_enabled: true,
|
||||||
discovery_enabled: true,
|
discovery_enabled: true,
|
||||||
pin: false,
|
reserved_only: false,
|
||||||
boot_nodes: Vec::new(),
|
boot_nodes: Vec::new(),
|
||||||
use_secret: None,
|
use_secret: None,
|
||||||
ideal_peers: 25,
|
ideal_peers: 25,
|
||||||
|
reserved_nodes: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,7 +219,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||||
let session = self.resolve_session(peer);
|
let session = self.resolve_session(peer);
|
||||||
if let Some(session) = session {
|
if let Some(session) = session {
|
||||||
try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
||||||
} else {
|
} else {
|
||||||
trace!(target: "network", "Send: Peer no longer exist")
|
trace!(target: "network", "Send: Peer no longer exist")
|
||||||
}
|
}
|
||||||
@ -365,6 +368,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
let udp_port = config.udp_port.unwrap_or(listen_address.port());
|
let udp_port = config.udp_port.unwrap_or(listen_address.port());
|
||||||
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
|
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
|
||||||
|
|
||||||
|
let boot_nodes = config.boot_nodes.clone();
|
||||||
|
let reserved_nodes = config.reserved_nodes.clone();
|
||||||
|
|
||||||
let mut host = Host::<Message> {
|
let mut host = Host::<Message> {
|
||||||
info: RwLock::new(HostInfo {
|
info: RwLock::new(HostInfo {
|
||||||
keys: keys,
|
keys: keys,
|
||||||
@ -389,21 +395,26 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
stopping: AtomicBool::new(false),
|
stopping: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
|
|
||||||
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
|
||||||
for n in boot_nodes {
|
for n in boot_nodes {
|
||||||
host.add_node(&n);
|
// don't pin boot nodes.
|
||||||
|
host.add_node(&n, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for n in reserved_nodes {
|
||||||
|
host.add_node(&n, true);
|
||||||
}
|
}
|
||||||
Ok(host)
|
Ok(host)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, id: &str) {
|
pub fn add_node(&mut self, id: &str, pin: bool) {
|
||||||
match Node::from_str(id) {
|
match Node::from_str(id) {
|
||||||
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
|
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
||||||
self.pinned_nodes.push(n.id.clone());
|
if pin { self.pinned_nodes.push(n.id.clone()) }
|
||||||
|
|
||||||
self.nodes.write().unwrap().add_node(n);
|
self.nodes.write().unwrap().add_node(n);
|
||||||
if let Some(ref mut discovery) = *self.discovery.lock().unwrap().deref_mut() {
|
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
|
||||||
discovery.add_node(entry);
|
discovery.add_node(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -472,7 +483,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
// Initialize discovery.
|
// Initialize discovery.
|
||||||
let discovery = {
|
let discovery = {
|
||||||
let info = self.info.read().unwrap();
|
let info = self.info.read().unwrap();
|
||||||
if info.config.discovery_enabled && !info.config.pin {
|
if info.config.discovery_enabled && !info.config.reserved_only {
|
||||||
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
|
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
|
||||||
} else { None }
|
} else { None }
|
||||||
};
|
};
|
||||||
@ -482,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
for n in self.nodes.read().unwrap().unordered_entries() {
|
for n in self.nodes.read().unwrap().unordered_entries() {
|
||||||
discovery.add_node(n.clone());
|
discovery.add_node(n.clone());
|
||||||
}
|
}
|
||||||
*self.discovery.lock().unwrap().deref_mut() = Some(discovery);
|
*self.discovery.lock().unwrap() = Some(discovery);
|
||||||
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
||||||
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
||||||
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
||||||
@ -529,15 +540,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
if self.info.read().unwrap().deref().capabilities.is_empty() {
|
if self.info.read().unwrap().capabilities.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
|
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
|
||||||
let pin = { self.info.read().unwrap().deref().config.pin };
|
let pin = { self.info.read().unwrap().config.reserved_only };
|
||||||
let session_count = self.session_count();
|
let session_count = self.session_count();
|
||||||
if session_count >= ideal_peers as usize {
|
if session_count >= ideal_peers as usize + self.pinned_nodes.len() {
|
||||||
|
// check if all pinned nodes are connected.
|
||||||
|
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let handshake_count = self.handshake_count();
|
let handshake_count = self.handshake_count();
|
||||||
// allow 16 slots for incoming connections
|
// allow 16 slots for incoming connections
|
||||||
@ -546,9 +560,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
|
// iterate over all nodes, reserved ones coming first.
|
||||||
|
// if we are pinned to only reserved nodes, ignore all others.
|
||||||
|
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin {
|
||||||
|
self.nodes.read().unwrap().nodes()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
});
|
||||||
|
|
||||||
let mut started: usize = 0;
|
let mut started: usize = 0;
|
||||||
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
|
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
|
||||||
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
|
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
|
||||||
self.connect_peer(&id, io);
|
self.connect_peer(&id, io);
|
||||||
started += 1;
|
started += 1;
|
||||||
@ -678,7 +699,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
||||||
if !s.info.originated {
|
if !s.info.originated {
|
||||||
let session_count = self.session_count();
|
let session_count = self.session_count();
|
||||||
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
|
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
|
||||||
if session_count >= ideal_peers as usize {
|
if session_count >= ideal_peers as usize {
|
||||||
s.disconnect(io, DisconnectReason::TooManyPeers);
|
s.disconnect(io, DisconnectReason::TooManyPeers);
|
||||||
return;
|
return;
|
||||||
@ -900,7 +921,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
} => {
|
} => {
|
||||||
let handler_token = {
|
let handler_token = {
|
||||||
let mut timer_counter = self.timer_counter.write().unwrap();
|
let mut timer_counter = self.timer_counter.write().unwrap();
|
||||||
let counter = timer_counter.deref_mut();
|
let counter = &mut *timer_counter;
|
||||||
let handler_token = *counter;
|
let handler_token = *counter;
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
handler_token
|
handler_token
|
||||||
@ -944,7 +965,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
_ => warn!("Unexpected stream registration")
|
_ => warn!("Unexpected stream registration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -972,7 +993,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
_ => warn!("Unexpected stream update")
|
_ => warn!("Unexpected stream update")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user