Zombie connections
This commit is contained in:
		
							parent
							
								
									69df91de68
								
							
						
					
					
						commit
						b1bfd00875
					
				| @ -1 +1 @@ | |||||||
| Subproject commit 3116f85a499ceaf4dfdc46726060fc056e2d7829 | Subproject commit f32954b3ddb5af2dc3dc9ec6d9a28bee848fdf70 | ||||||
| @ -181,7 +181,7 @@ impl Connection { | |||||||
| 			socket: try!(self.socket.try_clone()), | 			socket: try!(self.socket.try_clone()), | ||||||
| 			rec_buf: Vec::new(), | 			rec_buf: Vec::new(), | ||||||
| 			rec_size: 0, | 			rec_size: 0, | ||||||
| 			send_queue: VecDeque::new(), | 			send_queue: self.send_queue.clone(), | ||||||
| 			interest: EventSet::hup() | EventSet::readable(), | 			interest: EventSet::hup() | EventSet::readable(), | ||||||
| 			stats: self.stats.clone(), | 			stats: self.stats.clone(), | ||||||
| 		}) | 		}) | ||||||
| @ -190,10 +190,10 @@ impl Connection { | |||||||
| 	/// Register this connection with the IO event loop.
 | 	/// Register this connection with the IO event loop.
 | ||||||
| 	pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> { | 	pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> { | ||||||
| 		trace!(target: "net", "connection register; token={:?}", reg); | 		trace!(target: "net", "connection register; token={:?}", reg); | ||||||
| 		event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { | 		if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) { | ||||||
| 			debug!("Failed to register {:?}, {:?}", reg, e); | 			debug!("Failed to register {:?}, {:?}", reg, e); | ||||||
| 			Ok(()) | 		} | ||||||
| 		}) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Update connection registration. Should be called at the end of the IO handler.
 | 	/// Update connection registration. Should be called at the end of the IO handler.
 | ||||||
|  | |||||||
| @ -63,7 +63,9 @@ pub struct Handshake { | |||||||
| 	/// A copy of received encryped auth packet 
 | 	/// A copy of received encryped auth packet 
 | ||||||
| 	pub auth_cipher: Bytes, | 	pub auth_cipher: Bytes, | ||||||
| 	/// A copy of received encryped ack packet 
 | 	/// A copy of received encryped ack packet 
 | ||||||
| 	pub ack_cipher: Bytes | 	pub ack_cipher: Bytes, | ||||||
|  | 	/// This Handshake is marked for deleteion flag
 | ||||||
|  | 	pub expired: bool, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| const AUTH_PACKET_SIZE: usize = 307; | const AUTH_PACKET_SIZE: usize = 307; | ||||||
| @ -84,6 +86,7 @@ impl Handshake { | |||||||
| 			remote_nonce: H256::new(), | 			remote_nonce: H256::new(), | ||||||
| 			auth_cipher: Bytes::new(), | 			auth_cipher: Bytes::new(), | ||||||
| 			ack_cipher: Bytes::new(), | 			ack_cipher: Bytes::new(), | ||||||
|  | 			expired: false, | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -97,6 +100,16 @@ impl Handshake { | |||||||
| 		self.connection.token() | 		self.connection.token() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/// Mark this handshake as inactive to be deleted lated.
 | ||||||
|  | 	pub fn set_expired(&mut self) { | ||||||
|  | 		self.expired = true; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/// Check if this handshake is expired.
 | ||||||
|  | 	pub fn expired(&self) -> bool { | ||||||
|  | 		self.expired | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	/// Start a handhsake
 | 	/// Start a handhsake
 | ||||||
| 	pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), UtilError> where Message: Send + Clone{ | 	pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), UtilError> where Message: Send + Clone{ | ||||||
| 		self.originated = originated; | 		self.originated = originated; | ||||||
| @ -118,47 +131,55 @@ impl Handshake { | |||||||
| 
 | 
 | ||||||
| 	/// Readable IO handler. Drives the state change.
 | 	/// Readable IO handler. Drives the state change.
 | ||||||
| 	pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { | 	pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { | ||||||
| 		io.clear_timer(self.connection.token).unwrap(); | 		if !self.expired() { | ||||||
| 		match self.state { | 			io.clear_timer(self.connection.token).unwrap(); | ||||||
| 			HandshakeState::ReadingAuth => { | 			match self.state { | ||||||
| 				if let Some(data) = try!(self.connection.readable()) { | 				HandshakeState::ReadingAuth => { | ||||||
| 					try!(self.read_auth(host, &data)); | 					if let Some(data) = try!(self.connection.readable()) { | ||||||
| 					try!(self.write_ack()); | 						try!(self.read_auth(host, &data)); | ||||||
| 				}; | 						try!(self.write_ack()); | ||||||
| 			}, | 					}; | ||||||
| 			HandshakeState::ReadingAck => { | 				}, | ||||||
| 				if let Some(data) = try!(self.connection.readable()) { | 				HandshakeState::ReadingAck => { | ||||||
| 					try!(self.read_ack(host, &data)); | 					if let Some(data) = try!(self.connection.readable()) { | ||||||
| 					self.state = HandshakeState::StartSession; | 						try!(self.read_ack(host, &data)); | ||||||
| 				}; | 						self.state = HandshakeState::StartSession; | ||||||
| 			}, | 					}; | ||||||
| 			HandshakeState::StartSession => {}, | 				}, | ||||||
| 			_ => { panic!("Unexpected state"); } | 				HandshakeState::StartSession => {}, | ||||||
| 		} | 				_ => { panic!("Unexpected state"); } | ||||||
| 		if self.state != HandshakeState::StartSession { | 			} | ||||||
| 			try!(io.update_registration(self.connection.token)); | 			if self.state != HandshakeState::StartSession { | ||||||
|  | 				try!(io.update_registration(self.connection.token)); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Writabe IO handler.
 | 	/// Writabe IO handler.
 | ||||||
| 	pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { | 	pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { | ||||||
| 		io.clear_timer(self.connection.token).unwrap(); | 		if !self.expired() { | ||||||
| 		try!(self.connection.writable()); | 			io.clear_timer(self.connection.token).unwrap(); | ||||||
| 		if self.state != HandshakeState::StartSession { | 			try!(self.connection.writable()); | ||||||
| 			io.update_registration(self.connection.token).unwrap(); | 			if self.state != HandshakeState::StartSession { | ||||||
|  | 				io.update_registration(self.connection.token).unwrap(); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Register the socket with the event loop
 | 	/// Register the socket with the event loop
 | ||||||
| 	pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | 	pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | ||||||
| 		try!(self.connection.register_socket(reg, event_loop)); | 		if !self.expired() { | ||||||
|  | 			try!(self.connection.register_socket(reg, event_loop)); | ||||||
|  | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pub fn update_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | 	pub fn update_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | ||||||
| 		try!(self.connection.update_socket(reg, event_loop)); | 		if !self.expired() { | ||||||
|  | 			try!(self.connection.update_socket(reg, event_loop)); | ||||||
|  | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -190,11 +190,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone | |||||||
| 		let session = { self.sessions.read().unwrap().get(peer).cloned() }; | 		let session = { self.sessions.read().unwrap().get(peer).cloned() }; | ||||||
| 		if let Some(session) = session { | 		if let Some(session) = session { | ||||||
| 			session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { | 			session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { | ||||||
| 						warn!(target: "net", "Send error: {:?}", e); | 						warn!(target: "network", "Send error: {:?}", e); | ||||||
| 					}); //TODO: don't copy vector data
 | 					}); //TODO: don't copy vector data
 | ||||||
| 			try!(self.io.update_registration(peer)); | 			try!(self.io.update_registration(peer)); | ||||||
| 		} else  { | 		} else  { | ||||||
| 			trace!(target: "net", "Send: Peer no longer exist") | 			trace!(target: "network", "Send: Peer no longer exist") | ||||||
| 		} | 		} | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| @ -470,18 +470,18 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 			.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { | 			.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { | ||||||
| 			self.connect_peer(&id, io); | 			self.connect_peer(&id, io); | ||||||
| 		} | 		} | ||||||
| 		debug!(target: "net", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count()); | 		debug!(target: "network", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count()); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	#[cfg_attr(feature="dev", allow(single_match))] | 	#[cfg_attr(feature="dev", allow(single_match))] | ||||||
| 	fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) { | 	fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) { | ||||||
| 		if self.have_session(id) | 		if self.have_session(id) | ||||||
| 		{ | 		{ | ||||||
| 			trace!("Aborted connect. Node already connected."); | 			trace!(target: "network", "Aborted connect. Node already connected."); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 		if self.connecting_to(id) { | 		if self.connecting_to(id) { | ||||||
| 			trace!("Aborted connect. Node already connecting."); | 			trace!(target: "network", "Aborted connect. Node already connecting."); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| @ -493,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 					node.endpoint.address | 					node.endpoint.address | ||||||
| 				} | 				} | ||||||
| 				else { | 				else { | ||||||
| 					debug!("Connection to expired node aborted"); | 					debug!(target: "network", "Connection to expired node aborted"); | ||||||
| 					return; | 					return; | ||||||
| 				} | 				} | ||||||
| 			}; | 			}; | ||||||
| @ -515,16 +515,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 		if handshakes.insert_with(|token| { | 		if handshakes.insert_with(|token| { | ||||||
| 			let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake"); | 			let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake"); | ||||||
| 			handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { | 			handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { | ||||||
| 				debug!(target: "net", "Handshake create error: {:?}", e); | 				debug!(target: "network", "Handshake create error: {:?}", e); | ||||||
| 			}); | 			}); | ||||||
| 			Arc::new(Mutex::new(handshake)) | 			Arc::new(Mutex::new(handshake)) | ||||||
| 		}).is_none() { | 		}).is_none() { | ||||||
| 			debug!("Max handshakes reached"); | 			debug!(target: "network", "Max handshakes reached"); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) { | 	fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) { | ||||||
| 		trace!(target: "net", "accept"); | 		trace!(target: "network", "Accepting incoming connection"); | ||||||
| 		loop { | 		loop { | ||||||
| 			let socket = match self.tcp_listener.lock().unwrap().accept() { | 			let socket = match self.tcp_listener.lock().unwrap().accept() { | ||||||
| 				Ok(None) => break, | 				Ok(None) => break, | ||||||
| @ -544,7 +544,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 		if let Some(handshake) = handshake { | 		if let Some(handshake) = handshake { | ||||||
| 			let mut h = handshake.lock().unwrap(); | 			let mut h = handshake.lock().unwrap(); | ||||||
| 			if let Err(e) = h.writable(io, &self.info.read().unwrap()) { | 			if let Err(e) = h.writable(io, &self.info.read().unwrap()) { | ||||||
| 				trace!(target: "net", "Handshake write error: {}: {:?}", token, e); | 				trace!(target: "network", "Handshake write error: {}: {:?}", token, e); | ||||||
| 			} | 			} | ||||||
| 		} 
 | 		} 
 | ||||||
| 	} | 	} | ||||||
| @ -554,9 +554,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 		if let Some(session) = session { | 		if let Some(session) = session { | ||||||
| 			let mut s = session.lock().unwrap(); | 			let mut s = session.lock().unwrap(); | ||||||
| 			if let Err(e) = s.writable(io, &self.info.read().unwrap()) { | 			if let Err(e) = s.writable(io, &self.info.read().unwrap()) { | ||||||
| 				trace!(target: "net", "Session write error: {}: {:?}", token, e); | 				trace!(target: "network", "Session write error: {}: {:?}", token, e); | ||||||
| 			} | 			} | ||||||
| 			io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); | 			io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e)); | ||||||
| 		} 
 | 		} 
 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 		if let Some(handshake) = handshake { | 		if let Some(handshake) = handshake { | ||||||
| 			let mut h = handshake.lock().unwrap(); | 			let mut h = handshake.lock().unwrap(); | ||||||
| 			if let Err(e) = h.readable(io, &self.info.read().unwrap()) { | 			if let Err(e) = h.readable(io, &self.info.read().unwrap()) { | ||||||
| 				debug!(target: "net", "Handshake read error: {}: {:?}", token, e); | 				debug!(target: "network", "Handshake read error: {}: {:?}", token, e); | ||||||
| 				kill = true; | 				kill = true; | ||||||
| 			} | 			} | ||||||
| 			if h.done() { | 			if h.done() { | ||||||
| @ -585,7 +585,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 			self.start_session(token, io); | 			self.start_session(token, io); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 		io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e)); | 		io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { | 	fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { | ||||||
| @ -597,7 +597,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 			let mut s = session.lock().unwrap(); | 			let mut s = session.lock().unwrap(); | ||||||
| 			match s.readable(io, &self.info.read().unwrap()) { | 			match s.readable(io, &self.info.read().unwrap()) { | ||||||
| 				Err(e) => { | 				Err(e) => { | ||||||
| 					debug!(target: "net", "Session read error: {}: {:?}", token, e); | 					debug!(target: "network", "Session read error: {}: {:?}", token, e); | ||||||
| 					kill = true; | 					kill = true; | ||||||
| 				}, | 				}, | ||||||
| 				Ok(SessionData::Ready) => { | 				Ok(SessionData::Ready) => { | ||||||
| @ -613,7 +613,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 					packet_id, | 					packet_id, | ||||||
| 				}) => { | 				}) => { | ||||||
| 					match self.handlers.read().unwrap().get(protocol) { | 					match self.handlers.read().unwrap().get(protocol) { | ||||||
| 						None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, | 						None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) }, | ||||||
| 						Some(_) => packet_data = Some((protocol, packet_id, data)), | 						Some(_) => packet_data = Some((protocol, packet_id, data)), | ||||||
| 					} | 					} | ||||||
| 				}, | 				}, | ||||||
| @ -631,7 +631,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 			let h = self.handlers.read().unwrap().get(p).unwrap().clone(); | 			let h = self.handlers.read().unwrap().get(p).unwrap().clone(); | ||||||
| 			h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]); | 			h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]); | ||||||
| 		} | 		} | ||||||
| 		io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e)); | 		io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { | 	fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) { | ||||||
| @ -643,19 +643,24 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 		// turn a handshake into a session
 | 		// turn a handshake into a session
 | ||||||
| 		let mut sessions = self.sessions.write().unwrap(); | 		let mut sessions = self.sessions.write().unwrap(); | ||||||
| 		let mut h = handshakes.get_mut(token).unwrap().lock().unwrap(); | 		let mut h = handshakes.get_mut(token).unwrap().lock().unwrap(); | ||||||
|  | 		if h.expired { | ||||||
|  | 			return; | ||||||
|  | 		} | ||||||
| 		let originated = h.originated; | 		let originated = h.originated; | ||||||
| 		let mut session = match Session::new(&mut h, &self.info.read().unwrap()) { | 		let mut session = match Session::new(&mut h, &self.info.read().unwrap()) { | ||||||
| 			Ok(s) => s, | 			Ok(s) => s, | ||||||
| 			Err(e) => { | 			Err(e) => { | ||||||
| 				debug!("Session creation error: {:?}", e); | 				debug!(target: "network", "Session creation error: {:?}", e); | ||||||
| 				return; | 				return; | ||||||
| 			} | 			} | ||||||
| 		}; | 		}; | ||||||
| 		let result = sessions.insert_with(move |session_token| { | 		let result = sessions.insert_with(move |session_token| { | ||||||
| 			session.set_token(session_token); | 			session.set_token(session_token); | ||||||
| 			io.deregister_stream(token).expect("Error deleting handshake registration"); | 			io.deregister_stream(token).expect("Error deleting handshake registration"); | ||||||
|  | 			h.set_expired(); | ||||||
| 			io.register_stream(session_token).expect("Error creating session registration"); | 			io.register_stream(session_token).expect("Error creating session registration"); | ||||||
| 			self.stats.inc_sessions(); | 			self.stats.inc_sessions(); | ||||||
|  | 			trace!(target: "network", "Creating session {} -> {}", token, session_token); | ||||||
| 			if !originated { | 			if !originated { | ||||||
| 				// Add it no node table
 | 				// Add it no node table
 | ||||||
| 				if let Ok(address) = session.remote_addr() { | 				if let Ok(address) = session.remote_addr() { | ||||||
| @ -684,26 +689,34 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { | |||||||
| 			FIRST_HANDSHAKE ... LAST_HANDSHAKE => { | 			FIRST_HANDSHAKE ... LAST_HANDSHAKE => { | ||||||
| 				let handshakes = self.handshakes.write().unwrap(); | 				let handshakes = self.handshakes.write().unwrap(); | ||||||
| 				if let Some(handshake) = handshakes.get(token).cloned() { | 				if let Some(handshake) = handshakes.get(token).cloned() { | ||||||
| 					failure_id = Some(handshake.lock().unwrap().id().clone()); | 					let mut handshake = handshake.lock().unwrap(); | ||||||
|  | 					if !handshake.expired() { | ||||||
|  | 						handshake.set_expired(); | ||||||
|  | 						failure_id = Some(handshake.id().clone()); | ||||||
|  | 						io.deregister_stream(token).expect("Error deregistering stream"); | ||||||
|  | 					} | ||||||
| 				} | 				} | ||||||
| 			}, | 			}, | ||||||
| 			FIRST_SESSION ... LAST_SESSION => { | 			FIRST_SESSION ... LAST_SESSION => { | ||||||
| 				let sessions = self.sessions.write().unwrap(); | 				let sessions = self.sessions.write().unwrap(); | ||||||
| 				if let Some(session) = sessions.get(token).cloned() { | 				if let Some(session) = sessions.get(token).cloned() { | ||||||
| 					let s = session.lock().unwrap(); | 					let mut s = session.lock().unwrap(); | ||||||
| 					if s.is_ready() { | 					if !s.expired() { | ||||||
| 						for (p, _) in self.handlers.read().unwrap().iter() { | 						if s.is_ready() { | ||||||
| 							if s.have_capability(p)  { | 							for (p, _) in self.handlers.read().unwrap().iter() { | ||||||
| 								to_disconnect.push(p); | 								if s.have_capability(p)  { | ||||||
|  | 									to_disconnect.push(p); | ||||||
|  | 								} | ||||||
| 							} | 							} | ||||||
| 						} | 						} | ||||||
|  | 						s.set_expired(); | ||||||
|  | 						failure_id = Some(s.id().clone()); | ||||||
|  | 						io.deregister_stream(token).expect("Error deregistering stream"); | ||||||
| 					} | 					} | ||||||
| 					failure_id = Some(s.id().clone()); |  | ||||||
| 				} | 				} | ||||||
| 			}, | 			}, | ||||||
| 			_ => {}, | 			_ => {}, | ||||||
| 		} | 		} | ||||||
| 		io.deregister_stream(token).expect("Error deregistering stream"); |  | ||||||
| 		if let Some(id) = failure_id { | 		if let Some(id) = failure_id { | ||||||
| 			if remote { | 			if remote { | ||||||
| 				self.nodes.write().unwrap().note_failure(&id); | 				self.nodes.write().unwrap().note_failure(&id); | ||||||
| @ -758,11 +771,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn stream_hup(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) { | 	fn stream_hup(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) { | ||||||
| 		trace!(target: "net", "Hup: {}", stream); | 		trace!(target: "network", "Hup: {}", stream); | ||||||
| 		match stream { | 		match stream { | ||||||
| 			FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io), | 			FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io), | ||||||
| 			FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_closed(stream, io), | 			FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_closed(stream, io), | ||||||
| 			_ => warn!(target: "net", "Unexpected hup"), | 			_ => warn!(target: "network", "Unexpected hup"), | ||||||
| 		}; | 		}; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -810,7 +823,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa | |||||||
| 			}, | 			}, | ||||||
| 			_ => match self.timers.read().unwrap().get(&token).cloned() { | 			_ => match self.timers.read().unwrap().get(&token).cloned() { | ||||||
| 				Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { | 				Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { | ||||||
| 						None => { warn!(target: "net", "No handler found for protocol: {:?}", timer.protocol) }, | 						None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, | ||||||
| 						Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } | 						Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } | ||||||
| 				}, | 				}, | ||||||
| 				None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
 | 				None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
 | ||||||
|  | |||||||
| @ -41,6 +41,8 @@ pub struct Session { | |||||||
| 	connection: EncryptedConnection, | 	connection: EncryptedConnection, | ||||||
| 	/// Session ready flag. Set after successfull Hello packet exchange
 | 	/// Session ready flag. Set after successfull Hello packet exchange
 | ||||||
| 	had_hello: bool, | 	had_hello: bool, | ||||||
|  | 	/// Session is no longer active flag.
 | ||||||
|  | 	expired: bool, | ||||||
| 	ping_time_ns: u64, | 	ping_time_ns: u64, | ||||||
| 	pong_time_ns: Option<u64>, | 	pong_time_ns: Option<u64>, | ||||||
| } | } | ||||||
| @ -125,6 +127,7 @@ impl Session { | |||||||
| 			}, | 			}, | ||||||
| 			ping_time_ns: 0, | 			ping_time_ns: 0, | ||||||
| 			pong_time_ns: None, | 			pong_time_ns: None, | ||||||
|  | 			expired: false, | ||||||
| 		}; | 		}; | ||||||
| 		try!(session.write_hello(host)); | 		try!(session.write_hello(host)); | ||||||
| 		try!(session.send_ping()); | 		try!(session.send_ping()); | ||||||
| @ -141,6 +144,16 @@ impl Session { | |||||||
| 		self.had_hello | 		self.had_hello | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/// Mark this session as inactive to be deleted lated.
 | ||||||
|  | 	pub fn set_expired(&mut self) { | ||||||
|  | 		self.expired = true; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/// Check if this session is expired.
 | ||||||
|  | 	pub fn expired(&self) -> bool { | ||||||
|  | 		self.expired | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	/// Replace socket token 
 | 	/// Replace socket token 
 | ||||||
| 	pub fn set_token(&mut self, token: StreamToken) { | 	pub fn set_token(&mut self, token: StreamToken) { | ||||||
| 		self.connection.set_token(token); | 		self.connection.set_token(token); | ||||||
| @ -153,6 +166,9 @@ impl Session { | |||||||
| 
 | 
 | ||||||
| 	/// Readable IO handler. Returns packet data if available.
 | 	/// Readable IO handler. Returns packet data if available.
 | ||||||
| 	pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, UtilError>  where Message: Send + Sync + Clone { | 	pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<SessionData, UtilError>  where Message: Send + Sync + Clone { | ||||||
|  | 		if self.expired() { | ||||||
|  | 			return Ok(SessionData::None) 
 | ||||||
|  | 		} | ||||||
| 		match try!(self.connection.readable(io)) { | 		match try!(self.connection.readable(io)) { | ||||||
| 			Some(data) => Ok(try!(self.read_packet(data, host))), | 			Some(data) => Ok(try!(self.read_packet(data, host))), | ||||||
| 			None => Ok(SessionData::None) | 			None => Ok(SessionData::None) | ||||||
| @ -161,6 +177,9 @@ impl Session { | |||||||
| 
 | 
 | ||||||
| 	/// Writable IO handler. Sends pending packets.
 | 	/// Writable IO handler. Sends pending packets.
 | ||||||
| 	pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { | 	pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone { | ||||||
|  | 		if self.expired() { | ||||||
|  | 			return Ok(()) 
 | ||||||
|  | 		} | ||||||
| 		self.connection.writable(io) | 		self.connection.writable(io) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -171,12 +190,18 @@ impl Session { | |||||||
| 
 | 
 | ||||||
| 	/// Register the session socket with the event loop
 | 	/// Register the session socket with the event loop
 | ||||||
| 	pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | 	pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | ||||||
|  | 		if self.expired() { | ||||||
|  | 			return Ok(()); | ||||||
|  | 		} | ||||||
| 		try!(self.connection.register_socket(reg, event_loop)); | 		try!(self.connection.register_socket(reg, event_loop)); | ||||||
| 		Ok(()) | 		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Update registration with the event loop. Should be called at the end of the IO handler.
 | 	/// Update registration with the event loop. Should be called at the end of the IO handler.
 | ||||||
| 	pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | 	pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { | ||||||
|  | 		if self.expired() { | ||||||
|  | 			return Ok(()); | ||||||
|  | 		} | ||||||
| 		self.connection.update_socket(reg, event_loop) | 		self.connection.update_socket(reg, event_loop) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -74,7 +74,7 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn connected(&self, io: &NetworkContext<TestProtocolMessage>, peer: &PeerId) { | 	fn connected(&self, io: &NetworkContext<TestProtocolMessage>, peer: &PeerId) { | ||||||
| 		assert!(io.peer_info(*peer).contains("parity")); | 		assert!(io.peer_info(*peer).contains("Parity")); | ||||||
| 		if self.drop_session { | 		if self.drop_session { | ||||||
| 			io.disconnect_peer(*peer) | 			io.disconnect_peer(*peer) | ||||||
| 		} else { | 		} else { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user