network: process packets only after connection handler finishes
This commit is contained in:
		
							parent
							
								
									fd23a2972c
								
							
						
					
					
						commit
						b772901d77
					
				@ -868,6 +868,12 @@ impl Host {
 | 
				
			|||||||
			let reserved = self.reserved_nodes.read();
 | 
								let reserved = self.reserved_nodes.read();
 | 
				
			||||||
			if let Some(h) = handlers.get(&p).clone() {
 | 
								if let Some(h) = handlers.get(&p).clone() {
 | 
				
			||||||
				h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
 | 
									h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// accumulate pending packets.
 | 
				
			||||||
 | 
									if let Some(session) = session.as_ref() {
 | 
				
			||||||
 | 
										let mut session = session.lock();
 | 
				
			||||||
 | 
										packet_data.extend(session.mark_connected(p));
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		for (p, packet_id, data) in packet_data {
 | 
							for (p, packet_id, data) in packet_data {
 | 
				
			||||||
 | 
				
			|||||||
@ -18,6 +18,8 @@ use std::{str, io};
 | 
				
			|||||||
use std::net::SocketAddr;
 | 
					use std::net::SocketAddr;
 | 
				
			||||||
use std::cmp::Ordering;
 | 
					use std::cmp::Ordering;
 | 
				
			||||||
use std::sync::*;
 | 
					use std::sync::*;
 | 
				
			||||||
 | 
					use std::collections::HashMap;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use mio::*;
 | 
					use mio::*;
 | 
				
			||||||
use mio::deprecated::{Handler, EventLoop};
 | 
					use mio::deprecated::{Handler, EventLoop};
 | 
				
			||||||
use mio::tcp::*;
 | 
					use mio::tcp::*;
 | 
				
			||||||
@ -36,6 +38,14 @@ use time;
 | 
				
			|||||||
const PING_TIMEOUT_SEC: u64 = 15;
 | 
					const PING_TIMEOUT_SEC: u64 = 15;
 | 
				
			||||||
const PING_INTERVAL_SEC: u64 = 30;
 | 
					const PING_INTERVAL_SEC: u64 = 30;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug, Clone)]
 | 
				
			||||||
 | 
					enum ProtocolState {
 | 
				
			||||||
 | 
						// Packets pending protocol on_connect event return.
 | 
				
			||||||
 | 
						Pending(Vec<(Vec<u8>, u8)>),
 | 
				
			||||||
 | 
						// Protocol connected.
 | 
				
			||||||
 | 
						Connected,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Peer session over encrypted connection.
 | 
					/// Peer session over encrypted connection.
 | 
				
			||||||
/// When created waits for Hello packet exchange and signals ready state.
 | 
					/// When created waits for Hello packet exchange and signals ready state.
 | 
				
			||||||
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
 | 
					/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
 | 
				
			||||||
@ -49,6 +59,8 @@ pub struct Session {
 | 
				
			|||||||
	ping_time_ns: u64,
 | 
						ping_time_ns: u64,
 | 
				
			||||||
	pong_time_ns: Option<u64>,
 | 
						pong_time_ns: Option<u64>,
 | 
				
			||||||
	state: State,
 | 
						state: State,
 | 
				
			||||||
 | 
						// Protocol states -- accumulates pending packets until signaled as ready.	
 | 
				
			||||||
 | 
						protocol_states: HashMap<ProtocolId, ProtocolState>,		
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
enum State {
 | 
					enum State {
 | 
				
			||||||
@ -186,6 +198,7 @@ impl Session {
 | 
				
			|||||||
			ping_time_ns: 0,
 | 
								ping_time_ns: 0,
 | 
				
			||||||
			pong_time_ns: None,
 | 
								pong_time_ns: None,
 | 
				
			||||||
			expired: false,
 | 
								expired: false,
 | 
				
			||||||
 | 
								protocol_states: HashMap::new(),			
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -361,6 +374,20 @@ impl Session {
 | 
				
			|||||||
		self.connection().token()
 | 
							self.connection().token()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/// Signal that a subprotocol has handled the connection successfully and 
 | 
				
			||||||
 | 
						/// get all pending packets in order received.
 | 
				
			||||||
 | 
						pub fn mark_connected(&mut self, protocol: ProtocolId) -> Vec<(ProtocolId, u8, Vec<u8>)> {
 | 
				
			||||||
 | 
							match self.protocol_states.insert(protocol, ProtocolState::Connected) {
 | 
				
			||||||
 | 
								None => Vec::new(),			
 | 
				
			||||||
 | 
								Some(ProtocolState::Connected) => {
 | 
				
			||||||
 | 
									debug!(target: "network", "Protocol {:?} marked as connected more than once", protocol);
 | 
				
			||||||
 | 
									Vec::new()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								Some(ProtocolState::Pending(pending)) => 
 | 
				
			||||||
 | 
									pending.into_iter().map(|(data, id)| (protocol, id, data)).collect(),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
 | 
						fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
 | 
				
			||||||
	where Message: Send + Sync + Clone {
 | 
						where Message: Send + Sync + Clone {
 | 
				
			||||||
		if packet.data.len() < 2 {
 | 
							if packet.data.len() < 2 {
 | 
				
			||||||
@ -409,8 +436,19 @@ impl Session {
 | 
				
			|||||||
				// map to protocol
 | 
									// map to protocol
 | 
				
			||||||
				let protocol = self.info.capabilities[i].protocol;
 | 
									let protocol = self.info.capabilities[i].protocol;
 | 
				
			||||||
				let pid = packet_id - self.info.capabilities[i].id_offset;
 | 
									let pid = packet_id - self.info.capabilities[i].id_offset;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									match *self.protocol_states.entry(protocol).or_insert_with(|| ProtocolState::Pending(Vec::new())) {
 | 
				
			||||||
 | 
										ProtocolState::Connected => {
 | 
				
			||||||
						trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, pid, i, self.info.capabilities);
 | 
											trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, pid, i, self.info.capabilities);
 | 
				
			||||||
						Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
 | 
											Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										ProtocolState::Pending(ref mut pending) => {
 | 
				
			||||||
 | 
											trace!(target: "network", "Packet {} deferred until protocol connection event completion", packet_id);
 | 
				
			||||||
 | 
											pending.push((packet.data, packet_id));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
											Ok(SessionData::Continue)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			_ => {
 | 
								_ => {
 | 
				
			||||||
				debug!(target: "network", "Unknown packet: {:?}", packet_id);
 | 
									debug!(target: "network", "Unknown packet: {:?}", packet_id);
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user