diff --git a/.travis.yml b/.travis.yml index f7ac723e1..f66b38c51 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,6 +41,7 @@ env: - RUN_COVERAGE="false" - RUN_BUILD="false" - RUN_BENCHES="false" + - RUST_BACKTRACE="1" cache: apt: true directories: diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index bcfe7724f..95a891198 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -146,7 +146,7 @@ mod tests { fn it_can_be_started() { let spec = get_test_spec(); let temp_path = RandomTempPath::new(); - let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_with_port(40456), &temp_path.as_path()); + let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path()); assert!(service.is_ok()); } } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 02c576424..d4e6d13ef 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -105,6 +105,14 @@ impl NetworkConfiguration { config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap()); config } + + /// Create new default configuration for localhost-only connection with random port (usefull for testing) + pub fn new_local() -> NetworkConfiguration { + let mut config = NetworkConfiguration::new(); + config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap()); + config.nat_enabled = false; + config + } } // Tokens @@ -269,12 +277,12 @@ pub struct HostInfo { pub protocol_version: u32, /// Client identifier pub client_version: String, - /// TCP connection port. - pub listen_port: u16, /// Registered capabilities (handlers) pub capabilities: Vec, + /// Local address + discovery port + pub local_endpoint: NodeEndpoint, /// Public address + discovery port - public_endpoint: NodeEndpoint, + pub public_endpoint: Option, } impl HostInfo { @@ -307,7 +315,7 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host where Message: Send + Sync + Clone { pub info: RwLock, - tcp_listener: Mutex>, + tcp_listener: Mutex, handshakes: Arc>>, sessions: Arc>>, discovery: Mutex>, @@ -321,13 +329,12 @@ pub struct Host where Message: Send + Sync + Clone { impl Host where Message: Send + Sync + Clone { /// Create a new instance - pub fn new(config: NetworkConfiguration) -> Host { - let listen_address = match config.listen_address { + pub fn new(config: NetworkConfiguration) -> Result, UtilError> { + let mut listen_address = match config.listen_address { None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), Some(addr) => addr, }; - let udp_port = config.udp_port.unwrap_or(listen_address.port()); let keys = if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { @@ -342,7 +349,12 @@ impl Host where Message: Send + Sync + Clone { |s| KeyPair::from_secret(s).expect("Error creating node secret key")) }; let path = config.config_path.clone(); + // Setup the server socket + let tcp_listener = try!(TcpListener::bind(&listen_address)); + listen_address = SocketAddr::new(listen_address.ip(), try!(tcp_listener.local_addr()).port()); + let udp_port = config.udp_port.unwrap_or(listen_address.port()); let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; + let mut host = Host:: { info: RwLock::new(HostInfo { keys: keys, @@ -350,12 +362,12 @@ impl Host where Message: Send + Sync + Clone { nonce: H256::random(), protocol_version: PROTOCOL_VERSION, client_version: version(), - listen_port: 0, capabilities: Vec::new(), - public_endpoint: local_endpoint, // will be replaced by public once it is resolved + public_endpoint: None, + local_endpoint: local_endpoint, }), discovery: Mutex::new(None), - tcp_listener: Mutex::new(None), + tcp_listener: Mutex::new(tcp_listener), handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -365,14 +377,12 @@ impl Host where Message: Send + Sync + Clone { stats: Arc::new(NetworkStats::default()), pinned_nodes: Vec::new(), }; - let port = listen_address.port(); - host.info.write().unwrap().deref_mut().listen_port = port; let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); for n in boot_nodes { host.add_node(&n); } - host + Ok(host) } pub fn stats(&self) -> Arc { @@ -397,50 +407,50 @@ impl Host where Message: Send + Sync + Clone { self.info.read().unwrap().client_version.clone() } - pub fn client_url(&self) -> String { - format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone())) + pub fn external_url(&self) -> Option { + self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone()))) } - fn init_public_interface(&self, io: &IoContext>) { + pub fn local_url(&self) -> String { + let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone())); + println!("{}", r); + r + } + + fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { io.clear_timer(INIT_PUBLIC).unwrap(); - let mut tcp_listener = self.tcp_listener.lock().unwrap(); - if tcp_listener.is_some() { - return; + if self.info.read().unwrap().public_endpoint.is_some() { + return Ok(()); } - // public_endpoint in host info contains local adderss at this point - let listen_address = self.info.read().unwrap().public_endpoint.address.clone(); - let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port()); + let local_endpoint = self.info.read().unwrap().local_endpoint.clone(); let public_address = self.info.read().unwrap().config.public_address.clone(); let public_endpoint = match public_address { None => { - let public_address = select_public_address(listen_address.port()); - let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; + let public_address = select_public_address(local_endpoint.address.port()); + let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port }; if self.info.read().unwrap().config.nat_enabled { match map_external_address(&local_endpoint) { Some(endpoint) => { - info!("NAT mappped to external address {}", endpoint.address); + info!("NAT mapped to external address {}", endpoint.address); endpoint }, - None => local_endpoint + None => public_endpoint } } else { - local_endpoint + public_endpoint } } - Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port } + Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port } }; - // Setup the server socket - *tcp_listener = Some(TcpListener::bind(&listen_address).unwrap()); - self.info.write().unwrap().public_endpoint = public_endpoint.clone(); - io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener"); - info!("Public node URL: {}", self.client_url()); + self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone()); + info!("Public node URL: {}", self.external_url().unwrap()); // Initialize discovery. let discovery = { let info = self.info.read().unwrap(); if info.config.discovery_enabled && !info.config.pin { - Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY)) + Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) } else { None } }; @@ -449,11 +459,13 @@ impl Host where Message: Send + Sync + Clone { for n in self.nodes.read().unwrap().unordered_entries() { discovery.add_node(n.clone()); } + *self.discovery.lock().unwrap().deref_mut() = Some(discovery); io.register_stream(DISCOVERY).expect("Error registering UDP listener"); io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); - *self.discovery.lock().unwrap().deref_mut() = Some(discovery); } + try!(io.register_stream(TCP_ACCEPT)); + Ok(()) } fn maintain_network(&self, io: &IoContext>) { @@ -567,7 +579,7 @@ impl Host where Message: Send + Sync + Clone { fn accept(&self, io: &IoContext>) { trace!(target: "network", "Accepting incoming connection"); loop { - let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() { + let socket = match self.tcp_listener.lock().unwrap().accept() { Ok(None) => break, Ok(Some((sock, _addr))) => sock, Err(e) => { @@ -861,7 +873,8 @@ impl IoHandler> for Host where Messa fn timeout(&self, io: &IoContext>, token: TimerToken) { match token { IDLE => self.maintain_network(io), - INIT_PUBLIC => self.init_public_interface(io), + INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e| + warn!("Error initializing public interface: {:?}", e)), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io), DISCOVERY_REFRESH => { @@ -945,7 +958,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), - TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), + TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } } @@ -986,7 +999,7 @@ impl IoHandler> for Host where Messa } } DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), - TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), + TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } } @@ -1054,6 +1067,6 @@ fn host_client_url() { let mut config = NetworkConfiguration::new(); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); config.use_secret = Some(key); - let host: Host = Host::new(config); - assert!(host.client_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); + let host: Host = Host::new(config).unwrap(); + assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); } diff --git a/util/src/network/ip_utils.rs b/util/src/network/ip_utils.rs index b37a47064..27ff29737 100644 --- a/util/src/network/ip_utils.rs +++ b/util/src/network/ip_utils.rs @@ -208,6 +208,7 @@ fn can_select_public_address() { assert!(pub_address.port() == 40477); } +#[ignore] #[test] fn can_map_external_address_or_fail() { let pub_address = select_public_address(40478); diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 50645f2be..29f3d166c 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -56,7 +56,7 @@ //! } //! //! fn main () { -//! let mut service = NetworkService::::start(NetworkConfiguration::new_with_port(40412)).expect("Error creating network service"); +//! let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); //! //! // Wait for quit condition diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 7b9388e85..49957f7e7 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -28,6 +28,7 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, + host: Arc>, stats: Arc, panic_handler: Arc } @@ -39,15 +40,16 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat let mut io_service = try!(IoService::>::start()); panic_handler.forward_from(&io_service); - let host = Arc::new(Host::new(config)); + let host = Arc::new(try!(Host::new(config))); let stats = host.stats().clone(); let host_info = host.client_version(); - try!(io_service.register_handler(host)); + try!(io_service.register_handler(host.clone())); Ok(NetworkService { io_service: io_service, host_info: host_info, stats: stats, - panic_handler: panic_handler + panic_handler: panic_handler, + host: host, }) } @@ -71,12 +73,21 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat &mut self.io_service } - /// Returns underlying io service. + /// Returns network statistics. pub fn stats(&self) -> &NetworkStats { &self.stats } -} + /// Returns external url if available. + pub fn external_url(&self) -> Option { + self.host.external_url() + } + + /// Returns external url if available. + pub fn local_url(&self) -> String { + self.host.local_url() + } +} impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { fn on_panic(&self, closure: F) where F: OnPanicListener { diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 2f30d7376..7dbcc4229 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -315,7 +315,7 @@ impl Session { .append(&host.protocol_version) .append(&host.client_version) .append(&host.capabilities) - .append(&host.listen_port) + .append(&host.local_endpoint.address.port()) .append(host.id()); self.connection.send_packet(&rlp.out()) } diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index f8ef588f6..b43da9320 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -97,23 +97,23 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { - let mut service = NetworkService::::start(NetworkConfiguration::new_with_port(40414)).expect("Error creating network service"); + let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); } #[test] fn net_connect() { + ::log::init_log(); let key1 = KeyPair::create().unwrap(); - let mut config1 = NetworkConfiguration::new_with_port(30354); + let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); - config1.nat_enabled = false; config1.boot_nodes = vec![ ]; - let mut config2 = NetworkConfiguration::new_with_port(30355); - config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30354", key1.public().hex()) ]; - config2.nat_enabled = false; let mut service1 = NetworkService::::start(config1).unwrap(); - let mut service2 = NetworkService::::start(config2).unwrap(); let handler1 = TestProtocol::register(&mut service1, false); + let mut config2 = NetworkConfiguration::new_local(); + info!("net_connect: local URL: {}", service1.local_url()); + config2.boot_nodes = vec![ service1.local_url() ]; + let mut service2 = NetworkService::::start(config2).unwrap(); let handler2 = TestProtocol::register(&mut service2, false); while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) { thread::sleep(Duration::from_millis(50)); @@ -125,16 +125,14 @@ fn net_connect() { #[test] fn net_disconnect() { let key1 = KeyPair::create().unwrap(); - let mut config1 = NetworkConfiguration::new_with_port(30364); + let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); - config1.nat_enabled = false; config1.boot_nodes = vec![ ]; - let mut config2 = NetworkConfiguration::new_with_port(30365); - config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30364", key1.public().hex()) ]; - config2.nat_enabled = false; let mut service1 = NetworkService::::start(config1).unwrap(); - let mut service2 = NetworkService::::start(config2).unwrap(); let handler1 = TestProtocol::register(&mut service1, false); + let mut config2 = NetworkConfiguration::new_local(); + config2.boot_nodes = vec![ service1.local_url() ]; + let mut service2 = NetworkService::::start(config2).unwrap(); let handler2 = TestProtocol::register(&mut service2, true); while !(handler1.got_disconnect() && handler2.got_disconnect()) { thread::sleep(Duration::from_millis(50)); @@ -145,7 +143,7 @@ fn net_disconnect() { #[test] fn net_timeout() { - let config = NetworkConfiguration::new_with_port(30346); + let config = NetworkConfiguration::new_local(); let mut service = NetworkService::::start(config).unwrap(); let handler = TestProtocol::register(&mut service, false); while !handler.got_timeout() {