From acbb50d70092392409d45bc704c4169527569dd6 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 22 Jan 2016 18:13:59 +0100 Subject: [PATCH] Stream deregistration --- util/src/io/mod.rs | 2 ++ util/src/io/service.rs | 18 ++++++++++++++++++ util/src/network/connection.rs | 13 +++++++++++++ util/src/network/handshake.rs | 6 ++++++ util/src/network/host.rs | 24 ++++++++++++++++++++++-- util/src/network/session.rs | 5 +++++ 6 files changed, 66 insertions(+), 2 deletions(-) diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 48c02f6ee..1906e7438 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -74,6 +74,8 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + ' fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} /// Re-register a stream with the event loop fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} + /// Deregister a stream. Called whenstream is removed from event loop + fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop>) {} } /// TODO [arkpar] Please document me diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 8a1653056..9a3187f8e 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -42,6 +42,10 @@ pub enum IoMessage where Message: Send + Clone + Sized { handler_id: HandlerId, token: StreamToken, }, + DeregisterStream { + handler_id: HandlerId, + token: StreamToken, + }, UpdateStreamRegistration { handler_id: HandlerId, token: StreamToken, @@ -83,6 +87,7 @@ impl IoContext where Message: Send + Clone + 'static { })); Ok(()) } + /// Register a new IO stream. pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> { try!(self.channel.send_io(IoMessage::RegisterStream { @@ -92,6 +97,15 @@ impl IoContext where Message: Send + Clone + 'static { Ok(()) } + /// Deregister an IO stream. + pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> { + try!(self.channel.send_io(IoMessage::DeregisterStream { + token: token, + handler_id: self.handler, + })); + Ok(()) + } + /// Reregister an IO stream. pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> { try!(self.channel.send_io(IoMessage::UpdateStreamRegistration { @@ -214,6 +228,10 @@ impl Handler for IoManager where Message: Send + Clone + Sync let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); }, + IoMessage::DeregisterStream { handler_id, token } => { + let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); + handler.deregister_stream(token, event_loop); + }, IoMessage::UpdateStreamRegistration { handler_id, token } => { let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 33cafd708..c4c8d29c6 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -150,6 +150,13 @@ impl Connection { Err(e) }) } + + /// Delete connection registration. Should be called at the end of the IO handler. + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection deregister; token={:?}", self.token); + event_loop.deregister(&self.socket).ok(); // ignore errors here + Ok(()) + } } /// RLPx packet @@ -371,6 +378,12 @@ impl EncryptedConnection { try!(self.connection.update_socket(reg, event_loop)); Ok(()) } + + /// Delete connection registration. This should be called at the end of the event loop. + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + try!(self.connection.deregister_socket(event_loop)); + Ok(()) + } } #[test] diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index ddeab17b7..acac77d04 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -134,6 +134,12 @@ impl Handshake { Ok(()) } + /// Delete registration + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + try!(self.connection.deregister_socket(event_loop)); + Ok(()) + } + /// Parse, validate and confirm auth message fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 724f11ddf..559f69f49 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -285,7 +285,7 @@ impl Host where Message: Send + Sync + Clone { host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG // ETH/DEV cpp-ethereum (poc-9.ethdev.com) - host.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303"); + //host.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303"); host } @@ -395,7 +395,8 @@ impl Host where Message: Send + Sync + Clone { }; let nonce = self.info.write().unwrap().next_nonce(); - if self.connections.write().unwrap().insert_with(|token| { + let mut connections = self.connections.write().unwrap(); + if connections.insert_with(|token| { let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"); handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); @@ -552,6 +553,7 @@ impl Host where Message: Send + Sync + Clone { _ => {}, } } + io.deregister_stream(token).expect("Error deregistering stream"); } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); @@ -664,6 +666,24 @@ impl IoHandler> for Host where Messa } } + fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>>) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => { + let mut connections = self.connections.write().unwrap(); + if let Some(connection) = connections.get(stream).cloned() { + match *connection.lock().unwrap().deref() { + ConnectionEntry::Handshake(ref h) => h.deregister_socket(event_loop).expect("Error deregistering socket"), + ConnectionEntry::Session(ref s) => s.deregister_socket(event_loop).expect("Error deregistering session socket"), + } + connections.remove(stream); + } + }, + NODETABLE_RECEIVE => event_loop.deregister(self.udp_socket.lock().unwrap().deref()).unwrap(), + TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(), + _ => warn!("Unexpected stream deregistration") + } + } + fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => { diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 8f580f476..2817f008d 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -131,6 +131,11 @@ impl Session { self.connection.update_socket(reg, event_loop) } + /// Delete registration + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + self.connection.deregister_socket(event_loop) + } + /// Send a protocol packet to peer. pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { let mut i = 0usize;