From 8beba717f8456fa205471a02eb3472a41fa320fd Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 23 Feb 2016 19:38:06 +0100 Subject: [PATCH] Delayed UPnP initialization --- parity/main.rs | 6 +- util/src/network/host.rs | 136 +++++++++++++++++++++--------------- util/src/network/service.rs | 1 - 3 files changed, 84 insertions(+), 59 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index e25933f2d..79ee41590 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -73,7 +73,7 @@ Options: --address URL Equivalent to --listen-address URL --public-address URL. --peers NUM Try to manintain that many peers [default: 25]. --no-discovery Disable new peer discovery. - --upnp Use UPnP to try to figure out the correct network settings. + --no-upnp Disable trying to figure out the correct public adderss over UPnP. --node-key KEY Specify node secret key, either as 64-character hex string or input to SHA3 operation. --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. @@ -101,7 +101,7 @@ struct Args { flag_address: Option, flag_peers: u32, flag_no_discovery: bool, - flag_upnp: bool, + flag_no_upnp: bool, flag_node_key: Option, flag_cache_pref_size: usize, flag_cache_max_size: usize, @@ -233,7 +233,7 @@ impl Configuration { fn net_settings(&self, spec: &Spec) -> NetworkConfiguration { let mut ret = NetworkConfiguration::new(); - ret.nat_enabled = self.args.flag_upnp; + ret.nat_enabled = !self.args.flag_no_upnp; ret.boot_nodes = self.init_nodes(spec); let (listen, public) = self.net_addresses(); ret.listen_address = listen; diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 8dd9eb9cc..feb342700 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -106,6 +106,7 @@ const IDLE: usize = LAST_HANDSHAKE + 2; const DISCOVERY: usize = LAST_HANDSHAKE + 3; const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4; const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5; +const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6; const FIRST_SESSION: usize = 0; const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1; const FIRST_HANDSHAKE: usize = LAST_SESSION + 1; @@ -261,7 +262,9 @@ pub struct HostInfo { /// TCP connection port. pub listen_port: u16, /// Registered capabilities (handlers) - pub capabilities: Vec + pub capabilities: Vec, + /// Public address + discovery port + public_endpoint: NodeEndpoint, } impl HostInfo { @@ -294,16 +297,15 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host where Message: Send + Sync + Clone { pub info: RwLock, - tcp_listener: Mutex, + tcp_listener: Mutex>, handshakes: Arc>>, sessions: Arc>>, - discovery: Option>, + discovery: Mutex>, nodes: RwLock, handlers: RwLock>>>, timers: RwLock>, timer_counter: RwLock, stats: Arc, - public_endpoint: NodeEndpoint, pinned_nodes: Vec, } @@ -316,27 +318,6 @@ impl Host where Message: Send + Sync + Clone { }; let udp_port = config.udp_port.unwrap_or(listen_address.port()); - let public_endpoint = match config.public_address { - None => { - let public_address = select_public_address(listen_address.port()); - let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; - if config.nat_enabled { - match map_external_address(&local_endpoint) { - Some(endpoint) => { - info!("NAT Mappped to external address {}", endpoint.address); - endpoint - }, - None => local_endpoint - } - } else { - local_endpoint - } - } - Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port } - }; - - // Setup the server socket - let tcp_listener = TcpListener::bind(&listen_address).unwrap(); let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { @@ -350,10 +331,8 @@ impl Host where Message: Send + Sync + Clone { }, |s| KeyPair::from_secret(s).expect("Error creating node secret key")) }; - let discovery = if config.discovery_enabled && !config.pin { - Some(Discovery::new(&keys, listen_address.clone(), public_endpoint.clone(), DISCOVERY)) - } else { None }; let path = config.config_path.clone(); + let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; let mut host = Host:: { info: RwLock::new(HostInfo { keys: keys, @@ -363,9 +342,10 @@ impl Host where Message: Send + Sync + Clone { client_version: version(), listen_port: 0, capabilities: Vec::new(), + public_endpoint: local_endpoint, // will be replaced by public once it is resolved }), - discovery: discovery.map(Mutex::new), - tcp_listener: Mutex::new(tcp_listener), + discovery: Mutex::new(None), + tcp_listener: Mutex::new(None), handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -373,16 +353,12 @@ impl Host where Message: Send + Sync + Clone { timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), stats: Arc::new(NetworkStats::default()), - public_endpoint: public_endpoint, pinned_nodes: Vec::new(), }; let port = listen_address.port(); host.info.write().unwrap().deref_mut().listen_port = port; let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); - if let Some(ref mut discovery) = host.discovery { - discovery.lock().unwrap().init_node_list(host.nodes.read().unwrap().unordered_entries()); - } for n in boot_nodes { host.add_node(&n); } @@ -400,8 +376,8 @@ impl Host where Message: Send + Sync + Clone { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; self.pinned_nodes.push(n.id.clone()); self.nodes.write().unwrap().add_node(n); - if let Some(ref mut discovery) = self.discovery { - discovery.lock().unwrap().add_node(entry); + if let &mut Some(ref mut discovery) = self.discovery.lock().unwrap().deref_mut() { + discovery.add_node(entry); } } } @@ -412,7 +388,61 @@ impl Host where Message: Send + Sync + Clone { } pub fn client_url(&self) -> String { - format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.public_endpoint.clone())) + format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone())) + } + + fn init_public_interface(&self, io: &IoContext>) { + io.clear_timer(INIT_PUBLIC).unwrap(); + let mut tcp_listener = self.tcp_listener.lock().unwrap(); + if tcp_listener.is_some() { + return; + } + // public_endpoint in host info contains local adderss at this point + let listen_address = self.info.read().unwrap().public_endpoint.address.clone(); + let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port()); + let public_endpoint = match self.info.read().unwrap().config.public_address { + None => { + let public_address = select_public_address(listen_address.port()); + let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; + if self.info.read().unwrap().config.nat_enabled { + match map_external_address(&local_endpoint) { + Some(endpoint) => { + info!("NAT mappped to external address {}", endpoint.address); + endpoint + }, + None => local_endpoint + } + } else { + local_endpoint + } + } + Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port } + }; + + // Setup the server socket + *tcp_listener = Some(TcpListener::bind(&listen_address).unwrap()); + self.info.write().unwrap().public_endpoint = public_endpoint.clone(); + io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener"); + info!("Public node URL: {}", self.client_url()); + + // Initialize discovery. + let discovery = { + let info = self.info.read().unwrap(); + if info.config.discovery_enabled && !info.config.pin { + Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY)) + } else { None } + }; + + if let Some(mut discovery) = discovery { + discovery.init_node_list(self.nodes.read().unwrap().unordered_entries()); + for n in self.nodes.read().unwrap().unordered_entries() { + discovery.add_node(n.clone()); + } + io.register_stream(DISCOVERY).expect("Error registering UDP listener"); + io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); + io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); + *self.discovery.lock().unwrap().deref_mut() = Some(discovery); + } } fn maintain_network(&self, io: &IoContext>) { @@ -526,7 +556,7 @@ impl Host where Message: Send + Sync + Clone { fn accept(&self, io: &IoContext>) { trace!(target: "network", "Accepting incoming connection"); loop { - let socket = match self.tcp_listener.lock().unwrap().accept() { + let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() { Ok(None) => break, Ok(Some((sock, _addr))) => sock, Err(e) => { @@ -666,8 +696,9 @@ impl Host where Message: Send + Sync + Clone { if let Ok(address) = session.remote_addr() { let entry = NodeEntry { id: session.id().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone())); - if let Some(ref discovery) = self.discovery { - discovery.lock().unwrap().add_node(entry); + let mut discovery = self.discovery.lock().unwrap(); + if let &mut Some(ref mut discovery) = discovery.deref_mut() { + discovery.add_node(entry); } } } @@ -760,13 +791,8 @@ impl Host where Message: Send + Sync + Clone { impl IoHandler> for Host where Message: Send + Sync + Clone + 'static { /// Initialize networking fn initialize(&self, io: &IoContext>) { - io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener"); io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer"); - if self.discovery.is_some() { - io.register_stream(DISCOVERY).expect("Error registering UDP listener"); - io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); - io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); - } + io.register_timer(INIT_PUBLIC, 0).expect("Error registering initialization timer"); self.maintain_network(io) } @@ -784,7 +810,7 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(stream, io), DISCOVERY => { - let node_changes = { self.discovery.as_ref().unwrap().lock().unwrap().readable() }; + let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -800,7 +826,7 @@ impl IoHandler> for Host where Messa FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(stream, io), DISCOVERY => { - self.discovery.as_ref().unwrap().lock().unwrap().writable(); + self.discovery.lock().unwrap().as_mut().unwrap().writable(); io.update_registration(DISCOVERY).expect("Error updating discovery registration"); } _ => panic!("Received unknown writable token"), @@ -810,14 +836,15 @@ impl IoHandler> for Host where Messa fn timeout(&self, io: &IoContext>, token: TimerToken) { match token { IDLE => self.maintain_network(io), + INIT_PUBLIC => self.init_public_interface(io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io), DISCOVERY_REFRESH => { - self.discovery.as_ref().unwrap().lock().unwrap().refresh(); + self.discovery.lock().unwrap().as_mut().unwrap().refresh(); io.update_registration(DISCOVERY).expect("Error updating discovery registration"); }, DISCOVERY_ROUND => { - let node_changes = { self.discovery.as_ref().unwrap().lock().unwrap().round() }; + let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } @@ -892,8 +919,8 @@ impl IoHandler> for Host where Messa connection.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.as_ref().unwrap().lock().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), + TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -915,7 +942,6 @@ impl IoHandler> for Host where Messa } } DISCOVERY => (), - TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(), _ => warn!("Unexpected stream deregistration") } } @@ -934,8 +960,8 @@ impl IoHandler> for Host where Messa connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.as_ref().unwrap().lock().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), + TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 1cd48abe1..7b9388e85 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -42,7 +42,6 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); - info!("Node URL: {}", host.client_url()); try!(io_service.register_handler(host)); Ok(NetworkService { io_service: io_service,