Merge pull request #3776 from ethcore/p2p-event-ordering
network: process packets only after connection handler finishes
This commit is contained in:
commit
d427d60156
@ -868,6 +868,12 @@ impl Host {
|
|||||||
let reserved = self.reserved_nodes.read();
|
let reserved = self.reserved_nodes.read();
|
||||||
if let Some(h) = handlers.get(&p).clone() {
|
if let Some(h) = handlers.get(&p).clone() {
|
||||||
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
|
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
|
||||||
|
|
||||||
|
// accumulate pending packets.
|
||||||
|
if let Some(session) = session.as_ref() {
|
||||||
|
let mut session = session.lock();
|
||||||
|
packet_data.extend(session.mark_connected(p));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (p, packet_id, data) in packet_data {
|
for (p, packet_id, data) in packet_data {
|
||||||
|
@ -18,6 +18,8 @@ use std::{str, io};
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use mio::*;
|
use mio::*;
|
||||||
use mio::deprecated::{Handler, EventLoop};
|
use mio::deprecated::{Handler, EventLoop};
|
||||||
use mio::tcp::*;
|
use mio::tcp::*;
|
||||||
@ -36,6 +38,14 @@ use time;
|
|||||||
const PING_TIMEOUT_SEC: u64 = 15;
|
const PING_TIMEOUT_SEC: u64 = 15;
|
||||||
const PING_INTERVAL_SEC: u64 = 30;
|
const PING_INTERVAL_SEC: u64 = 30;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
enum ProtocolState {
|
||||||
|
// Packets pending protocol on_connect event return.
|
||||||
|
Pending(Vec<(Vec<u8>, u8)>),
|
||||||
|
// Protocol connected.
|
||||||
|
Connected,
|
||||||
|
}
|
||||||
|
|
||||||
/// Peer session over encrypted connection.
|
/// Peer session over encrypted connection.
|
||||||
/// When created waits for Hello packet exchange and signals ready state.
|
/// When created waits for Hello packet exchange and signals ready state.
|
||||||
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
|
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
|
||||||
@ -49,6 +59,8 @@ pub struct Session {
|
|||||||
ping_time_ns: u64,
|
ping_time_ns: u64,
|
||||||
pong_time_ns: Option<u64>,
|
pong_time_ns: Option<u64>,
|
||||||
state: State,
|
state: State,
|
||||||
|
// Protocol states -- accumulates pending packets until signaled as ready.
|
||||||
|
protocol_states: HashMap<ProtocolId, ProtocolState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State {
|
enum State {
|
||||||
@ -186,6 +198,7 @@ impl Session {
|
|||||||
ping_time_ns: 0,
|
ping_time_ns: 0,
|
||||||
pong_time_ns: None,
|
pong_time_ns: None,
|
||||||
expired: false,
|
expired: false,
|
||||||
|
protocol_states: HashMap::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -361,6 +374,20 @@ impl Session {
|
|||||||
self.connection().token()
|
self.connection().token()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Signal that a subprotocol has handled the connection successfully and
|
||||||
|
/// get all pending packets in order received.
|
||||||
|
pub fn mark_connected(&mut self, protocol: ProtocolId) -> Vec<(ProtocolId, u8, Vec<u8>)> {
|
||||||
|
match self.protocol_states.insert(protocol, ProtocolState::Connected) {
|
||||||
|
None => Vec::new(),
|
||||||
|
Some(ProtocolState::Connected) => {
|
||||||
|
debug!(target: "network", "Protocol {:?} marked as connected more than once", protocol);
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
Some(ProtocolState::Pending(pending)) =>
|
||||||
|
pending.into_iter().map(|(data, id)| (protocol, id, data)).collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
|
fn read_packet<Message>(&mut self, io: &IoContext<Message>, packet: Packet, host: &HostInfo) -> Result<SessionData, NetworkError>
|
||||||
where Message: Send + Sync + Clone {
|
where Message: Send + Sync + Clone {
|
||||||
if packet.data.len() < 2 {
|
if packet.data.len() < 2 {
|
||||||
@ -409,8 +436,19 @@ impl Session {
|
|||||||
// map to protocol
|
// map to protocol
|
||||||
let protocol = self.info.capabilities[i].protocol;
|
let protocol = self.info.capabilities[i].protocol;
|
||||||
let pid = packet_id - self.info.capabilities[i].id_offset;
|
let pid = packet_id - self.info.capabilities[i].id_offset;
|
||||||
|
|
||||||
|
match *self.protocol_states.entry(protocol).or_insert_with(|| ProtocolState::Pending(Vec::new())) {
|
||||||
|
ProtocolState::Connected => {
|
||||||
trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, pid, i, self.info.capabilities);
|
trace!(target: "network", "Packet {} mapped to {:?}:{}, i={}, capabilities={:?}", packet_id, protocol, pid, i, self.info.capabilities);
|
||||||
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
||||||
|
}
|
||||||
|
ProtocolState::Pending(ref mut pending) => {
|
||||||
|
trace!(target: "network", "Packet {} deferred until protocol connection event completion", packet_id);
|
||||||
|
pending.push((packet.data, packet_id));
|
||||||
|
|
||||||
|
Ok(SessionData::Continue)
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
debug!(target: "network", "Unknown packet: {:?}", packet_id);
|
debug!(target: "network", "Unknown packet: {:?}", packet_id);
|
||||||
|
Loading…
Reference in New Issue
Block a user