Network shutdown

This commit is contained in:
arkpar
2016-06-17 12:58:28 +02:00
parent a03da30510
commit b38488dd07
12 changed files with 205 additions and 67 deletions

View File

@@ -17,10 +17,12 @@
use std::sync::*;
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use std::ops::Deref;
use mio::*;
use crossbeam::sync::chase_lev;
use slab::Slab;
use error::*;
use io::{IoError, IoHandler};
use crossbeam::sync::chase_lev;
use io::worker::{Worker, Work, WorkType};
use panics::*;
@@ -33,6 +35,14 @@ pub type HandlerId = usize;
/// Maximum number of tokens a handler can use
pub const TOKENS_PER_HANDLER: usize = 16384;
const MAX_HANDLERS: usize = 8;
fn compare_arcs<T: ?Sized>(a: Arc<T>, b: Arc<T>) -> bool {
let p1 = &*a as *const T;
let p2 = &*b as *const T;
info!("{:p} == {:p} : {}", p1, p2 , p1 == p2);
p1 == p2
}
/// Messages used to communicate with the event loop from other threads.
#[derive(Clone)]
@@ -43,6 +53,9 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
AddHandler {
handler: Arc<IoHandler<Message>+Send>,
},
RemoveHandler {
handler_id: HandlerId,
},
AddTimer {
handler_id: HandlerId,
token: TimerToken,
@@ -138,6 +151,15 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
pub fn channel(&self) -> IoChannel<Message> {
self.channel.clone()
}
/// Unregister current IO handler.
pub fn unregister_handler(&self) -> Result<(), IoError> {
try!(self.channel.send_io(IoMessage::RemoveHandler {
handler_id: self.handler,
}));
Ok(())
}
}
#[derive(Clone)]
@@ -149,7 +171,7 @@ struct UserTimer {
/// Root IO handler. Manages user handlers, messages and IO timers.
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Vec<Arc<IoHandler<Message>>>,
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
@@ -175,7 +197,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
let mut io = IoManager {
timers: Arc::new(RwLock::new(HashMap::new())),
handlers: Vec::new(),
handlers: Slab::new(MAX_HANDLERS),
worker_channel: worker,
workers: workers,
work_ready: work_ready,
@@ -192,10 +214,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected stream token: {}", token.as_usize());
}
let handler = self.handlers[handler_index].clone();
let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone();
if events.is_hup() {
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
@@ -214,12 +233,9 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected timer token: {}", token.as_usize());
}
let handler = self.handlers.get(handler_index).unwrap_or_else(|| panic!("Unexpected stream token: {}", token.as_usize())).clone();
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
let handler = self.handlers[handler_index].clone();
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index });
self.work_ready.notify_all();
}
@@ -232,12 +248,14 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
event_loop.shutdown();
},
IoMessage::AddHandler { handler } => {
let handler_id = {
self.handlers.push(handler.clone());
self.handlers.len() - 1
};
let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
},
IoMessage::RemoveHandler { handler_id } => {
// TODO: flush event loop
self.handlers.remove(handler_id);
info!("{} left", self.handlers.count());
},
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
@@ -267,9 +285,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
},
IoMessage::UserMessage(data) => {
for n in 0 .. self.handlers.len() {
let handler = self.handlers[n].clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: n });
//TODO: better way to iterate the slab
for id in 0 .. MAX_HANDLERS {
if let Some(h) = self.handlers.get(id) {
let handler = h.clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
}
}
self.work_ready.notify_all();
}
@@ -351,8 +372,8 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
})
}
/// Regiter a IO hadnler with the event loop.
pub fn register_handler(&mut self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
/// Regiter an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::AddHandler {
handler: handler,
}));
@@ -360,13 +381,13 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
}
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send_message(&mut self, message: Message) -> Result<(), IoError> {
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::UserMessage(message)));
Ok(())
}
/// Create a new message channel
pub fn channel(&mut self) -> IoChannel<Message> {
pub fn channel(&self) -> IoChannel<Message> {
IoChannel { channel: Some(self.host_channel.clone()) }
}
}

View File

@@ -49,7 +49,7 @@ const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store network configuration. None means nothing will be saved
@@ -233,6 +233,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::User(msg));
}
/// Send an IO message
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> {
self.io.channel()
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
@@ -327,7 +332,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
impl<Message> Host<Message> where Message: Send + Sync + Clone {
/// Create a new instance
pub fn new(config: NetworkConfiguration) -> Result<Host<Message>, UtilError> {
pub fn new(config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host<Message>, UtilError> {
let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
Some(addr) => addr,
@@ -371,7 +376,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER),
stats: Arc::new(NetworkStats::default()),
stats: stats,
pinned_nodes: Vec::new(),
num_sessions: AtomicUsize::new(0),
};
@@ -383,10 +388,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Ok(host)
}
pub fn stats(&self) -> Arc<NetworkStats> {
self.stats.clone()
}
pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
@@ -401,8 +402,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
pub fn client_version(&self) -> String {
self.info.read().unwrap().client_version.clone()
pub fn client_version() -> String {
version()
}
pub fn external_url(&self) -> Option<String> {
@@ -415,6 +416,23 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
r
}
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
to_kill.push(s.token());
}
}
for p in to_kill {
trace!(target: "network", "Ping timeout: {}", p);
self.kill_connection(p, io, true);
}
io.unregister_handler();
Ok(())
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
if self.info.read().unwrap().public_endpoint.is_some() {
@@ -767,6 +785,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
impl<Message> Drop for Host<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
info!("Dropping host");
}
}
impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Message: Send + Sync + Clone + 'static {
/// Initialize networking
fn initialize(&self, io: &IoContext<NetworkIoMessage<Message>>) {
@@ -831,8 +856,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
}

View File

@@ -28,33 +28,33 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>,
host_info: String,
host: Arc<Host<Message>>,
host: RwLock<Option<Arc<Host<Message>>>>,
stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
panic_handler: Arc<PanicHandler>,
config: NetworkConfiguration,
}
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
pub fn new(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
let io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);
let host = Arc::new(try!(Host::new(config)));
let stats = host.stats().clone();
let host_info = host.client_version();
try!(io_service.register_handler(host.clone()));
let stats = Arc::new(NetworkStats::new());
let host_info = Host::<Message>::client_version();
Ok(NetworkService {
io_service: io_service,
host_info: host_info,
stats: stats,
panic_handler: panic_handler,
host: host,
host: RwLock::new(None),
config: config,
})
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&mut self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
@@ -69,8 +69,8 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
}
/// Returns underlying io service.
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
&mut self.io_service
pub fn io(&self) -> &IoService<NetworkIoMessage<Message>> {
&self.io_service
}
/// Returns network statistics.
@@ -80,12 +80,37 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
self.host.external_url()
let host = self.host.read().unwrap();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> String {
self.host.local_url()
pub fn local_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
*host = Some(h);
}
Ok(())
}
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if let Some(ref host) = *host {
info!("Unregistering handler");
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
host.stop(&io);
}
*host = None;
Ok(())
}
}

View File

@@ -65,7 +65,7 @@ impl NetworkStats {
self.sessions.load(Ordering::Relaxed)
}
#[cfg(test)]
/// Create a new empty instance.
pub fn new() -> NetworkStats {
NetworkStats {
recv: AtomicUsize::new(0),