Merge pull request #505 from ethcore/network
Delayed UPnP initialization
This commit is contained in:
commit
8bd585b2b9
@ -73,7 +73,7 @@ Options:
|
|||||||
--address URL Equivalent to --listen-address URL --public-address URL.
|
--address URL Equivalent to --listen-address URL --public-address URL.
|
||||||
--peers NUM Try to manintain that many peers [default: 25].
|
--peers NUM Try to manintain that many peers [default: 25].
|
||||||
--no-discovery Disable new peer discovery.
|
--no-discovery Disable new peer discovery.
|
||||||
--upnp Use UPnP to try to figure out the correct network settings.
|
--no-upnp Disable trying to figure out the correct public adderss over UPnP.
|
||||||
--node-key KEY Specify node secret key, either as 64-character hex string or input to SHA3 operation.
|
--node-key KEY Specify node secret key, either as 64-character hex string or input to SHA3 operation.
|
||||||
|
|
||||||
--cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384].
|
--cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384].
|
||||||
@ -102,7 +102,7 @@ struct Args {
|
|||||||
flag_address: Option<String>,
|
flag_address: Option<String>,
|
||||||
flag_peers: u32,
|
flag_peers: u32,
|
||||||
flag_no_discovery: bool,
|
flag_no_discovery: bool,
|
||||||
flag_upnp: bool,
|
flag_no_upnp: bool,
|
||||||
flag_node_key: Option<String>,
|
flag_node_key: Option<String>,
|
||||||
flag_cache_pref_size: usize,
|
flag_cache_pref_size: usize,
|
||||||
flag_cache_max_size: usize,
|
flag_cache_max_size: usize,
|
||||||
@ -246,7 +246,7 @@ impl Configuration {
|
|||||||
|
|
||||||
fn net_settings(&self, spec: &Spec) -> NetworkConfiguration {
|
fn net_settings(&self, spec: &Spec) -> NetworkConfiguration {
|
||||||
let mut ret = NetworkConfiguration::new();
|
let mut ret = NetworkConfiguration::new();
|
||||||
ret.nat_enabled = self.args.flag_upnp;
|
ret.nat_enabled = !self.args.flag_no_upnp;
|
||||||
ret.boot_nodes = self.init_nodes(spec);
|
ret.boot_nodes = self.init_nodes(spec);
|
||||||
let (listen, public) = self.net_addresses();
|
let (listen, public) = self.net_addresses();
|
||||||
ret.listen_address = listen;
|
ret.listen_address = listen;
|
||||||
|
@ -106,6 +106,7 @@ const IDLE: usize = LAST_HANDSHAKE + 2;
|
|||||||
const DISCOVERY: usize = LAST_HANDSHAKE + 3;
|
const DISCOVERY: usize = LAST_HANDSHAKE + 3;
|
||||||
const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4;
|
const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4;
|
||||||
const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5;
|
const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5;
|
||||||
|
const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6;
|
||||||
const FIRST_SESSION: usize = 0;
|
const FIRST_SESSION: usize = 0;
|
||||||
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
|
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
|
||||||
const FIRST_HANDSHAKE: usize = LAST_SESSION + 1;
|
const FIRST_HANDSHAKE: usize = LAST_SESSION + 1;
|
||||||
@ -261,7 +262,9 @@ pub struct HostInfo {
|
|||||||
/// TCP connection port.
|
/// TCP connection port.
|
||||||
pub listen_port: u16,
|
pub listen_port: u16,
|
||||||
/// Registered capabilities (handlers)
|
/// Registered capabilities (handlers)
|
||||||
pub capabilities: Vec<CapabilityInfo>
|
pub capabilities: Vec<CapabilityInfo>,
|
||||||
|
/// Public address + discovery port
|
||||||
|
public_endpoint: NodeEndpoint,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HostInfo {
|
impl HostInfo {
|
||||||
@ -294,16 +297,15 @@ struct ProtocolTimer {
|
|||||||
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
|
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
|
||||||
pub struct Host<Message> where Message: Send + Sync + Clone {
|
pub struct Host<Message> where Message: Send + Sync + Clone {
|
||||||
pub info: RwLock<HostInfo>,
|
pub info: RwLock<HostInfo>,
|
||||||
tcp_listener: Mutex<TcpListener>,
|
tcp_listener: Mutex<Option<TcpListener>>,
|
||||||
handshakes: Arc<RwLock<Slab<SharedHandshake>>>,
|
handshakes: Arc<RwLock<Slab<SharedHandshake>>>,
|
||||||
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
||||||
discovery: Option<Mutex<Discovery>>,
|
discovery: Mutex<Option<Discovery>>,
|
||||||
nodes: RwLock<NodeTable>,
|
nodes: RwLock<NodeTable>,
|
||||||
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>,
|
handlers: RwLock<HashMap<ProtocolId, Arc<NetworkProtocolHandler<Message>>>>,
|
||||||
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
|
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
|
||||||
timer_counter: RwLock<usize>,
|
timer_counter: RwLock<usize>,
|
||||||
stats: Arc<NetworkStats>,
|
stats: Arc<NetworkStats>,
|
||||||
public_endpoint: NodeEndpoint,
|
|
||||||
pinned_nodes: Vec<NodeId>,
|
pinned_nodes: Vec<NodeId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -316,27 +318,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let udp_port = config.udp_port.unwrap_or(listen_address.port());
|
let udp_port = config.udp_port.unwrap_or(listen_address.port());
|
||||||
let public_endpoint = match config.public_address {
|
|
||||||
None => {
|
|
||||||
let public_address = select_public_address(listen_address.port());
|
|
||||||
let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port };
|
|
||||||
if config.nat_enabled {
|
|
||||||
match map_external_address(&local_endpoint) {
|
|
||||||
Some(endpoint) => {
|
|
||||||
info!("NAT Mappped to external address {}", endpoint.address);
|
|
||||||
endpoint
|
|
||||||
},
|
|
||||||
None => local_endpoint
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
local_endpoint
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port }
|
|
||||||
};
|
|
||||||
|
|
||||||
// Setup the server socket
|
|
||||||
let tcp_listener = TcpListener::bind(&listen_address).unwrap();
|
|
||||||
let keys = if let Some(ref secret) = config.use_secret {
|
let keys = if let Some(ref secret) = config.use_secret {
|
||||||
KeyPair::from_secret(secret.clone()).unwrap()
|
KeyPair::from_secret(secret.clone()).unwrap()
|
||||||
} else {
|
} else {
|
||||||
@ -350,10 +331,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
},
|
},
|
||||||
|s| KeyPair::from_secret(s).expect("Error creating node secret key"))
|
|s| KeyPair::from_secret(s).expect("Error creating node secret key"))
|
||||||
};
|
};
|
||||||
let discovery = if config.discovery_enabled && !config.pin {
|
|
||||||
Some(Discovery::new(&keys, listen_address.clone(), public_endpoint.clone(), DISCOVERY))
|
|
||||||
} else { None };
|
|
||||||
let path = config.config_path.clone();
|
let path = config.config_path.clone();
|
||||||
|
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };
|
||||||
let mut host = Host::<Message> {
|
let mut host = Host::<Message> {
|
||||||
info: RwLock::new(HostInfo {
|
info: RwLock::new(HostInfo {
|
||||||
keys: keys,
|
keys: keys,
|
||||||
@ -363,9 +342,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
client_version: version(),
|
client_version: version(),
|
||||||
listen_port: 0,
|
listen_port: 0,
|
||||||
capabilities: Vec::new(),
|
capabilities: Vec::new(),
|
||||||
|
public_endpoint: local_endpoint, // will be replaced by public once it is resolved
|
||||||
}),
|
}),
|
||||||
discovery: discovery.map(Mutex::new),
|
discovery: Mutex::new(None),
|
||||||
tcp_listener: Mutex::new(tcp_listener),
|
tcp_listener: Mutex::new(None),
|
||||||
handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))),
|
handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))),
|
||||||
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
|
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
|
||||||
nodes: RwLock::new(NodeTable::new(path)),
|
nodes: RwLock::new(NodeTable::new(path)),
|
||||||
@ -373,16 +353,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
timers: RwLock::new(HashMap::new()),
|
timers: RwLock::new(HashMap::new()),
|
||||||
timer_counter: RwLock::new(USER_TIMER),
|
timer_counter: RwLock::new(USER_TIMER),
|
||||||
stats: Arc::new(NetworkStats::default()),
|
stats: Arc::new(NetworkStats::default()),
|
||||||
public_endpoint: public_endpoint,
|
|
||||||
pinned_nodes: Vec::new(),
|
pinned_nodes: Vec::new(),
|
||||||
};
|
};
|
||||||
let port = listen_address.port();
|
let port = listen_address.port();
|
||||||
host.info.write().unwrap().deref_mut().listen_port = port;
|
host.info.write().unwrap().deref_mut().listen_port = port;
|
||||||
|
|
||||||
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
||||||
if let Some(ref mut discovery) = host.discovery {
|
|
||||||
discovery.lock().unwrap().init_node_list(host.nodes.read().unwrap().unordered_entries());
|
|
||||||
}
|
|
||||||
for n in boot_nodes {
|
for n in boot_nodes {
|
||||||
host.add_node(&n);
|
host.add_node(&n);
|
||||||
}
|
}
|
||||||
@ -400,8 +376,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
||||||
self.pinned_nodes.push(n.id.clone());
|
self.pinned_nodes.push(n.id.clone());
|
||||||
self.nodes.write().unwrap().add_node(n);
|
self.nodes.write().unwrap().add_node(n);
|
||||||
if let Some(ref mut discovery) = self.discovery {
|
if let &mut Some(ref mut discovery) = self.discovery.lock().unwrap().deref_mut() {
|
||||||
discovery.lock().unwrap().add_node(entry);
|
discovery.add_node(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -412,7 +388,61 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_url(&self) -> String {
|
pub fn client_url(&self) -> String {
|
||||||
format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.public_endpoint.clone()))
|
format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
|
io.clear_timer(INIT_PUBLIC).unwrap();
|
||||||
|
let mut tcp_listener = self.tcp_listener.lock().unwrap();
|
||||||
|
if tcp_listener.is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// public_endpoint in host info contains local adderss at this point
|
||||||
|
let listen_address = self.info.read().unwrap().public_endpoint.address.clone();
|
||||||
|
let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port());
|
||||||
|
let public_endpoint = match self.info.read().unwrap().config.public_address {
|
||||||
|
None => {
|
||||||
|
let public_address = select_public_address(listen_address.port());
|
||||||
|
let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port };
|
||||||
|
if self.info.read().unwrap().config.nat_enabled {
|
||||||
|
match map_external_address(&local_endpoint) {
|
||||||
|
Some(endpoint) => {
|
||||||
|
info!("NAT mappped to external address {}", endpoint.address);
|
||||||
|
endpoint
|
||||||
|
},
|
||||||
|
None => local_endpoint
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
local_endpoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port }
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup the server socket
|
||||||
|
*tcp_listener = Some(TcpListener::bind(&listen_address).unwrap());
|
||||||
|
self.info.write().unwrap().public_endpoint = public_endpoint.clone();
|
||||||
|
io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener");
|
||||||
|
info!("Public node URL: {}", self.client_url());
|
||||||
|
|
||||||
|
// Initialize discovery.
|
||||||
|
let discovery = {
|
||||||
|
let info = self.info.read().unwrap();
|
||||||
|
if info.config.discovery_enabled && !info.config.pin {
|
||||||
|
Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY))
|
||||||
|
} else { None }
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(mut discovery) = discovery {
|
||||||
|
discovery.init_node_list(self.nodes.read().unwrap().unordered_entries());
|
||||||
|
for n in self.nodes.read().unwrap().unordered_entries() {
|
||||||
|
discovery.add_node(n.clone());
|
||||||
|
}
|
||||||
|
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
||||||
|
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
||||||
|
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
||||||
|
*self.discovery.lock().unwrap().deref_mut() = Some(discovery);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
@ -526,7 +556,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
trace!(target: "network", "Accepting incoming connection");
|
trace!(target: "network", "Accepting incoming connection");
|
||||||
loop {
|
loop {
|
||||||
let socket = match self.tcp_listener.lock().unwrap().accept() {
|
let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() {
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
Ok(Some((sock, _addr))) => sock,
|
Ok(Some((sock, _addr))) => sock,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -666,8 +696,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
if let Ok(address) = session.remote_addr() {
|
if let Ok(address) = session.remote_addr() {
|
||||||
let entry = NodeEntry { id: session.id().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
let entry = NodeEntry { id: session.id().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
||||||
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
||||||
if let Some(ref discovery) = self.discovery {
|
let mut discovery = self.discovery.lock().unwrap();
|
||||||
discovery.lock().unwrap().add_node(entry);
|
if let &mut Some(ref mut discovery) = discovery.deref_mut() {
|
||||||
|
discovery.add_node(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -764,13 +795,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + Sync + Clone + 'static {
|
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + Sync + Clone + 'static {
|
||||||
/// Initialize networking
|
/// Initialize networking
|
||||||
fn initialize(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn initialize(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener");
|
|
||||||
io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer");
|
io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer");
|
||||||
if self.discovery.is_some() {
|
io.register_timer(INIT_PUBLIC, 0).expect("Error registering initialization timer");
|
||||||
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
|
||||||
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
|
||||||
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
|
||||||
}
|
|
||||||
self.maintain_network(io)
|
self.maintain_network(io)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -788,7 +814,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
||||||
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(stream, io),
|
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
let node_changes = { self.discovery.as_ref().unwrap().lock().unwrap().readable() };
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
@ -804,7 +830,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
||||||
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(stream, io),
|
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
self.discovery.as_ref().unwrap().lock().unwrap().writable();
|
self.discovery.lock().unwrap().as_mut().unwrap().writable();
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
||||||
}
|
}
|
||||||
_ => panic!("Received unknown writable token"),
|
_ => panic!("Received unknown writable token"),
|
||||||
@ -814,14 +840,15 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
|
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
|
||||||
match token {
|
match token {
|
||||||
IDLE => self.maintain_network(io),
|
IDLE => self.maintain_network(io),
|
||||||
|
INIT_PUBLIC => self.init_public_interface(io),
|
||||||
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
||||||
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io),
|
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io),
|
||||||
DISCOVERY_REFRESH => {
|
DISCOVERY_REFRESH => {
|
||||||
self.discovery.as_ref().unwrap().lock().unwrap().refresh();
|
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
||||||
},
|
},
|
||||||
DISCOVERY_ROUND => {
|
DISCOVERY_ROUND => {
|
||||||
let node_changes = { self.discovery.as_ref().unwrap().lock().unwrap().round() };
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
@ -896,8 +923,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
connection.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
|
connection.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.as_ref().unwrap().lock().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
_ => warn!("Unexpected stream registration")
|
_ => warn!("Unexpected stream registration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -919,7 +946,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => (),
|
DISCOVERY => (),
|
||||||
TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(),
|
|
||||||
_ => warn!("Unexpected stream deregistration")
|
_ => warn!("Unexpected stream deregistration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -938,8 +964,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
|
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DISCOVERY => self.discovery.as_ref().unwrap().lock().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
||||||
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
_ => warn!("Unexpected stream update")
|
_ => warn!("Unexpected stream update")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,6 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
|||||||
let host = Arc::new(Host::new(config));
|
let host = Arc::new(Host::new(config));
|
||||||
let stats = host.stats().clone();
|
let stats = host.stats().clone();
|
||||||
let host_info = host.client_version();
|
let host_info = host.client_version();
|
||||||
info!("Node URL: {}", host.client_url());
|
|
||||||
try!(io_service.register_handler(host));
|
try!(io_service.register_handler(host));
|
||||||
Ok(NetworkService {
|
Ok(NetworkService {
|
||||||
io_service: io_service,
|
io_service: io_service,
|
||||||
|
Loading…
Reference in New Issue
Block a user