diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 13f1c9f74..d9040113f 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -41,6 +41,10 @@ pub enum SyncMessage { NewChainHead, /// A block is ready BlockVerified, + /// Start network command. + StartNetwork, + /// Stop network command. + StopNetwork, } /// IO Message type used for Network service @@ -48,17 +52,20 @@ pub type NetSyncMessage = NetworkIoMessage; /// Client service setup. Creates and registers client and network services with the IO subsystem. pub struct ClientService { - net_service: NetworkService, + net_service: Arc>, client: Arc, panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. - pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc) -> Result { + pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc, enable_network: bool) -> Result { let panic_handler = PanicHandler::new_in_arc(); - let mut net_service = try!(NetworkService::start(net_config)); + let net_service = try!(NetworkService::new(net_config)); panic_handler.forward_from(&net_service); + if enable_network { + try!(net_service.start()); + } info!("Starting {}", net_service.host_info()); info!("Configured for {} using {:?} engine", spec.name, spec.engine.name()); @@ -70,7 +77,7 @@ impl ClientService { try!(net_service.io().register_handler(client_io)); Ok(ClientService { - net_service: net_service, + net_service: Arc::new(net_service), client: client, panic_handler: panic_handler, }) @@ -82,8 +89,8 @@ impl ClientService { } /// Get general IO interface - pub fn io(&mut self) -> &mut IoService { - self.net_service.io() + pub fn register_io_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { + self.net_service.io().register_handler(handler) } /// Get client interface @@ -92,8 +99,8 @@ impl ClientService { } /// Get network service component - pub fn network(&mut self) -> &mut NetworkService { - &mut self.net_service + pub fn network(&mut self) -> Arc> { + self.net_service.clone() } } @@ -149,7 +156,7 @@ mod tests { fn it_can_be_started() { let spec = get_test_spec(); let temp_path = RandomTempPath::new(); - let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default())); + let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false); assert!(service.is_ok()); } } diff --git a/parity/cli.rs b/parity/cli.rs index ba3cfc21a..954df7640 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -53,6 +53,7 @@ Account Options: --no-import-keys Do not import keys from legacy clients. Networking Options: + --no-network Disable p2p networking. --port PORT Override the port on which the node should listen [default: 30303]. --peers NUM Try to maintain that many peers [default: 25]. @@ -268,6 +269,7 @@ pub struct Args { pub flag_format: Option, pub flag_jitvm: bool, pub flag_no_color: bool, + pub flag_no_network: bool, // legacy... pub flag_geth: bool, pub flag_nodekey: Option, diff --git a/parity/io_handler.rs b/parity/io_handler.rs index d130d584c..2460cfec7 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -16,10 +16,10 @@ use std::sync::Arc; use ethcore::client::Client; -use ethcore::service::NetSyncMessage; +use ethcore::service::{NetSyncMessage, SyncMessage}; use ethsync::EthSync; use util::keys::store::AccountService; -use util::{TimerToken, IoHandler, IoContext}; +use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage}; use informant::Informant; @@ -33,6 +33,7 @@ pub struct ClientIoHandler { pub sync: Arc, pub accounts: Arc, pub info: Informant, + pub network: Arc>, } impl IoHandler for ClientIoHandler { @@ -48,6 +49,21 @@ impl IoHandler for ClientIoHandler { _ => {} } } + + fn message(&self, _io: &IoContext, message: &NetSyncMessage) { + match *message { + NetworkIoMessage::User(SyncMessage::StartNetwork) => { + info!("Starting network"); + self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); + EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e)); + }, + NetworkIoMessage::User(SyncMessage::StopNetwork) => { + info!("Stopping network"); + self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); + }, + _ => {/* Ignore other messages */}, + } + } } diff --git a/parity/main.rs b/parity/main.rs index 8b99f463b..973b1174f 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -80,7 +80,7 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes}; +use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path}; use ethcore::error::{Error, ImportError}; @@ -199,7 +199,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) // Build client let mut service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), miner.clone() + client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -209,7 +209,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let network_settings = Arc::new(conf.network_settings()); // Sync - let sync = EthSync::register(service.network(), sync_config, client.clone()); + let sync = EthSync::new(sync_config, client.clone()); + EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into())); let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { signer_port: conf.signer_port(), @@ -270,8 +271,9 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) info: Informant::new(conf.have_color()), sync: sync.clone(), accounts: account_service.clone(), + network: service.network(), }); - service.io().register_handler(io_handler).expect("Error registering IO handler"); + service.register_io_handler(io_handler).expect("Error registering IO handler"); if conf.args.cmd_ui { url::open("http://localhost:8080/") @@ -314,7 +316,7 @@ fn execute_export(conf: Configuration) { // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), + client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -385,7 +387,7 @@ fn execute_import(conf: Configuration) { // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), + client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index fb502e6cf..977561ede 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -47,4 +47,14 @@ impl Net for NetClient where S: SyncProvider + 'static { // right now (11 march 2016), we are always listening for incoming connections Ok(Value::Bool(true)) } + + fn start_network(&self, _: Params) -> Result { + take_weak!(self.sync).start_network(); + Ok(Value::Bool(true)) + } + + fn stop_network(&self, _: Params) -> Result { + take_weak!(self.sync).stop_network(); + Ok(Value::Bool(true)) + } } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 114b5b08f..11fa1cb9e 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -59,5 +59,11 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().unwrap().clone() } + + fn start_network(&self) { + } + + fn stop_network(&self) { + } } diff --git a/rpc/src/v1/traits/net.rs b/rpc/src/v1/traits/net.rs index 56fba3e32..2df8c8f9c 100644 --- a/rpc/src/v1/traits/net.rs +++ b/rpc/src/v1/traits/net.rs @@ -30,6 +30,12 @@ pub trait Net: Sized + Send + Sync + 'static { /// Otherwise false. fn is_listening(&self, _: Params) -> Result; + /// Start the network. + fn start_network(&self, _: Params) -> Result; + + /// Stop the network. + fn stop_network(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ae58c4513..25f1eb6a3 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -356,6 +356,10 @@ impl ChainSync { }; trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); + if io.is_expired() { + trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); + return Ok(()); + } if self.peers.contains_key(&peer_id) { warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); diff --git a/sync/src/io.rs b/sync/src/io.rs index 53a546e1c..5d4b32464 100644 --- a/sync/src/io.rs +++ b/sync/src/io.rs @@ -41,6 +41,8 @@ pub trait SyncIo { fn is_chain_queue_empty(&self) -> bool { self.chain().queue_info().is_empty() } + /// Check if the session is expired + fn is_expired(&self) -> bool; } /// Wraps `NetworkContext` and the blockchain client @@ -83,6 +85,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { fn peer_info(&self, peer_id: PeerId) -> String { self.network.peer_info(peer_id) } + + fn is_expired(&self) -> bool { + self.network.is_expired() + } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 9c7da71ab..cabc55ace 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -41,11 +41,13 @@ //! use ethcore::miner::Miner; //! //! fn main() { -//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap(); +//! service.start().unwrap(); //! let dir = env::temp_dir(); //! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap(); //! let miner = Miner::new(false, ethereum::new_frontier()); -//! EthSync::register(&mut service, SyncConfig::default(), client); +//! let sync = EthSync::new(SyncConfig::default(), client); +//! EthSync::register(&mut service, sync); //! } //! ``` @@ -66,8 +68,10 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer use util::TimerToken; use util::{U256, ONE_U256}; use ethcore::client::Client; -use ethcore::service::SyncMessage; +use ethcore::service::{SyncMessage, NetSyncMessage}; use io::NetSyncIo; +use util::io::IoChannel; +use util::{NetworkIoMessage, NetworkError}; use chain::ChainSync; mod chain; @@ -98,6 +102,10 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; + /// Start the network + fn start_network(&self); + /// Stop the network + fn stop_network(&self); } /// Ethereum network protocol handler @@ -105,23 +113,30 @@ pub struct EthSync { /// Shared blockchain client. TODO: this should evetually become an IPC endpoint chain: Arc, /// Sync strategy - sync: RwLock + sync: RwLock, + /// IO communication chnnel. + io_channel: RwLock>, } pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc) -> Arc { + pub fn new(config: SyncConfig, chain: Arc) -> Arc { let sync = ChainSync::new(config, chain.deref()); let sync = Arc::new(EthSync { chain: chain, sync: RwLock::new(sync), + io_channel: RwLock::new(IoChannel::disconnected()), }); - service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync } + /// Register protocol with the network service + pub fn register(service: &NetworkService, sync: Arc) -> Result<(), NetworkError> { + service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]) + } + /// Stop sync pub fn stop(&mut self, io: &mut NetworkContext) { self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); @@ -138,11 +153,20 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } + + fn start_network(&self) { + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification"); + } + + fn stop_network(&self) { + self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification"); + } } impl NetworkProtocolHandler for EthSync { fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 1000).expect("Error registering sync timer"); + *self.io_channel.write().unwrap() = io.io_channel(); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 09e83e358..7c7d70dde 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -192,3 +192,13 @@ fn restart_on_broken_chain() { assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); } + +#[test] +fn high_td_attach() { + let mut net = TestNet::new(2); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(1).chain.corrupt_block_parent(6); + net.sync_steps(20); + + assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5); +} diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index a9163b52e..6496a43d5 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> { fn disconnect_peer(&mut self, _peer_id: PeerId) { } + fn is_expired(&self) -> bool { + false + } + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { self.queue.push_back(TestPacket { data: data, diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 93805a46a..3229c95d5 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -142,7 +142,7 @@ mod tests { #[test] fn test_service_register_handler () { - let mut service = IoService::::start().expect("Error creating network service"); + let service = IoService::::start().expect("Error creating network service"); service.register_handler(Arc::new(MyHandler)).unwrap(); } diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 409667c46..65be55540 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -18,9 +18,10 @@ use std::sync::*; use std::thread::{self, JoinHandle}; use std::collections::HashMap; use mio::*; +use crossbeam::sync::chase_lev; +use slab::Slab; use error::*; use io::{IoError, IoHandler}; -use crossbeam::sync::chase_lev; use io::worker::{Worker, Work, WorkType}; use panics::*; @@ -33,6 +34,7 @@ pub type HandlerId = usize; /// Maximum number of tokens a handler can use pub const TOKENS_PER_HANDLER: usize = 16384; +const MAX_HANDLERS: usize = 8; /// Messages used to communicate with the event loop from other threads. #[derive(Clone)] @@ -43,6 +45,9 @@ pub enum IoMessage where Message: Send + Clone + Sized { AddHandler { handler: Arc+Send>, }, + RemoveHandler { + handler_id: HandlerId, + }, AddTimer { handler_id: HandlerId, token: TimerToken, @@ -138,6 +143,15 @@ impl IoContext where Message: Send + Clone + 'static { pub fn channel(&self) -> IoChannel { self.channel.clone() } + + /// Unregister current IO handler. + pub fn unregister_handler(&self) -> Result<(), IoError> { + try!(self.channel.send_io(IoMessage::RemoveHandler { + handler_id: self.handler, + })); + Ok(()) + } + } #[derive(Clone)] @@ -149,7 +163,7 @@ struct UserTimer { /// Root IO handler. Manages user handlers, messages and IO timers. pub struct IoManager where Message: Send + Sync { timers: Arc>>, - handlers: Vec>>, + handlers: Slab>, HandlerId>, workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, @@ -175,7 +189,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), - handlers: Vec::new(), + handlers: Slab::new(MAX_HANDLERS), worker_channel: worker, workers: workers, work_ready: work_ready, @@ -192,36 +206,31 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if handler_index >= self.handlers.len() { - panic!("Unexpected stream token: {}", token.as_usize()); - } - let handler = self.handlers[handler_index].clone(); - - if events.is_hup() { - self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); - } - else { - if events.is_readable() { - self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + if let Some(handler) = self.handlers.get(handler_index) { + if events.is_hup() { + self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); } - if events.is_writable() { - self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + else { + if events.is_readable() { + self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + } + if events.is_writable() { + self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index }); + } } + self.work_ready.notify_all(); } - self.work_ready.notify_all(); } fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if handler_index >= self.handlers.len() { - panic!("Unexpected timer token: {}", token.as_usize()); - } - if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { - event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); - let handler = self.handlers[handler_index].clone(); - self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index }); - self.work_ready.notify_all(); + if let Some(handler) = self.handlers.get(handler_index) { + if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) { + event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); + self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); + self.work_ready.notify_all(); + } } } @@ -232,12 +241,13 @@ impl Handler for IoManager where Message: Send + Clone + Sync event_loop.shutdown(); }, IoMessage::AddHandler { handler } => { - let handler_id = { - self.handlers.push(handler.clone()); - self.handlers.len() - 1 - }; + let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id)); }, + IoMessage::RemoveHandler { handler_id } => { + // TODO: flush event loop + self.handlers.remove(handler_id); + }, IoMessage::AddTimer { handler_id, token, delay } => { let timer_id = token + handler_id * TOKENS_PER_HANDLER; let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer"); @@ -250,26 +260,32 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::RegisterStream { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + if let Some(handler) = self.handlers.get(handler_id) { + handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + } }, IoMessage::DeregisterStream { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.deregister_stream(token, event_loop); - // unregister a timer associated with the token (if any) - let timer_id = token + handler_id * TOKENS_PER_HANDLER; - if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { - event_loop.clear_timeout(timer.timeout); + if let Some(handler) = self.handlers.get(handler_id) { + handler.deregister_stream(token, event_loop); + // unregister a timer associated with the token (if any) + let timer_id = token + handler_id * TOKENS_PER_HANDLER; + if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) { + event_loop.clear_timeout(timer.timeout); + } } }, IoMessage::UpdateStreamRegistration { handler_id, token } => { - let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); - handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + if let Some(handler) = self.handlers.get(handler_id) { + handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); + } }, IoMessage::UserMessage(data) => { - for n in 0 .. self.handlers.len() { - let handler = self.handlers[n].clone(); - self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: n }); + //TODO: better way to iterate the slab + for id in 0 .. MAX_HANDLERS { + if let Some(h) = self.handlers.get(id) { + let handler = h.clone(); + self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id }); + } } self.work_ready.notify_all(); } @@ -351,8 +367,8 @@ impl IoService where Message: Send + Sync + Clone + 'static { }) } - /// Regiter a IO hadnler with the event loop. - pub fn register_handler(&mut self, handler: Arc+Send>) -> Result<(), IoError> { + /// Regiter an IO handler with the event loop. + pub fn register_handler(&self, handler: Arc+Send>) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::AddHandler { handler: handler, })); @@ -360,13 +376,13 @@ impl IoService where Message: Send + Sync + Clone + 'static { } /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. - pub fn send_message(&mut self, message: Message) -> Result<(), IoError> { + pub fn send_message(&self, message: Message) -> Result<(), IoError> { try!(self.host_channel.send(IoMessage::UserMessage(message))); Ok(()) } /// Create a new message channel - pub fn channel(&mut self) -> IoChannel { + pub fn channel(&self) -> IoChannel { IoChannel { channel: Some(self.host_channel.clone()) } } } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index de6908200..e2cf32aa0 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -18,7 +18,7 @@ use std::net::{SocketAddr}; use std::collections::{HashMap}; use std::str::{FromStr}; use std::sync::*; -use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::ops::*; use std::cmp::min; use std::path::{Path, PathBuf}; @@ -50,7 +50,7 @@ const MAX_HANDSHAKES: usize = 80; const MAX_HANDSHAKES_PER_ROUND: usize = 32; const MAINTENANCE_TIMEOUT: u64 = 1000; -#[derive(Debug)] +#[derive(Debug, Clone)] /// Network service configuration pub struct NetworkConfiguration { /// Directory path to store network configuration. None means nothing will be saved @@ -234,6 +234,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } + /// Send an IO message + pub fn io_channel(&self) -> IoChannel> { + self.io.channel() + } + /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left @@ -245,6 +250,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::Disconnect(peer)); } + /// Sheck if the session is till active. + pub fn is_expired(&self) -> bool { + self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false) + } + /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), UtilError> { self.io.message(NetworkIoMessage::AddTimer { @@ -324,11 +334,12 @@ pub struct Host where Message: Send + Sync + Clone { stats: Arc, pinned_nodes: Vec, num_sessions: AtomicUsize, + stopping: AtomicBool, } impl Host where Message: Send + Sync + Clone { /// Create a new instance - pub fn new(config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration, stats: Arc) -> Result, UtilError> { let mut listen_address = match config.listen_address { None => SocketAddr::from_str("0.0.0.0:30304").unwrap(), Some(addr) => addr, @@ -372,9 +383,10 @@ impl Host where Message: Send + Sync + Clone { handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), - stats: Arc::new(NetworkStats::default()), + stats: stats, pinned_nodes: Vec::new(), num_sessions: AtomicUsize::new(0), + stopping: AtomicBool::new(false), }; let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); @@ -384,10 +396,6 @@ impl Host where Message: Send + Sync + Clone { Ok(host) } - pub fn stats(&self) -> Arc { - self.stats.clone() - } - pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, @@ -402,8 +410,8 @@ impl Host where Message: Send + Sync + Clone { } } - pub fn client_version(&self) -> String { - self.info.read().unwrap().client_version.clone() + pub fn client_version() -> String { + version() } pub fn external_url(&self) -> Option { @@ -416,6 +424,22 @@ impl Host where Message: Send + Sync + Clone { r } + pub fn stop(&self, io: &IoContext>) -> Result<(), UtilError> { + self.stopping.store(true, AtomicOrdering::Release); + let mut to_kill = Vec::new(); + for e in self.sessions.write().unwrap().iter_mut() { + let mut s = e.lock().unwrap(); + s.disconnect(io, DisconnectReason::ClientQuit); + to_kill.push(s.token()); + } + for p in to_kill { + trace!(target: "network", "Disconnecting on shutdown: {}", p); + self.kill_connection(p, io, true); + } + try!(io.unregister_handler()); + Ok(()) + } + fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { io.clear_timer(INIT_PUBLIC).unwrap(); if self.info.read().unwrap().public_endpoint.is_some() { @@ -787,6 +811,9 @@ impl IoHandler> for Host where Messa } fn stream_readable(&self, io: &IoContext>, stream: StreamToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), DISCOVERY => { @@ -802,6 +829,9 @@ impl IoHandler> for Host where Messa } fn stream_writable(&self, io: &IoContext>, stream: StreamToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), DISCOVERY => { @@ -813,6 +843,9 @@ impl IoHandler> for Host where Messa } fn timeout(&self, io: &IoContext>, token: TimerToken) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match token { IDLE => self.maintain_network(io), INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e| @@ -835,8 +868,8 @@ impl IoHandler> for Host where Messa }, _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { - None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, - Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } + None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, + Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } }, None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us } @@ -844,6 +877,9 @@ impl IoHandler> for Host where Messa } fn message(&self, io: &IoContext>, message: &NetworkIoMessage) { + if self.stopping.load(AtomicOrdering::Acquire) { + return; + } match *message { NetworkIoMessage::AddHandler { ref handler, @@ -1009,6 +1045,6 @@ fn host_client_url() { let mut config = NetworkConfiguration::new(); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); config.use_secret = Some(key); - let host: Host = Host::new(config).unwrap(); + let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap(); assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 29f3d166c..d074e6631 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -56,8 +56,9 @@ //! } //! //! fn main () { -//! let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); +//! let mut service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); +//! service.start().expect("Error starting service"); //! //! // Wait for quit condition //! // ... diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 49957f7e7..e8db354d4 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -28,33 +28,33 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, - host: Arc>, + host: RwLock>>>, stats: Arc, - panic_handler: Arc + panic_handler: Arc, + config: NetworkConfiguration, } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop - pub fn start(config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration) -> Result, UtilError> { let panic_handler = PanicHandler::new_in_arc(); - let mut io_service = try!(IoService::>::start()); + let io_service = try!(IoService::>::start()); panic_handler.forward_from(&io_service); - let host = Arc::new(try!(Host::new(config))); - let stats = host.stats().clone(); - let host_info = host.client_version(); - try!(io_service.register_handler(host.clone())); + let stats = Arc::new(NetworkStats::new()); + let host_info = Host::::client_version(); Ok(NetworkService { io_service: io_service, host_info: host_info, stats: stats, panic_handler: panic_handler, - host: host, + host: RwLock::new(None), + config: config, }) } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&mut self, handler: Arc+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + pub fn register_protocol(&self, handler: Arc+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { try!(self.io_service.send_message(NetworkIoMessage::AddHandler { handler: handler, protocol: protocol, @@ -69,8 +69,8 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } /// Returns underlying io service. - pub fn io(&mut self) -> &mut IoService> { - &mut self.io_service + pub fn io(&self) -> &IoService> { + &self.io_service } /// Returns network statistics. @@ -80,12 +80,36 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat /// Returns external url if available. pub fn external_url(&self) -> Option { - self.host.external_url() + let host = self.host.read().unwrap(); + host.as_ref().and_then(|h| h.external_url()) } /// Returns external url if available. - pub fn local_url(&self) -> String { - self.host.local_url() + pub fn local_url(&self) -> Option { + let host = self.host.read().unwrap(); + host.as_ref().map(|h| h.local_url()) + } + + /// Start network IO + pub fn start(&self) -> Result<(), UtilError> { + let mut host = self.host.write().unwrap(); + if host.is_none() { + let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone()))); + try!(self.io_service.register_handler(h.clone())); + *host = Some(h); + } + Ok(()) + } + + /// Stop network IO + pub fn stop(&self) -> Result<(), UtilError> { + let mut host = self.host.write().unwrap(); + if let Some(ref host) = *host { + let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host + try!(host.stop(&io)); + } + *host = None; + Ok(()) } } diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs index 30e751a81..cc3b845da 100644 --- a/util/src/network/stats.rs +++ b/util/src/network/stats.rs @@ -65,7 +65,7 @@ impl NetworkStats { self.sessions.load(Ordering::Relaxed) } - #[cfg(test)] + /// Create a new empty instance. pub fn new() -> NetworkStats { NetworkStats { recv: AtomicUsize::new(0), diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index b43da9320..861edc144 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -97,7 +97,8 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { - let mut service = NetworkService::::start(NetworkConfiguration::new_local()).expect("Error creating network service"); + let service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); + service.start().unwrap(); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); } @@ -108,12 +109,14 @@ fn net_connect() { let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service1 = NetworkService::::new(config1).unwrap(); + service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); - info!("net_connect: local URL: {}", service1.local_url()); - config2.boot_nodes = vec![ service1.local_url() ]; - let mut service2 = NetworkService::::start(config2).unwrap(); + info!("net_connect: local URL: {}", service1.local_url().unwrap()); + config2.boot_nodes = vec![ service1.local_url().unwrap() ]; + let mut service2 = NetworkService::::new(config2).unwrap(); + service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, false); while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) { thread::sleep(Duration::from_millis(50)); @@ -122,17 +125,28 @@ fn net_connect() { assert!(service2.stats().sessions() >= 1); } +#[test] +fn net_start_stop() { + let config = NetworkConfiguration::new_local(); + let service = NetworkService::::new(config).unwrap(); + service.start().unwrap(); + service.stop().unwrap(); + service.start().unwrap(); +} + #[test] fn net_disconnect() { let key1 = KeyPair::create().unwrap(); let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service1 = NetworkService::::new(config1).unwrap(); + service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); - config2.boot_nodes = vec![ service1.local_url() ]; - let mut service2 = NetworkService::::start(config2).unwrap(); + config2.boot_nodes = vec![ service1.local_url().unwrap() ]; + let mut service2 = NetworkService::::new(config2).unwrap(); + service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, true); while !(handler1.got_disconnect() && handler2.got_disconnect()) { thread::sleep(Duration::from_millis(50)); @@ -144,7 +158,8 @@ fn net_disconnect() { #[test] fn net_timeout() { let config = NetworkConfiguration::new_local(); - let mut service = NetworkService::::start(config).unwrap(); + let mut service = NetworkService::::new(config).unwrap(); + service.start().unwrap(); let handler = TestProtocol::register(&mut service, false); while !handler.got_timeout() { thread::sleep(Duration::from_millis(50));