diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 5a69fd0a9..19f5d0a4c 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -104,7 +104,7 @@ struct VerifyingBlock { struct QueueSignal { deleting: Arc, signalled: AtomicBool, - message_channel: IoChannel, + message_channel: IoChannel, } impl QueueSignal { @@ -116,7 +116,7 @@ impl QueueSignal { } if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { - if let Err(e) = self.message_channel.send(UserMessage(SyncMessage::BlockVerified)) { + if let Err(e) = self.message_channel.send(ClientIoMessage::BlockVerified) { debug!("Error sending BlockVerified message: {:?}", e); } } @@ -137,7 +137,7 @@ struct Verification { impl BlockQueue { /// Creates a new queue instance. - pub fn new(config: BlockQueueConfig, engine: Arc>, message_channel: IoChannel) -> BlockQueue { + pub fn new(config: BlockQueueConfig, engine: Arc>, message_channel: IoChannel) -> BlockQueue { let verification = Arc::new(Verification { unverified: Mutex::new(VecDeque::new()), verified: Mutex::new(VecDeque::new()), diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs new file mode 100644 index 000000000..2b1d8d562 --- /dev/null +++ b/ethcore/src/client/chain_notify.rs @@ -0,0 +1,40 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use util::numbers::*; + +/// Represents what has to be handled by actor listening to chain events +pub trait ChainNotify : Send + Sync { + /// fires when chain has new blocks + fn new_blocks(&self, + _imported: Vec, + _invalid: Vec, + _enacted: Vec, + _retracted: Vec, + _sealed: Vec) { + // does nothing by default + } + + /// fires when chain achieves active mode + fn start(&self) { + // does nothing by default + } + + /// fires when chain achieves passive mode + fn stop(&self) { + // does nothing by default + } +} diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 0c6a2c005..64ccb7db1 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -28,7 +28,6 @@ use std::time::Instant; // util use util::numbers::*; use util::panics::*; -use util::network::*; use util::io::*; use util::rlp; use util::sha3::*; @@ -47,7 +46,7 @@ use state::State; use spec::Spec; use engine::Engine; use views::HeaderView; -use service::{NetSyncMessage, SyncMessage}; +use service::ClientIoMessage; use env_info::LastHashes; use verification; use verification::{PreverifiedBlock, Verifier}; @@ -60,7 +59,7 @@ use block_queue::{BlockQueue, BlockQueueInfo}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient, - TraceFilter, CallAnalytics, BlockImportError, Mode}; + TraceFilter, CallAnalytics, BlockImportError, Mode, ChainNotify}; use client::Error as ClientError; use env_info::EnvInfo; use executive::{Executive, Executed, TransactOptions, contract_address}; @@ -141,7 +140,8 @@ pub struct Client { miner: Arc, sleep_state: Mutex, liveness: AtomicBool, - io_channel: IoChannel, + io_channel: IoChannel, + notify: RwLock>>, queue_transactions: AtomicUsize, previous_enode: Mutex>, } @@ -178,7 +178,7 @@ impl Client { spec: Spec, path: &Path, miner: Arc, - message_channel: IoChannel + message_channel: IoChannel, ) -> Result, ClientError> { let path = get_db_path(path, config.pruning, spec.genesis_header().hash()); let gb = spec.genesis_block(); @@ -228,12 +228,24 @@ impl Client { trie_factory: TrieFactory::new(config.trie_spec), miner: miner, io_channel: message_channel, + notify: RwLock::new(None), queue_transactions: AtomicUsize::new(0), previous_enode: Mutex::new(None), }; Ok(Arc::new(client)) } + /// Sets the actor to be notified on certain events + pub fn set_notify(&self, target: &Arc) { + let mut write_lock = self.notify.unwrapped_write(); + *write_lock = Some(Arc::downgrade(target)); + } + + fn notify(&self) -> Option> { + let read_lock = self.notify.unwrapped_read(); + read_lock.as_ref().and_then(|weak| weak.upgrade()) + } + /// Flush the block import queue. pub fn flush_queue(&self) { self.block_queue.flush(); @@ -327,52 +339,54 @@ impl Client { } /// This is triggered by a message coming from a block queue when the block is ready for insertion - pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { + pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { let max_blocks_to_import = 64; + let (imported_blocks, import_results, invalid_blocks, original_best, imported) = { + let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); + let mut invalid_blocks = HashSet::new(); + let mut import_results = Vec::with_capacity(max_blocks_to_import); - let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); - let mut invalid_blocks = HashSet::new(); - let mut import_results = Vec::with_capacity(max_blocks_to_import); + let _import_lock = self.import_lock.lock(); + let _timer = PerfTimer::new("import_verified_blocks"); + let blocks = self.block_queue.drain(max_blocks_to_import); - let _import_lock = self.import_lock.lock(); - let _timer = PerfTimer::new("import_verified_blocks"); - let blocks = self.block_queue.drain(max_blocks_to_import); + let original_best = self.chain_info().best_block_hash; - let original_best = self.chain_info().best_block_hash; + for block in blocks { + let header = &block.header; - for block in blocks { - let header = &block.header; + if invalid_blocks.contains(&header.parent_hash) { + invalid_blocks.insert(header.hash()); + continue; + } + let closed_block = self.check_and_close_block(&block); + if let Err(_) = closed_block { + invalid_blocks.insert(header.hash()); + continue; + } + let closed_block = closed_block.unwrap(); + imported_blocks.push(header.hash()); - if invalid_blocks.contains(&header.parent_hash) { - invalid_blocks.insert(header.hash()); - continue; + let route = self.commit_block(closed_block, &header.hash(), &block.bytes); + import_results.push(route); + + self.report.unwrapped_write().accrue_block(&block); + trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } - let closed_block = self.check_and_close_block(&block); - if let Err(_) = closed_block { - invalid_blocks.insert(header.hash()); - continue; + + let imported = imported_blocks.len(); + let invalid_blocks = invalid_blocks.into_iter().collect::>(); + + { + if !invalid_blocks.is_empty() { + self.block_queue.mark_as_bad(&invalid_blocks); + } + if !imported_blocks.is_empty() { + self.block_queue.mark_as_good(&imported_blocks); + } } - let closed_block = closed_block.unwrap(); - imported_blocks.push(header.hash()); - - let route = self.commit_block(closed_block, &header.hash(), &block.bytes); - import_results.push(route); - - self.report.unwrapped_write().accrue_block(&block); - trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); - } - - let imported = imported_blocks.len(); - let invalid_blocks = invalid_blocks.into_iter().collect::>(); - - { - if !invalid_blocks.is_empty() { - self.block_queue.mark_as_bad(&invalid_blocks); - } - if !imported_blocks.is_empty() { - self.block_queue.mark_as_good(&imported_blocks); - } - } + (imported_blocks, import_results, invalid_blocks, original_best, imported) + }; { if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { @@ -382,13 +396,15 @@ impl Client { self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted); } - io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { - imported: imported_blocks, - invalid: invalid_blocks, - enacted: enacted, - retracted: retracted, - sealed: Vec::new(), - })).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); + if let Some(notify) = self.notify() { + notify.new_blocks( + imported_blocks, + invalid_blocks, + enacted, + retracted, + Vec::new(), + ); + } } } @@ -566,7 +582,9 @@ impl Client { fn wake_up(&self) { if !self.liveness.load(AtomicOrdering::Relaxed) { self.liveness.store(true, AtomicOrdering::Relaxed); - self.io_channel.send(NetworkIoMessage::User(SyncMessage::StartNetwork)).unwrap(); + if let Some(notify) = self.notify() { + notify.start(); + } trace!(target: "mode", "wake_up: Waking."); } } @@ -576,7 +594,9 @@ impl Client { // only sleep if the import queue is mostly empty. if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON { self.liveness.store(false, AtomicOrdering::Relaxed); - self.io_channel.send(NetworkIoMessage::User(SyncMessage::StopNetwork)).unwrap(); + if let Some(notify) = self.notify() { + notify.stop(); + } trace!(target: "mode", "sleep: Sleeping."); } else { trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing."); @@ -901,7 +921,7 @@ impl BlockChainClient for Client { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); - match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) { + match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) { Ok(_) => { self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } @@ -969,13 +989,15 @@ impl MiningBlockChainClient for Client { let (enacted, retracted) = self.calculate_enacted_retracted(&[route]); self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted); - self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { - imported: vec![h.clone()], - invalid: vec![], - enacted: enacted, - retracted: retracted, - sealed: vec![h.clone()], - })).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); + if let Some(notify) = self.notify() { + notify.new_blocks( + vec![h.clone()], + vec![], + enacted, + retracted, + vec![h.clone()], + ); + } } if self.chain_info().best_block_hash != original_best { diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 7a24dc94b..a7ad99e80 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -20,6 +20,7 @@ mod config; mod error; mod test_client; mod trace; +mod chain_notify; pub use self::client::*; pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType}; @@ -29,6 +30,7 @@ pub use self::test_client::{TestBlockChainClient, EachBlockWith}; pub use types::trace_filter::Filter as TraceFilter; pub use executive::{Executed, Executive, TransactOptions}; pub use env_info::{LastHashes, EnvInfo}; +pub use self::chain_notify::ChainNotify; use util::bytes::Bytes; use util::hash::{Address, H256, H2048}; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 27cab9aab..3a5d555d8 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -20,67 +20,50 @@ use util::*; use util::panics::*; use spec::Spec; use error::*; -use client::{Client, ClientConfig}; +use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; /// Message type for external and internal events #[derive(Clone)] -pub enum SyncMessage { - /// New block has been imported into the blockchain - NewChainBlocks { - /// Hashes of blocks imported to blockchain - imported: Vec, - /// Hashes of blocks not imported to blockchain (because were invalid) - invalid: Vec, - /// Hashes of blocks that were removed from canonical chain - retracted: Vec, - /// Hashes of blocks that are now included in cannonical chain - enacted: Vec, - /// Hashes of blocks that are sealed by this node - sealed: Vec, - }, +pub enum ClientIoMessage { /// Best Block Hash in chain has been changed NewChainHead, /// A block is ready BlockVerified, /// New transaction RLPs are ready to be imported NewTransactions(Vec), - /// Start network command. - StartNetwork, - /// Stop network command. - StopNetwork, } -/// IO Message type used for Network service -pub type NetSyncMessage = NetworkIoMessage; - /// Client service setup. Creates and registers client and network services with the IO subsystem. pub struct ClientService { - net_service: Arc>, + io_service: Arc>, client: Arc, panic_handler: Arc } impl ClientService { /// Start the service in a separate thread. - pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc, enable_network: bool) -> Result { + pub fn start( + config: ClientConfig, + spec: Spec, + db_path: &Path, + miner: Arc, + ) -> Result + { let panic_handler = PanicHandler::new_in_arc(); - let net_service = try!(NetworkService::new(net_config)); - panic_handler.forward_from(&net_service); - if enable_network { - try!(net_service.start()); - } + let io_service = try!(IoService::::start()); + panic_handler.forward_from(&io_service); info!("Configured for {} using {} engine", spec.name.clone().apply(Colour::White.bold()), spec.engine.name().apply(Colour::Yellow.bold())); - let client = try!(Client::new(config, spec, db_path, miner, net_service.io().channel())); + let client = try!(Client::new(config, spec, db_path, miner, io_service.channel())); panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); - try!(net_service.io().register_handler(client_io)); + try!(io_service.register_handler(client_io)); Ok(ClientService { - net_service: Arc::new(net_service), + io_service: Arc::new(io_service), client: client, panic_handler: panic_handler, }) @@ -92,8 +75,8 @@ impl ClientService { } /// Get general IO interface - pub fn register_io_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { - self.net_service.io().register_handler(handler) + pub fn register_io_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { + self.io_service.register_handler(handler) } /// Get client interface @@ -102,8 +85,13 @@ impl ClientService { } /// Get network service component - pub fn network(&mut self) -> Arc> { - self.net_service.clone() + pub fn io(&self) -> Arc> { + self.io_service.clone() + } + + /// Set the actor to be notified on certain chain events + pub fn set_notify(&self, notify: &Arc) { + self.client.set_notify(notify); } } @@ -121,26 +109,22 @@ struct ClientIoHandler { const CLIENT_TICK_TIMER: TimerToken = 0; const CLIENT_TICK_MS: u64 = 5000; -impl IoHandler for ClientIoHandler { - fn initialize(&self, io: &IoContext) { +impl IoHandler for ClientIoHandler { + fn initialize(&self, io: &IoContext) { io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); } - fn timeout(&self, _io: &IoContext, timer: TimerToken) { + fn timeout(&self, _io: &IoContext, timer: TimerToken) { if timer == CLIENT_TICK_TIMER { self.client.tick(); } } #[cfg_attr(feature="dev", allow(single_match))] - fn message(&self, io: &IoContext, net_message: &NetSyncMessage) { + fn message(&self, io: &IoContext, net_message: &ClientIoMessage) { match *net_message { - UserMessage(ref message) => match *message { - SyncMessage::BlockVerified => { self.client.import_verified_blocks(&io.channel()); } - SyncMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(&transactions); } - _ => {} // ignore other messages - }, - NetworkIoMessage::NetworkStarted(ref url) => { self.client.network_started(url); } + ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(&io.channel()); } + ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(&transactions); } _ => {} // ignore other messages } } @@ -150,7 +134,6 @@ impl IoHandler for ClientIoHandler { mod tests { use super::*; use tests::helpers::*; - use util::network::*; use devtools::*; use client::ClientConfig; use std::sync::Arc; @@ -162,10 +145,8 @@ mod tests { let service = ClientService::start( ClientConfig::default(), get_test_spec(), - NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::with_spec(get_test_spec())), - false ); assert!(service.is_ok()); } diff --git a/parity/informant.rs b/parity/informant.rs index 5f2818f31..006cfc575 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -21,8 +21,8 @@ use self::ansi_term::Style; use std::time::{Instant, Duration}; use std::sync::RwLock; use std::ops::{Deref, DerefMut}; -use ethsync::{EthSync, SyncProvider}; -use util::{Uint, RwLockable, NetworkService}; +use ethsync::SyncStatus; +use util::{Uint, RwLockable, NetworkConfiguration}; use ethcore::client::*; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -75,7 +75,8 @@ impl Informant { } } - pub fn tick(&self, client: &Client, maybe_sync: Option<(&EthSync, &NetworkService)>) where Message: Send + Sync + Clone + 'static { + #[cfg_attr(feature="dev", allow(match_bool))] + pub fn tick(&self, client: &Client, maybe_status: Option<(SyncStatus, NetworkConfiguration)>) { let elapsed = self.last_tick.unwrapped_read().elapsed(); if elapsed < Duration::from_secs(5) { return; @@ -108,10 +109,8 @@ impl Informant { paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())), paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64())), - match maybe_sync { - Some((sync, net)) => { - let sync_info = sync.status(); - let net_config = net.config(); + match maybe_status { + Some((ref sync_info, ref net_config)) => { format!("{}/{}/{} peers {} ", paint(Green.bold(), format!("{:2}", sync_info.num_active_peers)), paint(Green.bold(), format!("{:2}", sync_info.num_peers)), @@ -128,13 +127,9 @@ impl Informant { paint(Purple.bold(), format!("{:>8}", Informant::format_bytes(report.state_db_mem))), paint(Purple.bold(), format!("{:>8}", Informant::format_bytes(cache_info.total()))), paint(Purple.bold(), format!("{:>8}", Informant::format_bytes(queue_info.mem_used))), - match maybe_sync { - Some((sync, _)) => { - let sync_info = sync.status(); - format!(" {} sync", paint(Purple.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))) - } - None => String::new() - }, + if let Some((ref sync_info, _)) = maybe_status { + format!(" {} sync", paint(Purple.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))) + } else { String::new() }, ); } diff --git a/parity/io_handler.rs b/parity/io_handler.rs index 497d3e374..da989a000 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -14,12 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::{Arc, Weak}; +use std::sync::Arc; use ethcore::client::Client; -use ethcore::service::{NetSyncMessage, SyncMessage}; -use ethsync::EthSync; +use ethcore::service::ClientIoMessage; +use ethsync::{EthSync, SyncProvider, ManageNetwork}; use ethcore::account_provider::AccountProvider; -use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage}; +use util::{TimerToken, IoHandler, IoContext}; use informant::Informant; @@ -30,38 +30,18 @@ pub struct ClientIoHandler { pub sync: Arc, pub accounts: Arc, pub info: Informant, - pub network: Weak>, } -impl IoHandler for ClientIoHandler { - fn initialize(&self, io: &IoContext) { +impl IoHandler for ClientIoHandler { + fn initialize(&self, io: &IoContext) { io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); } - fn timeout(&self, _io: &IoContext, timer: TimerToken) { + fn timeout(&self, _io: &IoContext, timer: TimerToken) { if let INFO_TIMER = timer { - if let Some(net) = self.network.upgrade() { - self.info.tick(&self.client, Some((&self.sync, &net))); - } - } - } - - fn message(&self, _io: &IoContext, message: &NetSyncMessage) { - match *message { - NetworkIoMessage::User(SyncMessage::StartNetwork) => { - if let Some(network) = self.network.upgrade() { - network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); - EthSync::register(&*network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e)); - } - }, - NetworkIoMessage::User(SyncMessage::StopNetwork) => { - if let Some(network) = self.network.upgrade() { - network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); - } - }, - _ => {/* Ignore other messages */}, + let sync_status = self.sync.status(); + let network_config = self.sync.network_config(); + self.info.tick(&self.client, Some((sync_status, network_config))); } } } - - diff --git a/parity/main.rs b/parity/main.rs index 45c60234d..dc7f7e352 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -81,9 +81,10 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{Lockable, H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError, Colour, Applyable, version, journaldb}; +use util::{Lockable, H256, ToPretty, PayloadInfo, Bytes, Colour, Applyable, version, journaldb}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; -use ethcore::client::{Mode, BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError}; +use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, + ChainNotify, Mode}; use ethcore::error::{ImportError}; use ethcore::service::ClientService; use ethcore::spec::Spec; @@ -231,13 +232,11 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) miner.set_transactions_limit(conf.args.flag_tx_queue_size); // Build client - let mut service = ClientService::start( + let service = ClientService::start( client_config, spec, - net_settings, Path::new(&conf.path()), miner.clone(), - match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -247,8 +246,14 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let network_settings = Arc::new(conf.network_settings()); // Sync - let sync = EthSync::new(sync_config, client.clone()); - EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into())); + let sync = EthSync::new(sync_config, client.clone(), net_settings) + .unwrap_or_else(|e| die_with_error("Sync", ethcore::error::Error::Util(e))); + service.set_notify(&(sync.clone() as Arc)); + + // if network is active by default + if match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } { + sync.start(); + } let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { signer_port: conf.signer_port(), @@ -261,7 +266,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) logger: logger.clone(), settings: network_settings.clone(), allow_pending_receipt_query: !conf.args.flag_geth, - net_service: service.network(), + net_service: sync.clone(), }); let dependencies = rpc::Dependencies { @@ -311,7 +316,6 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) info: Informant::new(conf.have_color()), sync: sync.clone(), accounts: account_service.clone(), - network: Arc::downgrade(&service.network()), }); service.register_io_handler(io_handler).expect("Error registering IO handler"); @@ -345,24 +349,11 @@ fn execute_export(conf: Configuration) { unsafe { ::fdlimit::raise_fd_limit(); } let spec = conf.spec(); - let net_settings = NetworkConfiguration { - config_path: None, - listen_address: None, - public_address: None, - udp_port: None, - nat_enabled: false, - discovery_enabled: false, - boot_nodes: Vec::new(), - use_secret: None, - ideal_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, - }; let client_config = conf.client_config(&spec); // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::with_spec(conf.spec())), false + client_config, spec, Path::new(&conf.path()), Arc::new(Miner::with_spec(conf.spec())) ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -419,24 +410,11 @@ fn execute_import(conf: Configuration) { unsafe { ::fdlimit::raise_fd_limit(); } let spec = conf.spec(); - let net_settings = NetworkConfiguration { - config_path: None, - listen_address: None, - public_address: None, - udp_port: None, - nat_enabled: false, - discovery_enabled: false, - boot_nodes: Vec::new(), - use_secret: None, - ideal_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, - }; let client_config = conf.client_config(&spec); // Build client let service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::with_spec(conf.spec())), false + client_config, spec, Path::new(&conf.path()), Arc::new(Miner::with_spec(conf.spec())) ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -485,7 +463,7 @@ fn execute_import(conf: Configuration) { Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { trace!("Skipping block already in chain."); } Err(e) => die!("Cannot import block: {:?}", e) } - informant.tick::<&'static ()>(client.deref(), None); + informant.tick(client.deref(), None); }; match format { diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 4b971c167..7606ccd0b 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -18,13 +18,12 @@ use std::collections::BTreeMap; use std::str::FromStr; use std::sync::Arc; -use ethsync::EthSync; +use ethsync::{EthSync, ManageNetwork}; use ethcore::miner::{Miner, ExternalMiner}; use ethcore::client::Client; use util::RotatingLogger; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; -use util::network::NetworkService; #[cfg(feature="rpc")] pub use ethcore_rpc::ConfirmationsQueue; @@ -89,7 +88,7 @@ pub struct Dependencies { pub logger: Arc, pub settings: Arc, pub allow_pending_receipt_query: bool, - pub net_service: Arc>, + pub net_service: Arc, } fn to_modules(apis: &[Api]) -> BTreeMap { diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index 1b8dd474e..ae4756eac 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -19,26 +19,26 @@ use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; -use ethcore::service::SyncMessage; -use util::network::{NetworkService, NonReservedPeerMode}; +use ethsync::ManageNetwork; +use util::network::NonReservedPeerMode; use v1::traits::EthcoreSet; use v1::types::{Bytes, H160, U256}; /// Ethcore-specific rpc interface for operations altering the settings. pub struct EthcoreSetClient where C: MiningBlockChainClient, - M: MinerService { - + M: MinerService +{ client: Weak, miner: Weak, - net: Weak>, + net: Weak, } impl EthcoreSetClient where C: MiningBlockChainClient, M: MinerService { /// Creates new `EthcoreSetClient`. - pub fn new(client: &Arc, miner: &Arc, net: &Arc>) -> Self { + pub fn new(client: &Arc, miner: &Arc, net: &Arc) -> Self { EthcoreSetClient { client: Arc::downgrade(client), miner: Arc::downgrade(miner), @@ -144,4 +144,14 @@ impl EthcoreSet for EthcoreSetClient where take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); to_value(&true) } + + fn start_network(&self, _: Params) -> Result { + take_weak!(self.net).start_network(); + Ok(Value::Bool(true)) + } + + fn stop_network(&self, _: Params) -> Result { + take_weak!(self.net).stop_network(); + Ok(Value::Bool(true)) + } } diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index 977561ede..20447ac92 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -48,13 +48,4 @@ impl Net for NetClient where S: SyncProvider + 'static { Ok(Value::Bool(true)) } - fn start_network(&self, _: Params) -> Result { - take_weak!(self.sync).start_network(); - Ok(Value::Bool(true)) - } - - fn stop_network(&self, _: Params) -> Result { - take_weak!(self.sync).stop_network(); - Ok(Value::Bool(true)) - } } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index d91c2a98c..9dfb3ba27 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -59,11 +59,5 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.unwrapped_read().clone() } - - fn start_network(&self) { - } - - fn stop_network(&self) { - } } diff --git a/rpc/src/v1/tests/mocked/ethcore_set.rs b/rpc/src/v1/tests/mocked/ethcore_set.rs index f43233733..853b7de30 100644 --- a/rpc/src/v1/tests/mocked/ethcore_set.rs +++ b/rpc/src/v1/tests/mocked/ethcore_set.rs @@ -19,12 +19,12 @@ use std::str::FromStr; use jsonrpc_core::IoHandler; use v1::{EthcoreSet, EthcoreSetClient}; use ethcore::miner::MinerService; -use ethcore::service::SyncMessage; use ethcore::client::TestBlockChainClient; use v1::tests::helpers::TestMinerService; use util::numbers::*; -use util::network::{NetworkConfiguration, NetworkService}; use rustc_serialize::hex::FromHex; +use super::manage_network::TestManageNetwork; +use ethsync::ManageNetwork; fn miner_service() -> Arc { Arc::new(TestMinerService::default()) @@ -34,12 +34,12 @@ fn client_service() -> Arc { Arc::new(TestBlockChainClient::default()) } -fn network_service() -> Arc> { - Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap()) +fn network_service() -> Arc { + Arc::new(TestManageNetwork) } -fn ethcore_set_client(client: &Arc, miner: &Arc, net: &Arc>) -> EthcoreSetClient { - EthcoreSetClient::new(client, miner, net) +fn ethcore_set_client(client: &Arc, miner: &Arc, net: &Arc) -> EthcoreSetClient { + EthcoreSetClient::new(client, miner, &(net.clone() as Arc)) } #[test] diff --git a/rpc/src/v1/tests/mocked/manage_network.rs b/rpc/src/v1/tests/mocked/manage_network.rs new file mode 100644 index 000000000..5ba243484 --- /dev/null +++ b/rpc/src/v1/tests/mocked/manage_network.rs @@ -0,0 +1,30 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use ethsync::ManageNetwork; +use util::network::NetworkConfiguration; + +pub struct TestManageNetwork; + +// TODO: rob, gavin (originally introduced this functions) - proper tests and test state +impl ManageNetwork for TestManageNetwork { + fn set_non_reserved_mode(&self, _mode: ::util::network::NonReservedPeerMode) {} + fn remove_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) } + fn add_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) } + fn start_network(&self) {} + fn stop_network(&self) {} + fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::new_local() } +} diff --git a/rpc/src/v1/tests/mocked/mod.rs b/rpc/src/v1/tests/mocked/mod.rs index 503c75c12..cc54555b7 100644 --- a/rpc/src/v1/tests/mocked/mod.rs +++ b/rpc/src/v1/tests/mocked/mod.rs @@ -26,3 +26,4 @@ mod personal_signer; mod ethcore; mod ethcore_set; mod rpc; +mod manage_network; diff --git a/rpc/src/v1/traits/ethcore_set.rs b/rpc/src/v1/traits/ethcore_set.rs index 8afbcdab9..bd1f6bf7c 100644 --- a/rpc/src/v1/traits/ethcore_set.rs +++ b/rpc/src/v1/traits/ethcore_set.rs @@ -55,6 +55,12 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { /// Accept non-reserved peers (default behavior) fn accept_non_reserved_peers(&self, _: Params) -> Result; + /// Start the network. + fn start_network(&self, _: Params) -> Result; + + /// Stop the network. + fn stop_network(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); diff --git a/rpc/src/v1/traits/net.rs b/rpc/src/v1/traits/net.rs index 2df8c8f9c..56fba3e32 100644 --- a/rpc/src/v1/traits/net.rs +++ b/rpc/src/v1/traits/net.rs @@ -30,12 +30,6 @@ pub trait Net: Sized + Send + Sync + 'static { /// Otherwise false. fn is_listening(&self, _: Params) -> Result; - /// Start the network. - fn start_network(&self, _: Params) -> Result; - - /// Stop the network. - fn stop_network(&self, _: Params) -> Result; - /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); diff --git a/sync/src/io.rs b/sync/src/io.rs index 5d4b32464..81c1d02a0 100644 --- a/sync/src/io.rs +++ b/sync/src/io.rs @@ -16,7 +16,6 @@ use util::{NetworkContext, PeerId, PacketId,}; use util::error::UtilError; -use ethcore::service::SyncMessage; use ethcore::client::BlockChainClient; /// IO interface for the syning handler. @@ -47,13 +46,13 @@ pub trait SyncIo { /// Wraps `NetworkContext` and the blockchain client pub struct NetSyncIo<'s, 'h> where 'h: 's { - network: &'s NetworkContext<'h, SyncMessage>, + network: &'s NetworkContext<'h>, chain: &'s BlockChainClient } impl<'s, 'h> NetSyncIo<'s, 'h> { /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { + pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> { NetSyncIo { network: network, chain: chain, diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 97e3d29ee..3b252bfb7 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -34,15 +34,14 @@ //! extern crate ethsync; //! use std::env; //! use std::sync::Arc; -//! use util::network::{NetworkService, NetworkConfiguration}; +//! use util::network::{NetworkConfiguration}; +//! use util::io::IoChannel; //! use ethcore::client::{Client, ClientConfig}; -//! use ethsync::{EthSync, SyncConfig}; +//! use ethsync::{EthSync, SyncConfig, ManageNetwork}; //! use ethcore::ethereum; //! use ethcore::miner::{GasPricer, Miner}; //! //! fn main() { -//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap(); -//! service.start().unwrap(); //! let dir = env::temp_dir(); //! let miner = Miner::new( //! Default::default(), @@ -55,10 +54,10 @@ //! ethereum::new_frontier(), //! &dir, //! miner, -//! service.io().channel() +//! IoChannel::disconnected() //! ).unwrap(); -//! let sync = EthSync::new(SyncConfig::default(), client); -//! EthSync::register(&mut service, sync); +//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::new()).unwrap(); +//! sync.start_network(); //! } //! ``` @@ -75,13 +74,10 @@ extern crate heapsize; use std::ops::*; use std::sync::*; -use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; -use util::{TimerToken, U256, RwLockable}; -use ethcore::client::Client; -use ethcore::service::{SyncMessage, NetSyncMessage}; +use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkConfiguration}; +use util::{TimerToken, U256, H256, RwLockable, UtilError}; +use ethcore::client::{Client, ChainNotify}; use io::NetSyncIo; -use util::io::IoChannel; -use util::{NetworkIoMessage, NetworkError}; use chain::ChainSync; mod chain; @@ -91,6 +87,9 @@ mod io; #[cfg(test)] mod tests; +/// Ethereum sync protocol +pub const ETH_PROTOCOL: &'static str = "eth"; + /// Sync configuration pub struct SyncConfig { /// Max blocks to download ahead @@ -112,99 +111,142 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; - /// Start the network - fn start_network(&self); - /// Stop the network - fn stop_network(&self); } /// Ethereum network protocol handler pub struct EthSync { - /// Shared blockchain client. TODO: this should evetually become an IPC endpoint - chain: Arc, - /// Sync strategy - sync: RwLock, - /// IO communication chnnel. - io_channel: RwLock>, + /// Network service + network: NetworkService, + /// Protocol handler + handler: Arc, } pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn new(config: SyncConfig, chain: Arc) -> Arc { - let sync = ChainSync::new(config, chain.deref()); - Arc::new(EthSync { - chain: chain, - sync: RwLock::new(sync), - io_channel: RwLock::new(IoChannel::disconnected()), - }) - } + pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, UtilError> { + let chain_sync = ChainSync::new(config, chain.deref()); + let service = try!(NetworkService::new(network_config)); + let sync = Arc::new(EthSync{ + network: service, + handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }), + }); - /// Register protocol with the network service - pub fn register(service: &NetworkService, sync: Arc) -> Result<(), NetworkError> { - service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]) - } - - /// Stop sync - pub fn stop(&mut self, io: &mut NetworkContext) { - self.sync.unwrapped_write().abort(&mut NetSyncIo::new(io, self.chain.deref())); - } - - /// Restart sync - pub fn restart(&mut self, io: &mut NetworkContext) { - self.sync.unwrapped_write().restart(&mut NetSyncIo::new(io, self.chain.deref())); + Ok(sync) } } impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> SyncStatus { - self.sync.unwrapped_read().status() - } - - fn start_network(&self) { - self.io_channel.unwrapped_read().send(NetworkIoMessage::User(SyncMessage::StartNetwork)) - .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); - } - - fn stop_network(&self) { - self.io_channel.unwrapped_read().send(NetworkIoMessage::User(SyncMessage::StopNetwork)) - .unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); + self.handler.sync.unwrapped_read().status() } } -impl NetworkProtocolHandler for EthSync { - fn initialize(&self, io: &NetworkContext) { +struct SyncProtocolHandler { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, + /// Sync strategy + sync: RwLock, +} + +impl NetworkProtocolHandler for SyncProtocolHandler { + fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 1000).expect("Error registering sync timer"); - *self.io_channel.unwrapped_write() = io.io_channel(); } - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()), *peer, packet_id, data); } - fn connected(&self, io: &NetworkContext, peer: &PeerId) { + fn connected(&self, io: &NetworkContext, peer: &PeerId) { self.sync.unwrapped_write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { self.sync.unwrapped_write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { self.sync.unwrapped_write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); self.sync.unwrapped_write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); } +} - #[cfg_attr(feature="dev", allow(single_match))] - fn message(&self, io: &NetworkContext, message: &SyncMessage) { - match *message { - SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, ref sealed } => { - let mut sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.unwrapped_write().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed); - }, - _ => {/* Ignore other messages */}, - } +impl ChainNotify for EthSync { + fn new_blocks(&self, + imported: Vec, + invalid: Vec, + enacted: Vec, + retracted: Vec, + sealed: Vec) + { + self.network.with_context(ETH_PROTOCOL, |context| { + let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); + self.handler.sync.unwrapped_write().chain_new_blocks( + &mut sync_io, + &imported, + &invalid, + &enacted, + &retracted, + &sealed); + }); + } + + fn start(&self) { + self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); + self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) + .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); + } + + fn stop(&self) { + self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); + } +} + +/// Trait for managing network +pub trait ManageNetwork : Send + Sync { + /// Set mode for reserved peers (allow/deny peers that are unreserved) + fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode); + /// Remove reservation for the peer + fn remove_reserved_peer(&self, peer: &str) -> Result<(), String>; + /// Add reserved peer + fn add_reserved_peer(&self, peer: &str) -> Result<(), String>; + /// Start network + fn start_network(&self); + /// Stop network + fn stop_network(&self); + /// Query the current configuration of the network + fn network_config(&self) -> NetworkConfiguration; +} + +impl ManageNetwork for EthSync { + fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode) { + self.network.set_non_reserved_mode(mode); + } + + fn remove_reserved_peer(&self, peer: &str) -> Result<(), String> { + self.network.remove_reserved_peer(peer).map_err(|e| format!("{:?}", e)) + } + + fn add_reserved_peer(&self, peer: &str) -> Result<(), String> { + self.network.add_reserved_peer(peer).map_err(|e| format!("{:?}", e)) + } + + fn start_network(&self) { + self.start(); + } + + fn stop_network(&self) { + self.network.with_context(ETH_PROTOCOL, |context| { + let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); + self.handler.sync.unwrapped_write().abort(&mut sync_io); + }); + self.stop(); + } + + fn network_config(&self) -> NetworkConfiguration { + self.network.config().clone() } } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index e96cea288..701437ab3 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -136,11 +136,11 @@ pub type ProtocolId = &'static str; /// Messages used to communitate with the event loop from other threads. #[derive(Clone)] -pub enum NetworkIoMessage where Message: Send + Sync + Clone { +pub enum NetworkIoMessage { /// Register a new protocol handler. AddHandler { /// Handler shared instance. - handler: Arc + Sync>, + handler: Arc, /// Protocol Id. protocol: ProtocolId, /// Supported protocol versions. @@ -163,8 +163,6 @@ pub enum NetworkIoMessage where Message: Send + Sync + Clone { DisablePeer(PeerId), /// Network has been started with the host as the given enode. NetworkStarted(String), - /// User message - User(Message), } /// Local (temporary) peer session ID. @@ -188,8 +186,8 @@ impl Encodable for CapabilityInfo { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, 's { - io: &'s IoContext>, +pub struct NetworkContext<'s> { + io: &'s IoContext, protocol: ProtocolId, sessions: Arc>>, session: Option, @@ -197,12 +195,12 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta _reserved_peers: &'s HashSet, } -impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { +impl<'s> NetworkContext<'s> { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. - fn new(io: &'s IoContext>, + fn new(io: &'s IoContext, protocol: ProtocolId, session: Option, sessions: Arc>>, - reserved_peers: &'s HashSet) -> NetworkContext<'s, Message> { + reserved_peers: &'s HashSet) -> NetworkContext<'s> { let id = session.as_ref().map(|s| s.locked().token()); NetworkContext { io: io, @@ -238,13 +236,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.send(self.session_id.unwrap(), packet_id, data) } - /// Send an IO message - pub fn message(&self, msg: Message) -> Result<(), UtilError> { - self.io.message(NetworkIoMessage::User(msg)) - } - /// Get an IoChannel. - pub fn io_channel(&self) -> IoChannel> { + pub fn io_channel(&self) -> IoChannel { self.io.channel() } @@ -333,13 +326,13 @@ struct ProtocolTimer { } /// Root IO handler. Manages protocol handlers, IO timers and network connections. -pub struct Host where Message: Send + Sync + Clone { +pub struct Host { pub info: RwLock, tcp_listener: Mutex, sessions: Arc>>, discovery: Mutex>, nodes: RwLock, - handlers: RwLock>>>, + handlers: RwLock>>, timers: RwLock>, timer_counter: RwLock, stats: Arc, @@ -348,9 +341,9 @@ pub struct Host where Message: Send + Sync + Clone { stopping: AtomicBool, } -impl Host where Message: Send + Sync + Clone { +impl Host { /// Create a new instance - pub fn new(config: NetworkConfiguration, stats: Arc) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration, stats: Arc) -> Result { trace!(target: "host", "Creating new Host object"); let mut listen_address = match config.listen_address { @@ -381,7 +374,7 @@ impl Host where Message: Send + Sync + Clone { let boot_nodes = config.boot_nodes.clone(); let reserved_nodes = config.reserved_nodes.clone(); - let mut host = Host:: { + let mut host = Host { info: RwLock::new(HostInfo { keys: keys, config: config, @@ -444,7 +437,7 @@ impl Host where Message: Send + Sync + Clone { Ok(()) } - pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext>) { + pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext) { let mut info = self.info.unwrapped_write(); if info.config.non_reserved_mode != mode { @@ -495,7 +488,7 @@ impl Host where Message: Send + Sync + Clone { r } - pub fn stop(&self, io: &IoContext>) -> Result<(), UtilError> { + pub fn stop(&self, io: &IoContext) -> Result<(), UtilError> { self.stopping.store(true, AtomicOrdering::Release); let mut to_kill = Vec::new(); for e in self.sessions.unwrapped_write().iter_mut() { @@ -511,7 +504,7 @@ impl Host where Message: Send + Sync + Clone { Ok(()) } - fn init_public_interface(&self, io: &IoContext>) -> Result<(), UtilError> { + fn init_public_interface(&self, io: &IoContext) -> Result<(), UtilError> { if self.info.unwrapped_read().public_endpoint.is_some() { return Ok(()); } @@ -567,7 +560,7 @@ impl Host where Message: Send + Sync + Clone { Ok(()) } - fn maintain_network(&self, io: &IoContext>) { + fn maintain_network(&self, io: &IoContext) { self.keep_alive(io); self.connect_peers(io); } @@ -588,7 +581,7 @@ impl Host where Message: Send + Sync + Clone { self.sessions.unwrapped_read().count() - self.session_count() } - fn keep_alive(&self, io: &IoContext>) { + fn keep_alive(&self, io: &IoContext) { let mut to_kill = Vec::new(); for e in self.sessions.unwrapped_write().iter_mut() { let mut s = e.locked(); @@ -603,7 +596,7 @@ impl Host where Message: Send + Sync + Clone { } } - fn connect_peers(&self, io: &IoContext>) { + fn connect_peers(&self, io: &IoContext) { let (ideal_peers, mut pin) = { let info = self.info.unwrapped_read(); if info.capabilities.is_empty() { @@ -651,7 +644,7 @@ impl Host where Message: Send + Sync + Clone { } #[cfg_attr(feature="dev", allow(single_match))] - fn connect_peer(&self, id: &NodeId, io: &IoContext>) { + fn connect_peer(&self, id: &NodeId, io: &IoContext) { if self.have_session(id) { trace!(target: "network", "Aborted connect. Node already connected."); @@ -688,9 +681,10 @@ impl Host where Message: Send + Sync + Clone { } #[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))] - fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext>) -> Result<(), UtilError> { + fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext) -> Result<(), UtilError> { let nonce = self.info.unwrapped_write().next_nonce(); let mut sessions = self.sessions.unwrapped_write(); + let token = sessions.insert_with_opt(|token| { match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.unwrapped_read()) { Ok(s) => Some(Arc::new(Mutex::new(s))), @@ -710,7 +704,7 @@ impl Host where Message: Send + Sync + Clone { } } - fn accept(&self, io: &IoContext>) { + fn accept(&self, io: &IoContext) { trace!(target: "network", "Accepting incoming connection"); loop { let socket = match self.tcp_listener.locked().accept() { @@ -727,8 +721,9 @@ impl Host where Message: Send + Sync + Clone { } } - fn session_writable(&self, token: StreamToken, io: &IoContext>) { + fn session_writable(&self, token: StreamToken, io: &IoContext) { let session = { self.sessions.unwrapped_read().get(token).cloned() }; + if let Some(session) = session { let mut s = session.locked(); if let Err(e) = s.writable(io, &self.info.unwrapped_read()) { @@ -740,13 +735,13 @@ impl Host where Message: Send + Sync + Clone { } } - fn connection_closed(&self, token: TimerToken, io: &IoContext>) { + fn connection_closed(&self, token: TimerToken, io: &IoContext) { trace!(target: "network", "Connection closed: {}", token); self.kill_connection(token, io, true); } #[cfg_attr(feature="dev", allow(collapsible_if))] - fn session_readable(&self, token: StreamToken, io: &IoContext>) { + fn session_readable(&self, token: StreamToken, io: &IoContext) { let mut ready_data: Vec = Vec::new(); let mut packet_data: Vec<(ProtocolId, PacketId, Vec)> = Vec::new(); let mut kill = false; @@ -831,12 +826,12 @@ impl Host where Message: Send + Sync + Clone { } } - fn connection_timeout(&self, token: StreamToken, io: &IoContext>) { + fn connection_timeout(&self, token: StreamToken, io: &IoContext) { trace!(target: "network", "Connection timeout: {}", token); self.kill_connection(token, io, true) } - fn kill_connection(&self, token: StreamToken, io: &IoContext>, remote: bool) { + fn kill_connection(&self, token: StreamToken, io: &IoContext, remote: bool) { let mut to_disconnect: Vec = Vec::new(); let mut failure_id = None; let mut deregister = false; @@ -876,7 +871,7 @@ impl Host where Message: Send + Sync + Clone { } } - fn update_nodes(&self, io: &IoContext>, node_changes: TableUpdates) { + fn update_nodes(&self, io: &IoContext, node_changes: TableUpdates) { let mut to_remove: Vec = Vec::new(); { let sessions = self.sessions.unwrapped_write(); @@ -895,17 +890,24 @@ impl Host where Message: Send + Sync + Clone { } self.nodes.unwrapped_write().update(node_changes); } + + pub fn with_context(&self, protocol: ProtocolId, io: &IoContext, action: F) where F: Fn(&NetworkContext) { + let reserved = { self.reserved_nodes.unwrapped_read() }; + + let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); + action(&context); + } } -impl IoHandler> for Host where Message: Send + Sync + Clone + 'static { +impl IoHandler for Host { /// Initialize networking - fn initialize(&self, io: &IoContext>) { + fn initialize(&self, io: &IoContext) { io.register_timer(IDLE, MAINTENANCE_TIMEOUT).expect("Error registering Network idle timer"); io.message(NetworkIoMessage::InitPublicInterface).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); self.maintain_network(io) } - fn stream_hup(&self, io: &IoContext>, stream: StreamToken) { + fn stream_hup(&self, io: &IoContext, stream: StreamToken) { trace!(target: "network", "Hup: {}", stream); match stream { FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io), @@ -913,7 +915,7 @@ impl IoHandler> for Host where Messa }; } - fn stream_readable(&self, io: &IoContext>, stream: StreamToken) { + fn stream_readable(&self, io: &IoContext, stream: StreamToken) { if self.stopping.load(AtomicOrdering::Acquire) { return; } @@ -930,7 +932,7 @@ impl IoHandler> for Host where Messa } } - fn stream_writable(&self, io: &IoContext>, stream: StreamToken) { + fn stream_writable(&self, io: &IoContext, stream: StreamToken) { if self.stopping.load(AtomicOrdering::Acquire) { return; } @@ -943,7 +945,7 @@ impl IoHandler> for Host where Messa } } - fn timeout(&self, io: &IoContext>, token: TimerToken) { + fn timeout(&self, io: &IoContext, token: TimerToken) { if self.stopping.load(AtomicOrdering::Acquire) { return; } @@ -978,7 +980,7 @@ impl IoHandler> for Host where Messa } } - fn message(&self, io: &IoContext>, message: &NetworkIoMessage) { + fn message(&self, io: &IoContext, message: &NetworkIoMessage) { if self.stopping.load(AtomicOrdering::Acquire) { return; } @@ -1031,19 +1033,13 @@ impl IoHandler> for Host where Messa trace!(target: "network", "Disabling peer {}", peer); self.kill_connection(*peer, io, false); }, - NetworkIoMessage::User(ref message) => { - let reserved = self.reserved_nodes.unwrapped_read(); - for (p, h) in self.handlers.unwrapped_read().iter() { - h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message); - } - }, NetworkIoMessage::InitPublicInterface => self.init_public_interface(io).unwrap_or_else(|e| warn!("Error initializing public interface: {:?}", e)), _ => {} // ignore others. } } - fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { + fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { let session = { self.sessions.unwrapped_read().get(stream).cloned() }; @@ -1057,7 +1053,7 @@ impl IoHandler> for Host where Messa } } - fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>>) { + fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { let mut connections = self.sessions.unwrapped_write(); @@ -1071,7 +1067,7 @@ impl IoHandler> for Host where Messa } } - fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { + fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>) { match stream { FIRST_SESSION ... LAST_SESSION => { let connection = { self.sessions.unwrapped_read().get(stream).cloned() }; @@ -1152,6 +1148,6 @@ fn host_client_url() { let mut config = NetworkConfiguration::new(); let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2"); config.use_secret = Some(key); - let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap(); + let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap(); assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@")); } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index d59ab63b1..95ebbf928 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -24,39 +24,30 @@ //! //! struct MyHandler; //! -//! #[derive(Clone)] -//! struct MyMessage { -//! data: u32 -//! } -//! -//! impl NetworkProtocolHandler for MyHandler { -//! fn initialize(&self, io: &NetworkContext) { +//! impl NetworkProtocolHandler for MyHandler { +//! fn initialize(&self, io: &NetworkContext) { //! io.register_timer(0, 1000); //! } //! -//! fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { +//! fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { //! println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); //! } //! -//! fn connected(&self, io: &NetworkContext, peer: &PeerId) { +//! fn connected(&self, io: &NetworkContext, peer: &PeerId) { //! println!("Connected {}", peer); //! } //! -//! fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { +//! fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { //! println!("Disconnected {}", peer); //! } //! -//! fn timeout(&self, io: &NetworkContext, timer: TimerToken) { +//! fn timeout(&self, io: &NetworkContext, timer: TimerToken) { //! println!("Timeout {}", timer); //! } -//! -//! fn message(&self, io: &NetworkContext, message: &MyMessage) { -//! println!("Message {}", message.data); -//! } //! } //! //! fn main () { -//! let mut service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); +//! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service"); //! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); //! service.start().expect("Error starting service"); //! @@ -84,7 +75,6 @@ pub use network::host::PacketId; pub use network::host::NetworkContext; pub use network::service::NetworkService; pub use network::host::NetworkIoMessage; -pub use network::host::NetworkIoMessage::User as UserMessage; pub use network::error::NetworkError; pub use network::host::NetworkConfiguration; pub use network::stats::NetworkStats; @@ -97,19 +87,17 @@ const PROTOCOL_VERSION: u32 = 4; /// Network IO protocol handler. This needs to be implemented for each new subprotocol. /// All the handler function are called from within IO event loop. /// `Message` is the type for message data. -pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Sync + Clone { +pub trait NetworkProtocolHandler: Sync + Send { /// Initialize the handler - fn initialize(&self, _io: &NetworkContext) {} + fn initialize(&self, _io: &NetworkContext) {} /// Called when new network packet received. - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&self, io: &NetworkContext, peer: &PeerId); + fn connected(&self, io: &NetworkContext, peer: &PeerId); /// Called when a previously connected peer disconnects. - fn disconnected(&self, io: &NetworkContext, peer: &PeerId); + fn disconnected(&self, io: &NetworkContext, peer: &PeerId); /// Timer function called after a timeout created with `NetworkContext::timeout`. - fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {} - /// Called when a broadcasted message is received. The message can only be sent from a different IO handler. - fn message(&self, _io: &NetworkContext, _message: &Message) {} + fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {} } /// Non-reserved peer modes. @@ -130,4 +118,4 @@ impl NonReservedPeerMode { _ => None, } } -} \ No newline at end of file +} diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 50084abb4..ece8324c7 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -20,30 +20,30 @@ use panics::*; use misc::RwLockable; use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::NetworkError; -use network::host::{Host, NetworkIoMessage, ProtocolId}; +use network::host::{Host, NetworkContext, NetworkIoMessage, ProtocolId}; use network::stats::NetworkStats; use io::*; /// IO Service with networking /// `Message` defines a notification data type. -pub struct NetworkService where Message: Send + Sync + Clone + 'static { - io_service: IoService>, +pub struct NetworkService { + io_service: IoService, host_info: String, - host: RwLock>>>, + host: RwLock>>, stats: Arc, panic_handler: Arc, config: NetworkConfiguration, } -impl NetworkService where Message: Send + Sync + Clone + 'static { +impl NetworkService { /// Starts IO event loop - pub fn new(config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: NetworkConfiguration) -> Result { let panic_handler = PanicHandler::new_in_arc(); - let io_service = try!(IoService::>::start()); + let io_service = try!(IoService::::start()); panic_handler.forward_from(&io_service); let stats = Arc::new(NetworkStats::new()); - let host_info = Host::::client_version(); + let host_info = Host::client_version(); Ok(NetworkService { io_service: io_service, host_info: host_info, @@ -55,7 +55,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&self, handler: Arc+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + pub fn register_protocol(&self, handler: Arc, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { try!(self.io_service.send_message(NetworkIoMessage::AddHandler { handler: handler, protocol: protocol, @@ -70,7 +70,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat } /// Returns underlying io service. - pub fn io(&self) -> &IoService> { + pub fn io(&self) -> &IoService { &self.io_service } @@ -146,9 +146,18 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat host.set_non_reserved_mode(mode, &io_ctxt); } } + + /// Executes action in the network context + pub fn with_context(&self, protocol: ProtocolId, action: F) where F: Fn(&NetworkContext) { + let io = IoContext::new(self.io_service.channel(), 0); + let host = self.host.unwrapped_read(); + if let Some(ref host) = host.as_ref() { + host.with_context(protocol, &io, action); + }; + } } -impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { +impl MayPanic for NetworkService { fn on_panic(&self, closure: F) where F: OnPanicListener { self.panic_handler.on_panic(closure); } diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index 85591b462..b428a0a8e 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -30,22 +30,17 @@ pub struct TestProtocol { pub got_disconnect: AtomicBool, } -#[derive(Clone)] -pub struct TestProtocolMessage { - payload: u32, -} - impl TestProtocol { pub fn new(drop_session: bool) -> Self { - TestProtocol { - packet: Mutex::new(Vec::new()), - got_timeout: AtomicBool::new(false), - got_disconnect: AtomicBool::new(false), + TestProtocol { + packet: Mutex::new(Vec::new()), + got_timeout: AtomicBool::new(false), + got_disconnect: AtomicBool::new(false), drop_session: drop_session, } } /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc { + pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc { let handler = Arc::new(TestProtocol::new(drop_session)); service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler"); handler @@ -64,17 +59,17 @@ impl TestProtocol { } } -impl NetworkProtocolHandler for TestProtocol { - fn initialize(&self, io: &NetworkContext) { +impl NetworkProtocolHandler for TestProtocol { + fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 10).unwrap(); } - fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { + fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { assert_eq!(packet_id, 33); self.packet.locked().extend(data); } - fn connected(&self, io: &NetworkContext, peer: &PeerId) { + fn connected(&self, io: &NetworkContext, peer: &PeerId) { assert!(io.peer_info(*peer).contains("Parity")); if self.drop_session { io.disconnect_peer(*peer) @@ -83,13 +78,12 @@ impl NetworkProtocolHandler for TestProtocol { } } - fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { + fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { self.got_disconnect.store(true, AtomicOrdering::Relaxed); } /// Timer function called after a timeout created with `NetworkContext::timeout`. - fn timeout(&self, io: &NetworkContext, timer: TimerToken) { - io.message(TestProtocolMessage { payload: 22 }).unwrap(); + fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { assert_eq!(timer, 0); self.got_timeout.store(true, AtomicOrdering::Relaxed); } @@ -98,7 +92,7 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { - let service = NetworkService::::new(NetworkConfiguration::new_local()).expect("Error creating network service"); + let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service"); service.start().unwrap(); service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap(); } @@ -110,13 +104,13 @@ fn net_connect() { let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::new(config1).unwrap(); + let mut service1 = NetworkService::new(config1).unwrap(); service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); info!("net_connect: local URL: {}", service1.local_url().unwrap()); config2.boot_nodes = vec![ service1.local_url().unwrap() ]; - let mut service2 = NetworkService::::new(config2).unwrap(); + let mut service2 = NetworkService::new(config2).unwrap(); service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, false); while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) { @@ -129,7 +123,7 @@ fn net_connect() { #[test] fn net_start_stop() { let config = NetworkConfiguration::new_local(); - let service = NetworkService::::new(config).unwrap(); + let service = NetworkService::new(config).unwrap(); service.start().unwrap(); service.stop().unwrap(); service.start().unwrap(); @@ -141,12 +135,12 @@ fn net_disconnect() { let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::::new(config1).unwrap(); + let mut service1 = NetworkService::new(config1).unwrap(); service1.start().unwrap(); let handler1 = TestProtocol::register(&mut service1, false); let mut config2 = NetworkConfiguration::new_local(); config2.boot_nodes = vec![ service1.local_url().unwrap() ]; - let mut service2 = NetworkService::::new(config2).unwrap(); + let mut service2 = NetworkService::new(config2).unwrap(); service2.start().unwrap(); let handler2 = TestProtocol::register(&mut service2, true); while !(handler1.got_disconnect() && handler2.got_disconnect()) { @@ -159,7 +153,7 @@ fn net_disconnect() { #[test] fn net_timeout() { let config = NetworkConfiguration::new_local(); - let mut service = NetworkService::::new(config).unwrap(); + let mut service = NetworkService::new(config).unwrap(); service.start().unwrap(); let handler = TestProtocol::register(&mut service, false); while !handler.got_timeout() {