diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 95a891198..bcfe7724f 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -146,7 +146,7 @@ mod tests { fn it_can_be_started() { let spec = get_test_spec(); let temp_path = RandomTempPath::new(); - let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path()); + let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_with_port(40456), &temp_path.as_path()); assert!(service.is_ok()); } } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 6baf0cf76..02c576424 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -105,14 +105,6 @@ impl NetworkConfiguration { config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap()); config } - - /// Create new default configuration for localhost-only connection with random port (usefull for testing) - pub fn new_local() -> NetworkConfiguration { - let mut config = NetworkConfiguration::new(); - config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap()); - config.nat_enabled = false; - config - } } // Tokens @@ -277,12 +269,12 @@ pub struct HostInfo { pub protocol_version: u32, /// Client identifier pub client_version: String, + /// TCP connection port. + pub listen_port: u16, /// Registered capabilities (handlers) pub capabilities: Vec, - /// Local address + discovery port - pub local_endpoint: NodeEndpoint, /// Public address + discovery port - pub public_endpoint: Option, + public_endpoint: NodeEndpoint, } impl HostInfo { @@ -315,7 +307,7 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host where Message: Send + Sync + Clone { pub info: RwLock, - tcp_listener: Mutex, + tcp_listener: Mutex>, handshakes: Arc>>, sessions: Arc>>, discovery: Mutex>, @@ -329,12 +321,13 @@ pub struct Host where Message: Send + Sync + Clone { impl Host where Message: Send + Sync + Clone { /// Create a new instance - pub fn new(config: NetworkConfiguration) -> Result, UtilError> { - let mut listen_address = match config.listen_address { + pub fn new(config: NetworkConfiguration) -> Host { + let listen_address = match config.listen_address { None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), Some(addr) => addr, }; + let udp_port = config.udp_port.unwrap_or(listen_address.port()); let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { @@ -349,12 +342,7 @@ impl Host where Message: Send + Sync + Clone { |s| KeyPair::from_secret(s).expect("Error creating node secret key")) }; let path = config.config_path.clone(); - // Setup the server socket - let tcp_listener = try!(TcpListener::bind(&listen_address)); - listen_address = SocketAddr::new(listen_address.ip(), try!(tcp_listener.local_addr()).port()); - let udp_port = config.udp_port.unwrap_or(listen_address.port()); let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; - let mut host = Host:: { info: RwLock::new(HostInfo { keys: keys, @@ -362,12 +350,12 @@ impl Host where Message: Send + Sync + Clone { nonce: H256::random(), protocol_version: PROTOCOL_VERSION, client_version: version(), + listen_port: 0, capabilities: Vec::new(), - public_endpoint: None, - local_endpoint: local_endpoint, + public_endpoint: local_endpoint, // will be replaced by public once it is resolved }), discovery: Mutex::new(None), - tcp_listener: Mutex::new(tcp_listener), + tcp_listener: Mutex::new(None), handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -377,12 +365,14 @@ impl Host where Message: Send + Sync + Clone { stats: Arc::new(NetworkStats::default()), pinned_nodes: Vec::new(), }; + let port = listen_address.port(); + host.info.write().unwrap().deref_mut().listen_port = port; let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); for n in boot_nodes { host.add_node(&n); } - Ok(host) + host } pub fn stats(&self) -> Arc { @@ -407,50 +397,50 @@ impl Host where Message: Send + Sync + Clone { self.info.read().unwrap().client_version.clone() } - pub fn external_url(&self) -> Option { - self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone()))) + pub fn client_url(&self) -> String { + format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone())) } - pub fn local_url(&self) -> String { - let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone())); - println!("{}", r); - r - } - - fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { + fn init_public_interface(&self, io: &IoContext>) { io.clear_timer(INIT_PUBLIC).unwrap(); - if self.info.read().unwrap().public_endpoint.is_some() { - return Ok(()); + let mut tcp_listener = self.tcp_listener.lock().unwrap(); + if tcp_listener.is_some() { + return; } - let local_endpoint = self.info.read().unwrap().local_endpoint.clone(); + // public_endpoint in host info contains local adderss at this point + let listen_address = self.info.read().unwrap().public_endpoint.address.clone(); + let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port()); let public_address = self.info.read().unwrap().config.public_address.clone(); let public_endpoint = match public_address { None => { - let public_address = select_public_address(local_endpoint.address.port()); - let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port }; + let public_address = select_public_address(listen_address.port()); + let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; if self.info.read().unwrap().config.nat_enabled { match map_external_address(&local_endpoint) { Some(endpoint) => { - info!("NAT mapped to external address {}", endpoint.address); + info!("NAT mappped to external address {}", endpoint.address); endpoint }, - None => public_endpoint + None => local_endpoint } } else { - public_endpoint + local_endpoint } } - Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port } + Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port } }; - self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone()); - info!("Public node URL: {}", self.external_url().unwrap()); + // Setup the server socket + *tcp_listener = Some(TcpListener::bind(&listen_address).unwrap()); + self.info.write().unwrap().public_endpoint = public_endpoint.clone(); + io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener"); + info!("Public node URL: {}", self.client_url()); // Initialize discovery. let discovery = { let info = self.info.read().unwrap(); if info.config.discovery_enabled && !info.config.pin { - Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) + Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY)) } else { None } }; @@ -464,8 +454,6 @@ impl Host where Message: Send + Sync + Clone { io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); *self.discovery.lock().unwrap().deref_mut() = Some(discovery); } - try!(io.register_stream(TCP_ACCEPT)); - Ok(()) } fn maintain_network(&self, io: &IoContext>) { @@ -579,7 +567,7 @@ impl Host where Message: Send + Sync + Clone { fn accept(&self, io: &IoContext>) { trace!(target: "network", "Accepting incoming connection"); loop { - let socket = match self.tcp_listener.lock().unwrap().accept() { + let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() { Ok(None) => break, Ok(Some((sock, _addr))) => sock, Err(e) => { @@ -873,8 +861,7 @@ impl IoHandler> for Host where Messa fn timeout(&self, io: &IoContext>, token: TimerToken) { match token { IDLE => self.maintain_network(io), - INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e| - warn!("Error initializing public interface: {:?}", e)), + INIT_PUBLIC => self.init_public_interface(io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io), DISCOVERY_REFRESH => { @@ -958,7 +945,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -999,7 +986,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } @@ -1067,6 +1054,6 @@ fn host_client_url() { let mut config = NetworkConfiguration::new(); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); config.use_secret = Some(key); - let host: Host = Host::new(config).unwrap(); - assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); + let host: Host = Host::new(config); + assert!(host.client_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); } diff --git a/util/src/network/ip_utils.rs b/util/src/network/ip_utils.rs index 27ff29737..b37a47064 100644 --- a/util/src/network/ip_utils.rs +++ b/util/src/network/ip_utils.rs @@ -208,7 +208,6 @@ fn can_select_public_address() { assert!(pub_address.port() == 40477); } -#[ignore] #[test] fn can_map_external_address_or_fail() { let pub_address = select_public_address(40478); diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 29f3d166c..50645f2be 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -56,7 +56,7 @@ //! } //! //! fn main () { -//! let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); +//! let mut service = NetworkService::::start(NetworkConfiguration::new_with_port(40412)).expect("Error creating network service"); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); //! //! // Wait for quit condition diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 49957f7e7..7b9388e85 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -28,7 +28,6 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - host: Arc>, stats: Arc, panic_handler: Arc } @@ -40,16 +39,15 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat let mut io_service = try!(IoService::>::start()); panic_handler.forward_from(&io_service); - let host = Arc::new(try!(Host::new(config))); + let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); - try!(io_service.register_handler(host.clone())); + try!(io_service.register_handler(host)); Ok(NetworkService { io_service: io_service, host_info: host_info, stats: stats, - panic_handler: panic_handler, - host: host, + panic_handler: panic_handler }) } @@ -73,22 +71,13 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat &mut self.io_service } - /// Returns network statistics. + /// Returns underlying io service. pub fn stats(&self) -> &NetworkStats { &self.stats } - - /// Returns external url if available. - pub fn external_url(&self) -> Option { - self.host.external_url() - } - - /// Returns external url if available. - pub fn local_url(&self) -> String { - self.host.local_url() - } } + impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 7dbcc4229..2f30d7376 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -315,7 +315,7 @@ impl Session { .append(&host.protocol_version) .append(&host.client_version) .append(&host.capabilities) - .append(&host.local_endpoint.address.port()) + .append(&host.listen_port) .append(host.id()); self.connection.send_packet(&rlp.out()) } diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index 8df3b4028..f8ef588f6 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -97,21 +97,21 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { - let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); + let mut service = NetworkService::::start(NetworkConfiguration::new_with_port(40414)).expect("Error creating network service"); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); } #[test] fn net_connect() { - ::log::init_log(); let key1 = KeyPair::create().unwrap(); - let mut config1 = NetworkConfiguration::new_local(); + let mut config1 = NetworkConfiguration::new_with_port(30354); config1.use_secret = Some(key1.secret().clone()); + config1.nat_enabled = false; config1.boot_nodes = vec![ ]; + let mut config2 = NetworkConfiguration::new_with_port(30355); + config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30354", key1.public().hex()) ]; + config2.nat_enabled = false; let mut service1 = NetworkService::::start(config1).unwrap(); - let mut config2 = NetworkConfiguration::new_local(); - info!("net_connect: local URL: {}", service1.local_url()); - config2.boot_nodes = vec![ service1.local_url() ]; let mut service2 = NetworkService::::start(config2).unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let handler2 = TestProtocol::register(&mut service2, false); @@ -125,12 +125,14 @@ fn net_connect() { #[test] fn net_disconnect() { let key1 = KeyPair::create().unwrap(); - let mut config1 = NetworkConfiguration::new_local(); + let mut config1 = NetworkConfiguration::new_with_port(30364); config1.use_secret = Some(key1.secret().clone()); + config1.nat_enabled = false; config1.boot_nodes = vec![ ]; + let mut config2 = NetworkConfiguration::new_with_port(30365); + config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30364", key1.public().hex()) ]; + config2.nat_enabled = false; let mut service1 = NetworkService::::start(config1).unwrap(); - let mut config2 = NetworkConfiguration::new_local(); - config2.boot_nodes = vec![ service1.local_url() ]; let mut service2 = NetworkService::::start(config2).unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let handler2 = TestProtocol::register(&mut service2, true); @@ -143,7 +145,7 @@ fn net_disconnect() { #[test] fn net_timeout() { - let config = NetworkConfiguration::new_local(); + let config = NetworkConfiguration::new_with_port(30346); let mut service = NetworkService::::start(config).unwrap(); let handler = TestProtocol::register(&mut service, false); while !handler.got_timeout() {