Tests
This commit is contained in:
parent
67ffac1df9
commit
c340d8a34f
@ -156,7 +156,7 @@ mod tests {
|
|||||||
fn it_can_be_started() {
|
fn it_can_be_started() {
|
||||||
let spec = get_test_spec();
|
let spec = get_test_spec();
|
||||||
let temp_path = RandomTempPath::new();
|
let temp_path = RandomTempPath::new();
|
||||||
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()));
|
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false);
|
||||||
assert!(service.is_ok());
|
assert!(service.is_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,7 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
|||||||
NetworkIoMessage::User(SyncMessage::StartNetwork) => {
|
NetworkIoMessage::User(SyncMessage::StartNetwork) => {
|
||||||
info!("Starting network");
|
info!("Starting network");
|
||||||
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
||||||
|
EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e));
|
||||||
},
|
},
|
||||||
NetworkIoMessage::User(SyncMessage::StopNetwork) => {
|
NetworkIoMessage::User(SyncMessage::StopNetwork) => {
|
||||||
info!("Stopping network");
|
info!("Stopping network");
|
||||||
|
@ -79,7 +79,7 @@ use std::thread::sleep;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use rustc_serialize::hex::FromHex;
|
use rustc_serialize::hex::FromHex;
|
||||||
use ctrlc::CtrlC;
|
use ctrlc::CtrlC;
|
||||||
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes};
|
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError};
|
||||||
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
|
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
|
||||||
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path};
|
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path};
|
||||||
use ethcore::error::{Error, ImportError};
|
use ethcore::error::{Error, ImportError};
|
||||||
@ -208,7 +208,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
|
|||||||
let network_settings = Arc::new(conf.network_settings());
|
let network_settings = Arc::new(conf.network_settings());
|
||||||
|
|
||||||
// Sync
|
// Sync
|
||||||
let sync = EthSync::register(service.network().deref(), sync_config, client.clone());
|
let sync = EthSync::new(sync_config, client.clone());
|
||||||
|
EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into()));
|
||||||
|
|
||||||
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
|
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
|
||||||
signer_port: conf.signer_port(),
|
signer_port: conf.signer_port(),
|
||||||
|
@ -357,7 +357,7 @@ impl ChainSync {
|
|||||||
|
|
||||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||||
if io.is_expired() {
|
if io.is_expired() {
|
||||||
info!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,11 +41,13 @@
|
|||||||
//! use ethcore::miner::Miner;
|
//! use ethcore::miner::Miner;
|
||||||
//!
|
//!
|
||||||
//! fn main() {
|
//! fn main() {
|
||||||
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
|
//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap();
|
||||||
|
//! service.start().unwrap();
|
||||||
//! let dir = env::temp_dir();
|
//! let dir = env::temp_dir();
|
||||||
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
|
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
|
||||||
//! let miner = Miner::new(false, ethereum::new_frontier());
|
//! let miner = Miner::new(false, ethereum::new_frontier());
|
||||||
//! EthSync::register(&mut service, SyncConfig::default(), client);
|
//! let sync = EthSync::new(SyncConfig::default(), client);
|
||||||
|
//! EthSync::register(&mut service, sync);
|
||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
@ -69,7 +71,7 @@ use ethcore::client::Client;
|
|||||||
use ethcore::service::{SyncMessage, NetSyncMessage};
|
use ethcore::service::{SyncMessage, NetSyncMessage};
|
||||||
use io::NetSyncIo;
|
use io::NetSyncIo;
|
||||||
use util::io::IoChannel;
|
use util::io::IoChannel;
|
||||||
use util::NetworkIoMessage;
|
use util::{NetworkIoMessage, NetworkError};
|
||||||
use chain::ChainSync;
|
use chain::ChainSync;
|
||||||
|
|
||||||
mod chain;
|
mod chain;
|
||||||
@ -120,17 +122,21 @@ pub use self::chain::{SyncStatus, SyncState};
|
|||||||
|
|
||||||
impl EthSync {
|
impl EthSync {
|
||||||
/// Creates and register protocol with the network service
|
/// Creates and register protocol with the network service
|
||||||
pub fn register(service: &NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
|
pub fn new(config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
|
||||||
let sync = ChainSync::new(config, chain.deref());
|
let sync = ChainSync::new(config, chain.deref());
|
||||||
let sync = Arc::new(EthSync {
|
let sync = Arc::new(EthSync {
|
||||||
chain: chain,
|
chain: chain,
|
||||||
sync: RwLock::new(sync),
|
sync: RwLock::new(sync),
|
||||||
io_channel: RwLock::new(IoChannel::disconnected()),
|
io_channel: RwLock::new(IoChannel::disconnected()),
|
||||||
});
|
});
|
||||||
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
|
|
||||||
sync
|
sync
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register protocol with the network service
|
||||||
|
pub fn register(service: &NetworkService<SyncMessage>, sync: Arc<EthSync>) -> Result<(), NetworkError> {
|
||||||
|
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8])
|
||||||
|
}
|
||||||
|
|
||||||
/// Stop sync
|
/// Stop sync
|
||||||
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
||||||
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
|
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||||
|
@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> {
|
|||||||
fn disconnect_peer(&mut self, _peer_id: PeerId) {
|
fn disconnect_peer(&mut self, _peer_id: PeerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_expired(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||||
self.queue.push_back(TestPacket {
|
self.queue.push_back(TestPacket {
|
||||||
data: data,
|
data: data,
|
||||||
|
@ -142,7 +142,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_service_register_handler () {
|
fn test_service_register_handler () {
|
||||||
let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
|
let service = IoService::<MyMessage>::start().expect("Error creating network service");
|
||||||
service.register_handler(Arc::new(MyHandler)).unwrap();
|
service.register_handler(Arc::new(MyHandler)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
use std::thread::{self, JoinHandle};
|
use std::thread::{self, JoinHandle};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ops::Deref;
|
|
||||||
use mio::*;
|
use mio::*;
|
||||||
use crossbeam::sync::chase_lev;
|
use crossbeam::sync::chase_lev;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
@ -37,13 +36,6 @@ pub type HandlerId = usize;
|
|||||||
pub const TOKENS_PER_HANDLER: usize = 16384;
|
pub const TOKENS_PER_HANDLER: usize = 16384;
|
||||||
const MAX_HANDLERS: usize = 8;
|
const MAX_HANDLERS: usize = 8;
|
||||||
|
|
||||||
fn compare_arcs<T: ?Sized>(a: Arc<T>, b: Arc<T>) -> bool {
|
|
||||||
let p1 = &*a as *const T;
|
|
||||||
let p2 = &*b as *const T;
|
|
||||||
info!("{:p} == {:p} : {}", p1, p2 , p1 == p2);
|
|
||||||
p1 == p2
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Messages used to communicate with the event loop from other threads.
|
/// Messages used to communicate with the event loop from other threads.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum IoMessage<Message> where Message: Send + Clone + Sized {
|
pub enum IoMessage<Message> where Message: Send + Clone + Sized {
|
||||||
@ -214,30 +206,31 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
|||||||
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||||
let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone();
|
if let Some(handler) = self.handlers.get(handler_index) {
|
||||||
|
if events.is_hup() {
|
||||||
if events.is_hup() {
|
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||||
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if events.is_readable() {
|
|
||||||
self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
|
||||||
}
|
}
|
||||||
if events.is_writable() {
|
else {
|
||||||
self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
if events.is_readable() {
|
||||||
|
self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||||
|
}
|
||||||
|
if events.is_writable() {
|
||||||
|
self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
self.work_ready.notify_all();
|
||||||
}
|
}
|
||||||
self.work_ready.notify_all();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
|
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
|
||||||
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
|
||||||
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
|
||||||
let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone();
|
if let Some(handler) = self.handlers.get(handler_index) {
|
||||||
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
|
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
|
||||||
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
|
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
|
||||||
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index });
|
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
|
||||||
self.work_ready.notify_all();
|
self.work_ready.notify_all();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,7 +247,6 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
|||||||
IoMessage::RemoveHandler { handler_id } => {
|
IoMessage::RemoveHandler { handler_id } => {
|
||||||
// TODO: flush event loop
|
// TODO: flush event loop
|
||||||
self.handlers.remove(handler_id);
|
self.handlers.remove(handler_id);
|
||||||
info!("{} left", self.handlers.count());
|
|
||||||
},
|
},
|
||||||
IoMessage::AddTimer { handler_id, token, delay } => {
|
IoMessage::AddTimer { handler_id, token, delay } => {
|
||||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||||
@ -268,21 +260,24 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
IoMessage::RegisterStream { handler_id, token } => {
|
IoMessage::RegisterStream { handler_id, token } => {
|
||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
if let Some(handler) = self.handlers.get(handler_id) {
|
||||||
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
IoMessage::DeregisterStream { handler_id, token } => {
|
IoMessage::DeregisterStream { handler_id, token } => {
|
||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
if let Some(handler) = self.handlers.get(handler_id) {
|
||||||
handler.deregister_stream(token, event_loop);
|
handler.deregister_stream(token, event_loop);
|
||||||
// unregister a timer associated with the token (if any)
|
// unregister a timer associated with the token (if any)
|
||||||
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
|
||||||
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
|
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
|
||||||
event_loop.clear_timeout(timer.timeout);
|
event_loop.clear_timeout(timer.timeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
if let Some(handler) = self.handlers.get(handler_id) {
|
||||||
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
IoMessage::UserMessage(data) => {
|
IoMessage::UserMessage(data) => {
|
||||||
//TODO: better way to iterate the slab
|
//TODO: better way to iterate the slab
|
||||||
|
@ -333,6 +333,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
|
|||||||
stats: Arc<NetworkStats>,
|
stats: Arc<NetworkStats>,
|
||||||
pinned_nodes: Vec<NodeId>,
|
pinned_nodes: Vec<NodeId>,
|
||||||
num_sessions: AtomicUsize,
|
num_sessions: AtomicUsize,
|
||||||
|
stopping: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||||
@ -384,6 +385,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
stats: stats,
|
stats: stats,
|
||||||
pinned_nodes: Vec::new(),
|
pinned_nodes: Vec::new(),
|
||||||
num_sessions: AtomicUsize::new(0),
|
num_sessions: AtomicUsize::new(0),
|
||||||
|
stopping: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
|
|
||||||
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
|
||||||
@ -422,19 +424,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
|
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
|
||||||
|
self.stopping.store(true, AtomicOrdering::Release);
|
||||||
let mut to_kill = Vec::new();
|
let mut to_kill = Vec::new();
|
||||||
for e in self.sessions.write().unwrap().iter_mut() {
|
for e in self.sessions.write().unwrap().iter_mut() {
|
||||||
let mut s = e.lock().unwrap();
|
let mut s = e.lock().unwrap();
|
||||||
if !s.keep_alive(io) {
|
s.disconnect(io, DisconnectReason::ClientQuit);
|
||||||
s.disconnect(io, DisconnectReason::PingTimeout);
|
to_kill.push(s.token());
|
||||||
to_kill.push(s.token());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for p in to_kill {
|
for p in to_kill {
|
||||||
trace!(target: "network", "Ping timeout: {}", p);
|
trace!(target: "network", "Disconnecting on shutdown: {}", p);
|
||||||
self.kill_connection(p, io, true);
|
self.kill_connection(p, io, true);
|
||||||
}
|
}
|
||||||
io.unregister_handler();
|
try!(io.unregister_handler());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -790,13 +791,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<Message> Drop for Host<Message> where Message: Send + Sync + Clone {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
info!("Dropping host");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + Sync + Clone + 'static {
|
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + Sync + Clone + 'static {
|
||||||
/// Initialize networking
|
/// Initialize networking
|
||||||
fn initialize(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn initialize(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
@ -814,6 +808,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn stream_readable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
|
fn stream_readable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
|
||||||
|
if self.stopping.load(AtomicOrdering::Acquire) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
@ -829,6 +826,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
|
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
|
||||||
|
if self.stopping.load(AtomicOrdering::Acquire) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
@ -840,6 +840,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
|
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
|
||||||
|
if self.stopping.load(AtomicOrdering::Acquire) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
match token {
|
match token {
|
||||||
IDLE => self.maintain_network(io),
|
IDLE => self.maintain_network(io),
|
||||||
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
|
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
|
||||||
@ -870,6 +873,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn message(&self, io: &IoContext<NetworkIoMessage<Message>>, message: &NetworkIoMessage<Message>) {
|
fn message(&self, io: &IoContext<NetworkIoMessage<Message>>, message: &NetworkIoMessage<Message>) {
|
||||||
|
if self.stopping.load(AtomicOrdering::Acquire) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
match *message {
|
match *message {
|
||||||
NetworkIoMessage::AddHandler {
|
NetworkIoMessage::AddHandler {
|
||||||
ref handler,
|
ref handler,
|
||||||
@ -1031,6 +1037,6 @@ fn host_client_url() {
|
|||||||
let mut config = NetworkConfiguration::new();
|
let mut config = NetworkConfiguration::new();
|
||||||
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
|
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
|
||||||
config.use_secret = Some(key);
|
config.use_secret = Some(key);
|
||||||
let host: Host<u32> = Host::new(config).unwrap();
|
let host: Host<u32> = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
|
||||||
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
|
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
|
||||||
}
|
}
|
||||||
|
@ -56,8 +56,9 @@
|
|||||||
//! }
|
//! }
|
||||||
//!
|
//!
|
||||||
//! fn main () {
|
//! fn main () {
|
||||||
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
|
//! let mut service = NetworkService::<MyMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
|
||||||
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
|
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
|
||||||
|
//! service.start().expect("Error starting service");
|
||||||
//!
|
//!
|
||||||
//! // Wait for quit condition
|
//! // Wait for quit condition
|
||||||
//! // ...
|
//! // ...
|
||||||
|
@ -107,7 +107,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
|||||||
if let Some(ref host) = *host {
|
if let Some(ref host) = *host {
|
||||||
info!("Unregistering handler");
|
info!("Unregistering handler");
|
||||||
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
|
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
|
||||||
host.stop(&io);
|
try!(host.stop(&io));
|
||||||
}
|
}
|
||||||
*host = None;
|
*host = None;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -97,7 +97,8 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn net_service() {
|
fn net_service() {
|
||||||
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
|
let service = NetworkService::<TestProtocolMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
|
||||||
|
service.start().unwrap();
|
||||||
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
|
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,12 +109,14 @@ fn net_connect() {
|
|||||||
let mut config1 = NetworkConfiguration::new_local();
|
let mut config1 = NetworkConfiguration::new_local();
|
||||||
config1.use_secret = Some(key1.secret().clone());
|
config1.use_secret = Some(key1.secret().clone());
|
||||||
config1.boot_nodes = vec![ ];
|
config1.boot_nodes = vec![ ];
|
||||||
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
|
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
|
||||||
|
service1.start().unwrap();
|
||||||
let handler1 = TestProtocol::register(&mut service1, false);
|
let handler1 = TestProtocol::register(&mut service1, false);
|
||||||
let mut config2 = NetworkConfiguration::new_local();
|
let mut config2 = NetworkConfiguration::new_local();
|
||||||
info!("net_connect: local URL: {}", service1.local_url());
|
info!("net_connect: local URL: {}", service1.local_url().unwrap());
|
||||||
config2.boot_nodes = vec![ service1.local_url() ];
|
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
|
||||||
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
|
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
|
||||||
|
service2.start().unwrap();
|
||||||
let handler2 = TestProtocol::register(&mut service2, false);
|
let handler2 = TestProtocol::register(&mut service2, false);
|
||||||
while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) {
|
while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) {
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(50));
|
||||||
@ -122,17 +125,28 @@ fn net_connect() {
|
|||||||
assert!(service2.stats().sessions() >= 1);
|
assert!(service2.stats().sessions() >= 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn net_start_stop() {
|
||||||
|
let config = NetworkConfiguration::new_local();
|
||||||
|
let service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
|
||||||
|
service.start().unwrap();
|
||||||
|
service.stop().unwrap();
|
||||||
|
service.start().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn net_disconnect() {
|
fn net_disconnect() {
|
||||||
let key1 = KeyPair::create().unwrap();
|
let key1 = KeyPair::create().unwrap();
|
||||||
let mut config1 = NetworkConfiguration::new_local();
|
let mut config1 = NetworkConfiguration::new_local();
|
||||||
config1.use_secret = Some(key1.secret().clone());
|
config1.use_secret = Some(key1.secret().clone());
|
||||||
config1.boot_nodes = vec![ ];
|
config1.boot_nodes = vec![ ];
|
||||||
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
|
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
|
||||||
|
service1.start().unwrap();
|
||||||
let handler1 = TestProtocol::register(&mut service1, false);
|
let handler1 = TestProtocol::register(&mut service1, false);
|
||||||
let mut config2 = NetworkConfiguration::new_local();
|
let mut config2 = NetworkConfiguration::new_local();
|
||||||
config2.boot_nodes = vec![ service1.local_url() ];
|
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
|
||||||
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
|
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
|
||||||
|
service2.start().unwrap();
|
||||||
let handler2 = TestProtocol::register(&mut service2, true);
|
let handler2 = TestProtocol::register(&mut service2, true);
|
||||||
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
|
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(50));
|
||||||
@ -144,7 +158,8 @@ fn net_disconnect() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn net_timeout() {
|
fn net_timeout() {
|
||||||
let config = NetworkConfiguration::new_local();
|
let config = NetworkConfiguration::new_local();
|
||||||
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
|
let mut service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
|
||||||
|
service.start().unwrap();
|
||||||
let handler = TestProtocol::register(&mut service, false);
|
let handler = TestProtocol::register(&mut service, false);
|
||||||
while !handler.got_timeout() {
|
while !handler.got_timeout() {
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(50));
|
||||||
|
Loading…
Reference in New Issue
Block a user