Merge pull request #782 from ethcore/test-fix

Auto detect available port
This commit is contained in:
Gav Wood 2016-03-20 10:23:55 +01:00
commit 004cd00f13
7 changed files with 83 additions and 60 deletions

View File

@ -146,7 +146,7 @@ mod tests {
fn it_can_be_started() { fn it_can_be_started() {
let spec = get_test_spec(); let spec = get_test_spec();
let temp_path = RandomTempPath::new(); let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_with_port(40456), &temp_path.as_path()); let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path());
assert!(service.is_ok()); assert!(service.is_ok());
} }
} }

View File

@ -105,6 +105,14 @@ impl NetworkConfiguration {
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap()); config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
config config
} }
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
config.nat_enabled = false;
config
}
} }
// Tokens // Tokens
@ -269,12 +277,12 @@ pub struct HostInfo {
pub protocol_version: u32, pub protocol_version: u32,
/// Client identifier /// Client identifier
pub client_version: String, pub client_version: String,
/// TCP connection port.
pub listen_port: u16,
/// Registered capabilities (handlers) /// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo>, pub capabilities: Vec<CapabilityInfo>,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port /// Public address + discovery port
public_endpoint: NodeEndpoint, pub public_endpoint: Option<NodeEndpoint>,
} }
impl HostInfo { impl HostInfo {
@ -307,7 +315,7 @@ struct ProtocolTimer {
/// Root IO handler. Manages protocol handlers, IO timers and network connections. /// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host<Message> where Message: Send + Sync + Clone { pub struct Host<Message> where Message: Send + Sync + Clone {
pub info: RwLock<HostInfo>, pub info: RwLock<HostInfo>,
tcp_listener: Mutex<Option<TcpListener>>, tcp_listener: Mutex<TcpListener>,
handshakes: Arc<RwLock<Slab<SharedHandshake>>>, handshakes: Arc<RwLock<Slab<SharedHandshake>>>,
sessions: Arc<RwLock<Slab<SharedSession>>>, sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery>>, discovery: Mutex<Option<Discovery>>,
@ -321,13 +329,12 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
impl<Message> Host<Message> where Message: Send + Sync + Clone { impl<Message> Host<Message> where Message: Send + Sync + Clone {
/// Create a new instance /// Create a new instance
pub fn new(config: NetworkConfiguration) -> Host<Message> { pub fn new(config: NetworkConfiguration) -> Result<Host<Message>, UtilError> {
let listen_address = match config.listen_address { let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
Some(addr) => addr, Some(addr) => addr,
}; };
let udp_port = config.udp_port.unwrap_or(listen_address.port());
let keys = if let Some(ref secret) = config.use_secret { let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone()).unwrap() KeyPair::from_secret(secret.clone()).unwrap()
} else { } else {
@ -342,7 +349,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|s| KeyPair::from_secret(s).expect("Error creating node secret key")) |s| KeyPair::from_secret(s).expect("Error creating node secret key"))
}; };
let path = config.config_path.clone(); let path = config.config_path.clone();
// Setup the server socket
let tcp_listener = try!(TcpListener::bind(&listen_address));
listen_address = SocketAddr::new(listen_address.ip(), try!(tcp_listener.local_addr()).port());
let udp_port = config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
let mut host = Host::<Message> { let mut host = Host::<Message> {
info: RwLock::new(HostInfo { info: RwLock::new(HostInfo {
keys: keys, keys: keys,
@ -350,12 +362,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
nonce: H256::random(), nonce: H256::random(),
protocol_version: PROTOCOL_VERSION, protocol_version: PROTOCOL_VERSION,
client_version: version(), client_version: version(),
listen_port: 0,
capabilities: Vec::new(), capabilities: Vec::new(),
public_endpoint: local_endpoint, // will be replaced by public once it is resolved public_endpoint: None,
local_endpoint: local_endpoint,
}), }),
discovery: Mutex::new(None), discovery: Mutex::new(None),
tcp_listener: Mutex::new(None), tcp_listener: Mutex::new(tcp_listener),
handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))), handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)), nodes: RwLock::new(NodeTable::new(path)),
@ -365,14 +377,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
stats: Arc::new(NetworkStats::default()), stats: Arc::new(NetworkStats::default()),
pinned_nodes: Vec::new(), pinned_nodes: Vec::new(),
}; };
let port = listen_address.port();
host.info.write().unwrap().deref_mut().listen_port = port;
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
for n in boot_nodes { for n in boot_nodes {
host.add_node(&n); host.add_node(&n);
} }
host Ok(host)
} }
pub fn stats(&self) -> Arc<NetworkStats> { pub fn stats(&self) -> Arc<NetworkStats> {
@ -397,50 +407,50 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.info.read().unwrap().client_version.clone() self.info.read().unwrap().client_version.clone()
} }
pub fn client_url(&self) -> String { pub fn external_url(&self) -> Option<String> {
format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone())) self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone())))
} }
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) { pub fn local_url(&self) -> String {
let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone()));
println!("{}", r);
r
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap(); io.clear_timer(INIT_PUBLIC).unwrap();
let mut tcp_listener = self.tcp_listener.lock().unwrap(); if self.info.read().unwrap().public_endpoint.is_some() {
if tcp_listener.is_some() { return Ok(());
return;
} }
// public_endpoint in host info contains local adderss at this point let local_endpoint = self.info.read().unwrap().local_endpoint.clone();
let listen_address = self.info.read().unwrap().public_endpoint.address.clone();
let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port());
let public_address = self.info.read().unwrap().config.public_address.clone(); let public_address = self.info.read().unwrap().config.public_address.clone();
let public_endpoint = match public_address { let public_endpoint = match public_address {
None => { None => {
let public_address = select_public_address(listen_address.port()); let public_address = select_public_address(local_endpoint.address.port());
let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.read().unwrap().config.nat_enabled { if self.info.read().unwrap().config.nat_enabled {
match map_external_address(&local_endpoint) { match map_external_address(&local_endpoint) {
Some(endpoint) => { Some(endpoint) => {
info!("NAT mappped to external address {}", endpoint.address); info!("NAT mapped to external address {}", endpoint.address);
endpoint endpoint
}, },
None => local_endpoint None => public_endpoint
} }
} else { } else {
local_endpoint public_endpoint
} }
} }
Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port } Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
}; };
// Setup the server socket self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone());
*tcp_listener = Some(TcpListener::bind(&listen_address).unwrap()); info!("Public node URL: {}", self.external_url().unwrap());
self.info.write().unwrap().public_endpoint = public_endpoint.clone();
io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener");
info!("Public node URL: {}", self.client_url());
// Initialize discovery. // Initialize discovery.
let discovery = { let discovery = {
let info = self.info.read().unwrap(); let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.pin { if info.config.discovery_enabled && !info.config.pin {
Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY)) Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None } } else { None }
}; };
@ -454,6 +464,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer"); io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
*self.discovery.lock().unwrap().deref_mut() = Some(discovery); *self.discovery.lock().unwrap().deref_mut() = Some(discovery);
} }
try!(io.register_stream(TCP_ACCEPT));
Ok(())
} }
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) { fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
@ -567,7 +579,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) { fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Accepting incoming connection"); trace!(target: "network", "Accepting incoming connection");
loop { loop {
let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() { let socket = match self.tcp_listener.lock().unwrap().accept() {
Ok(None) => break, Ok(None) => break,
Ok(Some((sock, _addr))) => sock, Ok(Some((sock, _addr))) => sock,
Err(e) => { Err(e) => {
@ -861,7 +873,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) { fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
match token { match token {
IDLE => self.maintain_network(io), IDLE => self.maintain_network(io),
INIT_PUBLIC => self.init_public_interface(io), INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
warn!("Error initializing public interface: {:?}", e)),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io), FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io),
DISCOVERY_REFRESH => { DISCOVERY_REFRESH => {
@ -945,7 +958,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} }
} }
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"), DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"), TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration") _ => warn!("Unexpected stream registration")
} }
} }
@ -986,7 +999,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} }
} }
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"), DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"), TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update") _ => warn!("Unexpected stream update")
} }
} }
@ -1054,6 +1067,6 @@ fn host_client_url() {
let mut config = NetworkConfiguration::new(); let mut config = NetworkConfiguration::new();
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
config.use_secret = Some(key); config.use_secret = Some(key);
let host: Host<u32> = Host::new(config); let host: Host<u32> = Host::new(config).unwrap();
assert!(host.client_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
} }

