diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 6adca7b52..d9040113f 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -156,7 +156,7 @@ mod tests { fn it_can_be_started() { let spec = get_test_spec(); let temp_path = RandomTempPath::new(); - let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default())); + let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false); assert!(service.is_ok()); } } diff --git a/parity/io_handler.rs b/parity/io_handler.rs index 569a11716..2460cfec7 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -55,6 +55,7 @@ impl IoHandler for ClientIoHandler { NetworkIoMessage::User(SyncMessage::StartNetwork) => { info!("Starting network"); self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); + EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e)); }, NetworkIoMessage::User(SyncMessage::StopNetwork) => { info!("Stopping network"); diff --git a/parity/main.rs b/parity/main.rs index 80ea29576..f600d4de0 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -79,7 +79,7 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes}; +use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path}; use ethcore::error::{Error, ImportError}; @@ -208,7 +208,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let network_settings = Arc::new(conf.network_settings()); // Sync - let sync = EthSync::register(service.network().deref(), sync_config, client.clone()); + let sync = EthSync::new(sync_config, client.clone()); + EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into())); let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { signer_port: conf.signer_port(), diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 7e23601e4..8b6aa9d7c 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -357,7 +357,7 @@ impl ChainSync { trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); if io.is_expired() { - info!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); + trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); return Ok(()); } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index ca31ea020..cabc55ace 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -41,11 +41,13 @@ //! use ethcore::miner::Miner; //! //! fn main() { -//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap(); +//! service.start().unwrap(); //! let dir = env::temp_dir(); //! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap(); //! let miner = Miner::new(false, ethereum::new_frontier()); -//! EthSync::register(&mut service, SyncConfig::default(), client); +//! let sync = EthSync::new(SyncConfig::default(), client); +//! EthSync::register(&mut service, sync); //! } //! ``` @@ -69,7 +71,7 @@ use ethcore::client::Client; use ethcore::service::{SyncMessage, NetSyncMessage}; use io::NetSyncIo; use util::io::IoChannel; -use util::NetworkIoMessage; +use util::{NetworkIoMessage, NetworkError}; use chain::ChainSync; mod chain; @@ -120,17 +122,21 @@ pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &NetworkService, config: SyncConfig, chain: Arc) -> Arc { + pub fn new(config: SyncConfig, chain: Arc) -> Arc { let sync = ChainSync::new(config, chain.deref()); let sync = Arc::new(EthSync { chain: chain, sync: RwLock::new(sync), io_channel: RwLock::new(IoChannel::disconnected()), }); - service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync } + /// Register protocol with the network service + pub fn register(service: &NetworkService, sync: Arc) -> Result<(), NetworkError> { + service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]) + } + /// Stop sync pub fn stop(&mut self, io: &mut NetworkContext) { self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index a9163b52e..6496a43d5 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> { fn disconnect_peer(&mut self, _peer_id: PeerId) { } + fn is_expired(&self) -> bool { + false + } + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { self.queue.push_back(TestPacket { data: data, diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 93805a46a..3229c95d5 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -142,7 +142,7 @@ mod tests { #[test] fn test_service_register_handler () { - let mut service = IoService::::start().expect("Error creating network service"); + let service = IoService::::start().expect("Error creating network service"); service.register_handler(Arc::new(MyHandler)).unwrap(); } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index b924bbe44..65be55540 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -17,7 +17,6 @@ use std::sync::*; use std::thread::{self, JoinHandle}; use std::collections::HashMap; -use std::ops::Deref; use mio::*; use crossbeam::sync::chase_lev; use slab::Slab; @@ -37,13 +36,6 @@ pub type HandlerId = usize; pub const TOKENS_PER_HANDLER: usize = 16384; const MAX_HANDLERS: usize = 8; -fn compare_arcs(a: Arc, b: Arc) -> bool { - let p1 = &*a as *const T; - let p2 = &*b as *const T; - info!("{:p} == {:p} : {}", p1, p2 , p1 == p2); - p1 == p2 -} - /// Messages used to communicate with the event loop from other threads. #[derive(Clone)] pub enum IoMessage where Message: Send + Clone + Sized { @@ -214,30 +206,31 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone(); - - if events.is_hup() { - self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); - } - else { - if events.is_readable() { - self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + if let Some(handler) = self.handlers.get(handler_index) { + if events.is_hup() { + self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); } - if events.is_writable() { - self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + else { + if events.is_readable() { + self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + } + if events.is_writable() { + self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + } } + self.work_ready.notify_all(); } - self.work_ready.notify_all(); } fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone(); - if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { - event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); - self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index }); - self.work_ready.notify_all(); + if let Some(handler) = self.handlers.get(handler_index) { + if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { + event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); + self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); + self.work_ready.notify_all(); + } } } @@ -254,7 +247,6 @@ impl Handler for IoManager where Message: Send + Clone + Sync IoMessage::RemoveHandler { handler_id } => { // TODO: flush event loop self.handlers.remove(handler_id); - info!("{} left", self.handlers.count()); }, IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; @@ -268,21 +260,24 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::RegisterStream { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + if let Some(handler) = self.handlers.get(handler_id) { + handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + } }, IoMessage::DeregisterStream { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.deregister_stream(token, event_loop); - // unregister a timer associated with the token (if any) - let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { - event_loop.clear_timeout(timer.timeout); + if let Some(handler) = self.handlers.get(handler_id) { + handler.deregister_stream(token, event_loop); + // unregister a timer associated with the token (if any) + let timer_id = token + handler_id * TOKENS_PER_HANDLER; + if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { + event_loop.clear_timeout(timer.timeout); + } } }, IoMessage::UpdateStreamRegistration { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + if let Some(handler) = self.handlers.get(handler_id) { + handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + } }, IoMessage::UserMessage(data) => { //TODO: better way to iterate the slab diff --git a/util/src/network/host.rs b/util/src/network/host.rs index e6e76c472..161fcfe3f 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -333,6 +333,7 @@ pub struct Host where Message: Send + Sync + Clone { stats: Arc, pinned_nodes: Vec, num_sessions: AtomicUsize, + stopping: AtomicBool, } impl Host where Message: Send + Sync + Clone { @@ -384,6 +385,7 @@ impl Host where Message: Send + Sync + Clone { stats: stats, pinned_nodes: Vec::new(), num_sessions: AtomicUsize::new(0), + stopping: AtomicBool::new(false), }; let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); @@ -422,19 +424,18 @@ impl Host where Message: Send + Sync + Clone { } pub fn stop(&self, io: &IoContext>) -> Result<(), UtilError> { + self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); for e in self.sessions.write().unwrap().iter_mut() { let mut s = e.lock().unwrap(); - if !s.keep_alive(io) { - s.disconnect(io, DisconnectReason::PingTimeout); - to_kill.push(s.token()); - } + s.disconnect(io, DisconnectReason::ClientQuit); + to_kill.push(s.token()); } for p in to_kill { - trace!(target: "network", "Ping timeout: {}", p); + trace!(target: "network", "Disconnecting on shutdown: {}", p); self.kill_connection(p, io, true); } - io.unregister_handler(); + try!(io.unregister_handler()); Ok(()) } @@ -790,13 +791,6 @@ impl Host where Message: Send + Sync + Clone { } } - -impl Drop for Host where Message: Send + Sync + Clone { - fn drop(&mut self) { - info!("Dropping host"); - } -} - impl IoHandler> for Host where Message: Send + Sync + Clone + 'static { /// Initialize networking fn initialize(&self, io: &IoContext>) { @@ -814,6 +808,9 @@ impl IoHandler> for Host where Messa } fn stream_readable(&self, io: &IoContext>, stream: StreamToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { @@ -829,6 +826,9 @@ impl IoHandler> for Host where Messa } fn stream_writable(&self, io: &IoContext>, stream: StreamToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { @@ -840,6 +840,9 @@ impl IoHandler> for Host where Messa } fn timeout(&self, io: &IoContext>, token: TimerToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match token { IDLE => self.maintain_network(io), INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e| @@ -870,6 +873,9 @@ impl IoHandler> for Host where Messa } fn message(&self, io: &IoContext>, message: &NetworkIoMessage) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match *message { NetworkIoMessage::AddHandler { ref handler, @@ -1031,6 +1037,6 @@ fn host_client_url() { let mut config = NetworkConfiguration::new(); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); config.use_secret = Some(key); - let host: Host = Host::new(config).unwrap(); + let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap(); assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 29f3d166c..d074e6631 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -56,8 +56,9 @@ //! } //! //! fn main () { -//! let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); +//! let mut service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); +//! service.start().expect("Error starting service"); //! //! // Wait for quit condition //! // ... diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 406126425..36f05d283 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -107,7 +107,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat if let Some(ref host) = *host { info!("Unregistering handler"); let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host - host.stop(&io); + try!(host.stop(&io)); } *host = None; Ok(()) diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index b43da9320..861edc144 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -97,7 +97,8 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { - let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); + let service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); + service.start().unwrap(); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); } @@ -108,12 +109,14 @@ fn net_connect() { let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service1 = NetworkService::::new(config1).unwrap(); + service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); - info!("net_connect: local URL: {}", service1.local_url()); - config2.boot_nodes = vec![ service1.local_url() ]; - let mut service2 = NetworkService::::start(config2).unwrap(); + info!("net_connect: local URL: {}", service1.local_url().unwrap()); + config2.boot_nodes = vec![ service1.local_url().unwrap() ]; + let mut service2 = NetworkService::::new(config2).unwrap(); + service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, false); while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) { thread::sleep(Duration::from_millis(50)); @@ -122,17 +125,28 @@ fn net_connect() { assert!(service2.stats().sessions() >= 1); } +#[test] +fn net_start_stop() { + let config = NetworkConfiguration::new_local(); + let service = NetworkService::::new(config).unwrap(); + service.start().unwrap(); + service.stop().unwrap(); + service.start().unwrap(); +} + #[test] fn net_disconnect() { let key1 = KeyPair::create().unwrap(); let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service1 = NetworkService::::new(config1).unwrap(); + service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); - config2.boot_nodes = vec![ service1.local_url() ]; - let mut service2 = NetworkService::::start(config2).unwrap(); + config2.boot_nodes = vec![ service1.local_url().unwrap() ]; + let mut service2 = NetworkService::::new(config2).unwrap(); + service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, true); while !(handler1.got_disconnect() && handler2.got_disconnect()) { thread::sleep(Duration::from_millis(50)); @@ -144,7 +158,8 @@ fn net_disconnect() { #[test] fn net_timeout() { let config = NetworkConfiguration::new_local(); - let mut service = NetworkService::::start(config).unwrap(); + let mut service = NetworkService::::new(config).unwrap(); + service.start().unwrap(); let handler = TestProtocol::register(&mut service, false); while !handler.got_timeout() { thread::sleep(Duration::from_millis(50));