diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 13f1c9f74..6adca7b52 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -41,6 +41,10 @@ pub enum SyncMessage { NewChainHead, /// A block is ready BlockVerified, + /// Start network command. + StartNetwork, + /// Stop network command. + StopNetwork, } /// IO Message type used for Network service @@ -48,17 +52,20 @@ pub type NetSyncMessage = NetworkIoMessage; /// Client service setup. Creates and registers client and network services with the IO subsystem. pub struct ClientService { - net_service: NetworkService, + net_service: Arc>, client: Arc, panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. - pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc) -> Result { + pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc, enable_network: bool) -> Result { let panic_handler = PanicHandler::new_in_arc(); - let mut net_service = try!(NetworkService::start(net_config)); + let net_service = try!(NetworkService::new(net_config)); panic_handler.forward_from(&net_service); + if enable_network { + try!(net_service.start()); + } info!("Starting {}", net_service.host_info()); info!("Configured for {} using {:?} engine", spec.name, spec.engine.name()); @@ -70,7 +77,7 @@ impl ClientService { try!(net_service.io().register_handler(client_io)); Ok(ClientService { - net_service: net_service, + net_service: Arc::new(net_service), client: client, panic_handler: panic_handler, }) @@ -82,8 +89,8 @@ impl ClientService { } /// Get general IO interface - pub fn io(&mut self) -> &mut IoService { - self.net_service.io() + pub fn register_io_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { + self.net_service.io().register_handler(handler) } /// Get client interface @@ -92,8 +99,8 @@ impl ClientService { } /// Get network service component - pub fn network(&mut self) -> &mut NetworkService { - &mut self.net_service + pub fn network(&mut self) -> Arc> { + self.net_service.clone() } } diff --git a/parity/cli.rs b/parity/cli.rs index b8aa88299..7e2a1d6a0 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -52,6 +52,7 @@ Account Options: --no-import-keys Do not import keys from legacy clients. Networking Options: + --no-network Disable p2p networking. --port PORT Override the port on which the node should listen [default: 30303]. --peers NUM Try to maintain that many peers [default: 25]. @@ -266,6 +267,7 @@ pub struct Args { pub flag_format: Option, pub flag_jitvm: bool, pub flag_no_color: bool, + pub flag_no_network: bool, // legacy... pub flag_geth: bool, pub flag_nodekey: Option, diff --git a/parity/io_handler.rs b/parity/io_handler.rs index d130d584c..569a11716 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -16,10 +16,10 @@ use std::sync::Arc; use ethcore::client::Client; -use ethcore::service::NetSyncMessage; +use ethcore::service::{NetSyncMessage, SyncMessage}; use ethsync::EthSync; use util::keys::store::AccountService; -use util::{TimerToken, IoHandler, IoContext}; +use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage}; use informant::Informant; @@ -33,6 +33,7 @@ pub struct ClientIoHandler { pub sync: Arc, pub accounts: Arc, pub info: Informant, + pub network: Arc>, } impl IoHandler for ClientIoHandler { @@ -48,6 +49,20 @@ impl IoHandler for ClientIoHandler { _ => {} } } + + fn message(&self, _io: &IoContext, message: &NetSyncMessage) { + match *message { + NetworkIoMessage::User(SyncMessage::StartNetwork) => { + info!("Starting network"); + self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); + }, + NetworkIoMessage::User(SyncMessage::StopNetwork) => { + info!("Stopping network"); + self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); + }, + _ => {/* Ignore other messages */}, + } + } } diff --git a/parity/main.rs b/parity/main.rs index 284dac673..80ea29576 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -198,7 +198,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) // Build client let mut service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), miner.clone() + client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -208,7 +208,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let network_settings = Arc::new(conf.network_settings()); // Sync - let sync = EthSync::register(service.network(), sync_config, client.clone()); + let sync = EthSync::register(service.network().deref(), sync_config, client.clone()); let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { signer_port: conf.signer_port(), @@ -269,8 +269,9 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) info: Informant::new(conf.have_color()), sync: sync.clone(), accounts: account_service.clone(), + network: service.network(), }); - service.io().register_handler(io_handler).expect("Error registering IO handler"); + service.register_io_handler(io_handler).expect("Error registering IO handler"); // Handle exit wait_for_exit(panic_handler, rpc_server, dapps_server, signer_server); @@ -309,7 +310,7 @@ fn execute_export(conf: Configuration) { // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), + client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -380,7 +381,7 @@ fn execute_import(conf: Configuration) { // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), + client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index fb502e6cf..977561ede 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -47,4 +47,14 @@ impl Net for NetClient where S: SyncProvider + 'static { // right now (11 march 2016), we are always listening for incoming connections Ok(Value::Bool(true)) } + + fn start_network(&self, _: Params) -> Result { + take_weak!(self.sync).start_network(); + Ok(Value::Bool(true)) + } + + fn stop_network(&self, _: Params) -> Result { + take_weak!(self.sync).stop_network(); + Ok(Value::Bool(true)) + } } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 114b5b08f..11fa1cb9e 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -59,5 +59,11 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().unwrap().clone() } + + fn start_network(&self) { + } + + fn stop_network(&self) { + } } diff --git a/rpc/src/v1/traits/net.rs b/rpc/src/v1/traits/net.rs index 56fba3e32..b65f53491 100644 --- a/rpc/src/v1/traits/net.rs +++ b/rpc/src/v1/traits/net.rs @@ -30,12 +30,20 @@ pub trait Net: Sized + Send + Sync + 'static { /// Otherwise false. fn is_listening(&self, _: Params) -> Result; + /// Start the network. + fn start_network(&self, _: Params) -> Result; + + /// Stop the network. + fn stop_network(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); delegate.add_method("net_version", Net::version); delegate.add_method("net_peerCount", Net::peer_count); delegate.add_method("net_listening", Net::is_listening); + delegate.add_method("net_start", Net::start_network); + delegate.add_method("net_stop", Net::stop_network); delegate } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 9c7da71ab..ca31ea020 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -66,8 +66,10 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer use util::TimerToken; use util::{U256, ONE_U256}; use ethcore::client::Client; -use ethcore::service::SyncMessage; +use ethcore::service::{SyncMessage, NetSyncMessage}; use io::NetSyncIo; +use util::io::IoChannel; +use util::NetworkIoMessage; use chain::ChainSync; mod chain; @@ -98,6 +100,10 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; + /// Start the network + fn start_network(&self); + /// Stop the network + fn stop_network(&self); } /// Ethereum network protocol handler @@ -105,18 +111,21 @@ pub struct EthSync { /// Shared blockchain client. TODO: this should evetually become an IPC endpoint chain: Arc, /// Sync strategy - sync: RwLock + sync: RwLock, + /// IO communication chnnel. + io_channel: RwLock>, } pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc) -> Arc { + pub fn register(service: &NetworkService, config: SyncConfig, chain: Arc) -> Arc { let sync = ChainSync::new(config, chain.deref()); let sync = Arc::new(EthSync { chain: chain, sync: RwLock::new(sync), + io_channel: RwLock::new(IoChannel::disconnected()), }); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync @@ -138,11 +147,20 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } + + fn start_network(&self) { + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification"); + } + + fn stop_network(&self) { + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification"); + } } impl NetworkProtocolHandler for EthSync { fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 1000).expect("Error registering sync timer"); + *self.io_channel.write().unwrap() = io.io_channel(); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 409667c46..b924bbe44 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -17,10 +17,12 @@ use std::sync::*; use std::thread::{self, JoinHandle}; use std::collections::HashMap; +use std::ops::Deref; use mio::*; +use crossbeam::sync::chase_lev; +use slab::Slab; use error::*; use io::{IoError, IoHandler}; -use crossbeam::sync::chase_lev; use io::worker::{Worker, Work, WorkType}; use panics::*; @@ -33,6 +35,14 @@ pub type HandlerId = usize; /// Maximum number of tokens a handler can use pub const TOKENS_PER_HANDLER: usize = 16384; +const MAX_HANDLERS: usize = 8; + +fn compare_arcs(a: Arc, b: Arc) -> bool { + let p1 = &*a as *const T; + let p2 = &*b as *const T; + info!("{:p} == {:p} : {}", p1, p2 , p1 == p2); + p1 == p2 +} /// Messages used to communicate with the event loop from other threads. #[derive(Clone)] @@ -43,6 +53,9 @@ pub enum IoMessage where Message: Send + Clone + Sized { AddHandler { handler: Arc+Send>, }, + RemoveHandler { + handler_id: HandlerId, + }, AddTimer { handler_id: HandlerId, token: TimerToken, @@ -138,6 +151,15 @@ impl IoContext where Message: Send + Clone + 'static { pub fn channel(&self) -> IoChannel { self.channel.clone() } + + /// Unregister current IO handler. + pub fn unregister_handler(&self) -> Result<(), IoError> { + try!(self.channel.send_io(IoMessage::RemoveHandler { + handler_id: self.handler, + })); + Ok(()) + } + } #[derive(Clone)] @@ -149,7 +171,7 @@ struct UserTimer { /// Root IO handler. Manages user handlers, messages and IO timers. pub struct IoManager where Message: Send + Sync { timers: Arc>>, - handlers: Vec>>, + handlers: Slab>, HandlerId>, workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, @@ -175,7 +197,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), - handlers: Vec::new(), + handlers: Slab::new(MAX_HANDLERS), worker_channel: worker, workers: workers, work_ready: work_ready, @@ -192,10 +214,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if handler_index >= self.handlers.len() { - panic!("Unexpected stream token: {}", token.as_usize()); - } - let handler = self.handlers[handler_index].clone(); + let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone(); if events.is_hup() { self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); @@ -214,12 +233,9 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if handler_index >= self.handlers.len() { - panic!("Unexpected timer token: {}", token.as_usize()); - } + let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone(); if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); - let handler = self.handlers[handler_index].clone(); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index }); self.work_ready.notify_all(); } @@ -232,12 +248,14 @@ impl Handler for IoManager where Message: Send + Clone + Sync event_loop.shutdown(); }, IoMessage::AddHandler { handler } => { - let handler_id = { - self.handlers.push(handler.clone()); - self.handlers.len() - 1 - }; + let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id)); }, + IoMessage::RemoveHandler { handler_id } => { + // TODO: flush event loop + self.handlers.remove(handler_id); + info!("{} left", self.handlers.count()); + }, IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer"); @@ -267,9 +285,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); }, IoMessage::UserMessage(data) => { - for n in 0 .. self.handlers.len() { - let handler = self.handlers[n].clone(); - self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: n }); + //TODO: better way to iterate the slab + for id in 0 .. MAX_HANDLERS { + if let Some(h) = self.handlers.get(id) { + let handler = h.clone(); + self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id }); + } } self.work_ready.notify_all(); } @@ -351,8 +372,8 @@ impl IoService where Message: Send + Sync + Clone + 'static { }) } - /// Regiter a IO hadnler with the event loop. - pub fn register_handler(&mut self, handler: Arc+Send>) -> Result<(), IoError> { + /// Regiter an IO handler with the event loop. + pub fn register_handler(&self, handler: Arc+Send>) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::AddHandler { handler: handler, })); @@ -360,13 +381,13 @@ impl IoService where Message: Send + Sync + Clone + 'static { } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. - pub fn send_message(&mut self, message: Message) -> Result<(), IoError> { + pub fn send_message(&self, message: Message) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::UserMessage(message))); Ok(()) } /// Create a new message channel - pub fn channel(&mut self) -> IoChannel { + pub fn channel(&self) -> IoChannel { IoChannel { channel: Some(self.host_channel.clone()) } } } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index abace1983..6001769fe 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -49,7 +49,7 @@ const MAX_HANDSHAKES: usize = 80; const MAX_HANDSHAKES_PER_ROUND: usize = 32; const MAINTENANCE_TIMEOUT: u64 = 1000; -#[derive(Debug)] +#[derive(Debug, Clone)] /// Network service configuration pub struct NetworkConfiguration { /// Directory path to store network configuration. None means nothing will be saved @@ -233,6 +233,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } + /// Send an IO message + pub fn io_channel(&self) -> IoChannel> { + self.io.channel() + } + /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left @@ -327,7 +332,7 @@ pub struct Host where Message: Send + Sync + Clone { impl Host where Message: Send + Sync + Clone { /// Create a new instance - pub fn new(config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration, stats: Arc) -> Result, UtilError> { let mut listen_address = match config.listen_address { None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), Some(addr) => addr, @@ -371,7 +376,7 @@ impl Host where Message: Send + Sync + Clone { handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), - stats: Arc::new(NetworkStats::default()), + stats: stats, pinned_nodes: Vec::new(), num_sessions: AtomicUsize::new(0), }; @@ -383,10 +388,6 @@ impl Host where Message: Send + Sync + Clone { Ok(host) } - pub fn stats(&self) -> Arc { - self.stats.clone() - } - pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, @@ -401,8 +402,8 @@ impl Host where Message: Send + Sync + Clone { } } - pub fn client_version(&self) -> String { - self.info.read().unwrap().client_version.clone() + pub fn client_version() -> String { + version() } pub fn external_url(&self) -> Option { @@ -415,6 +416,23 @@ impl Host where Message: Send + Sync + Clone { r } + pub fn stop(&self, io: &IoContext>) -> Result<(), UtilError> { + let mut to_kill = Vec::new(); + for e in self.sessions.write().unwrap().iter_mut() { + let mut s = e.lock().unwrap(); + if !s.keep_alive(io) { + s.disconnect(io, DisconnectReason::PingTimeout); + to_kill.push(s.token()); + } + } + for p in to_kill { + trace!(target: "network", "Ping timeout: {}", p); + self.kill_connection(p, io, true); + } + io.unregister_handler(); + Ok(()) + } + fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { io.clear_timer(INIT_PUBLIC).unwrap(); if self.info.read().unwrap().public_endpoint.is_some() { @@ -767,6 +785,13 @@ impl Host where Message: Send + Sync + Clone { } } + +impl Drop for Host where Message: Send + Sync + Clone { + fn drop(&mut self) { + info!("Dropping host"); + } +} + impl IoHandler> for Host where Message: Send + Sync + Clone + 'static { /// Initialize networking fn initialize(&self, io: &IoContext>) { @@ -831,8 +856,8 @@ impl IoHandler> for Host where Messa }, _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { - None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, - Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } + None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, + Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } }, None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us } diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 49957f7e7..406126425 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -28,33 +28,33 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - host: Arc>, + host: RwLock>>>, stats: Arc, - panic_handler: Arc + panic_handler: Arc, + config: NetworkConfiguration, } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop - pub fn start(config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration) -> Result, UtilError> { let panic_handler = PanicHandler::new_in_arc(); - let mut io_service = try!(IoService::>::start()); + let io_service = try!(IoService::>::start()); panic_handler.forward_from(&io_service); - let host = Arc::new(try!(Host::new(config))); - let stats = host.stats().clone(); - let host_info = host.client_version(); - try!(io_service.register_handler(host.clone())); + let stats = Arc::new(NetworkStats::new()); + let host_info = Host::::client_version(); Ok(NetworkService { io_service: io_service, host_info: host_info, stats: stats, panic_handler: panic_handler, - host: host, + host: RwLock::new(None), + config: config, }) } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&mut self, handler: Arc+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + pub fn register_protocol(&self, handler: Arc+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { try!(self.io_service.send_message(NetworkIoMessage::AddHandler { handler: handler, protocol: protocol, @@ -69,8 +69,8 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } /// Returns underlying io service. - pub fn io(&mut self) -> &mut IoService> { - &mut self.io_service + pub fn io(&self) -> &IoService> { + &self.io_service } /// Returns network statistics. @@ -80,12 +80,37 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Returns external url if available. pub fn external_url(&self) -> Option { - self.host.external_url() + let host = self.host.read().unwrap(); + host.as_ref().and_then(|h| h.external_url()) } /// Returns external url if available. - pub fn local_url(&self) -> String { - self.host.local_url() + pub fn local_url(&self) -> Option { + let host = self.host.read().unwrap(); + host.as_ref().map(|h| h.local_url()) + } + + /// Start network IO + pub fn start(&self) -> Result<(), UtilError> { + let mut host = self.host.write().unwrap(); + if host.is_none() { + let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone()))); + try!(self.io_service.register_handler(h.clone())); + *host = Some(h); + } + Ok(()) + } + + /// Stop network IO + pub fn stop(&self) -> Result<(), UtilError> { + let mut host = self.host.write().unwrap(); + if let Some(ref host) = *host { + info!("Unregistering handler"); + let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host + host.stop(&io); + } + *host = None; + Ok(()) } } diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs index 30e751a81..cc3b845da 100644 --- a/util/src/network/stats.rs +++ b/util/src/network/stats.rs @@ -65,7 +65,7 @@ impl NetworkStats { self.sessions.load(Ordering::Relaxed) } - #[cfg(test)] + /// Create a new empty instance. pub fn new() -> NetworkStats { NetworkStats { recv: AtomicUsize::new(0),