View File

@ -208,6 +208,7 @@ fn can_select_public_address() {
assert!(pub_address.port() == 40477); assert!(pub_address.port() == 40477);
} }
#[ignore]
#[test] #[test]
fn can_map_external_address_or_fail() { fn can_map_external_address_or_fail() {
let pub_address = select_public_address(40478); let pub_address = select_public_address(40478);

View File

@ -56,7 +56,7 @@
//! } //! }
//! //!
//! fn main () { //! fn main () {
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_with_port(40412)).expect("Error creating network service"); //! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//! //!
//! // Wait for quit condition //! // Wait for quit condition

View File

@ -28,6 +28,7 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static { pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>, io_service: IoService<NetworkIoMessage<Message>>,
host_info: String, host_info: String,
host: Arc<Host<Message>>,
stats: Arc<NetworkStats>, stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler> panic_handler: Arc<PanicHandler>
} }
@ -39,15 +40,16 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start()); let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service); panic_handler.forward_from(&io_service);
let host = Arc::new(Host::new(config)); let host = Arc::new(try!(Host::new(config)));
let stats = host.stats().clone(); let stats = host.stats().clone();
let host_info = host.client_version(); let host_info = host.client_version();
try!(io_service.register_handler(host)); try!(io_service.register_handler(host.clone()));
Ok(NetworkService { Ok(NetworkService {
io_service: io_service, io_service: io_service,
host_info: host_info, host_info: host_info,
stats: stats, stats: stats,
panic_handler: panic_handler panic_handler: panic_handler,
host: host,
}) })
} }
@ -71,12 +73,21 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
&mut self.io_service &mut self.io_service
} }
/// Returns underlying io service. /// Returns network statistics.
pub fn stats(&self) -> &NetworkStats { pub fn stats(&self) -> &NetworkStats {
&self.stats &self.stats
} }
}
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
self.host.external_url()
}
/// Returns external url if available.
pub fn local_url(&self) -> String {
self.host.local_url()
}
}
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener { fn on_panic<F>(&self, closure: F) where F: OnPanicListener {

View File

@ -315,7 +315,7 @@ impl Session {
.append(&host.protocol_version) .append(&host.protocol_version)
.append(&host.client_version) .append(&host.client_version)
.append(&host.capabilities) .append(&host.capabilities)
.append(&host.listen_port) .append(&host.local_endpoint.address.port())
.append(host.id()); .append(host.id());
self.connection.send_packet(&rlp.out()) self.connection.send_packet(&rlp.out())
} }

View File

@ -97,21 +97,21 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
#[test] #[test]
fn net_service() { fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_with_port(40414)).expect("Error creating network service"); let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
} }
#[test] #[test]
fn net_connect() { fn net_connect() {
::log::init_log();
let key1 = KeyPair::create().unwrap(); let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30354); let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone()); config1.use_secret = Some(key1.secret().clone());
config1.nat_enabled = false;
config1.boot_nodes = vec![ ]; config1.boot_nodes = vec![ ];
let mut config2 = NetworkConfiguration::new_with_port(30355);
config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30354", key1.public().hex()) ];
config2.nat_enabled = false;
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap(); let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut config2 = NetworkConfiguration::new_local();
info!("net_connect: local URL: {}", service1.local_url());
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap(); let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
let handler1 = TestProtocol::register(&mut service1, false); let handler1 = TestProtocol::register(&mut service1, false);
let handler2 = TestProtocol::register(&mut service2, false); let handler2 = TestProtocol::register(&mut service2, false);
@ -125,14 +125,12 @@ fn net_connect() {
#[test] #[test]
fn net_disconnect() { fn net_disconnect() {
let key1 = KeyPair::create().unwrap(); let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30364); let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone()); config1.use_secret = Some(key1.secret().clone());
config1.nat_enabled = false;
config1.boot_nodes = vec![ ]; config1.boot_nodes = vec![ ];
let mut config2 = NetworkConfiguration::new_with_port(30365);
config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30364", key1.public().hex()) ];
config2.nat_enabled = false;
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap(); let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap(); let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
let handler1 = TestProtocol::register(&mut service1, false); let handler1 = TestProtocol::register(&mut service1, false);
let handler2 = TestProtocol::register(&mut service2, true); let handler2 = TestProtocol::register(&mut service2, true);
@ -145,7 +143,7 @@ fn net_disconnect() {
#[test] #[test]
fn net_timeout() { fn net_timeout() {
let config = NetworkConfiguration::new_with_port(30346); let config = NetworkConfiguration::new_local();
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap(); let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let handler = TestProtocol::register(&mut service, false); let handler = TestProtocol::register(&mut service, false);
while !handler.got_timeout() { while !handler.got_timeout() {