Network tracing improvements
This commit is contained in:
@@ -170,29 +170,37 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
|
||||
io: &'s IoContext<NetworkIoMessage<Message>>,
|
||||
protocol: ProtocolId,
|
||||
sessions: Arc<RwLock<Slab<SharedSession>>>,
|
||||
session: Option<StreamToken>,
|
||||
session: Option<SharedSession>,
|
||||
session_id: Option<StreamToken>,
|
||||
}
|
||||
|
||||
impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
|
||||
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
|
||||
fn new(io: &'s IoContext<NetworkIoMessage<Message>>,
|
||||
protocol: ProtocolId,
|
||||
session: Option<StreamToken>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> {
|
||||
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> {
|
||||
let id = session.as_ref().map(|s| s.lock().unwrap().token());
|
||||
NetworkContext {
|
||||
io: io,
|
||||
protocol: protocol,
|
||||
session_id: id,
|
||||
session: session,
|
||||
sessions: sessions,
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
|
||||
match self.session_id {
|
||||
Some(id) if id == peer => self.session.clone(),
|
||||
_ => self.sessions.read().unwrap().get(peer).cloned(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a packet over the network to another peer.
|
||||
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||
let session = { self.sessions.read().unwrap().get(peer).cloned() };
|
||||
let session = self.resolve_session(peer);
|
||||
if let Some(session) = session {
|
||||
session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
|
||||
warn!(target: "network", "Send error: {:?}", e);
|
||||
}); //TODO: don't copy vector data
|
||||
try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data));
|
||||
try!(self.io.update_registration(peer));
|
||||
} else {
|
||||
trace!(target: "network", "Send: Peer no longer exist")
|
||||
@@ -200,14 +208,10 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Respond to a current network message. Panics if no there is no packet in the context.
|
||||
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
|
||||
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||
match self.session {
|
||||
Some(session) => self.send(session, packet_id, data),
|
||||
None => {
|
||||
panic!("Respond: Session does not exist")
|
||||
}
|
||||
}
|
||||
assert!(self.session.is_some(), "Respond called without network context");
|
||||
self.send(self.session_id.unwrap(), packet_id, data)
|
||||
}
|
||||
|
||||
/// Send an IO message
|
||||
@@ -215,7 +219,6 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
||||
self.io.message(NetworkIoMessage::User(msg));
|
||||
}
|
||||
|
||||
|
||||
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
||||
pub fn disable_peer(&self, peer: PeerId) {
|
||||
//TODO: remove capability, disconnect if no capabilities left
|
||||
@@ -239,7 +242,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
||||
|
||||
/// Returns peer identification string
|
||||
pub fn peer_info(&self, peer: PeerId) -> String {
|
||||
let session = { self.sessions.read().unwrap().get(peer).cloned() };
|
||||
let session = self.resolve_session(peer);
|
||||
if let Some(session) = session {
|
||||
return session.lock().unwrap().info.client_version.clone()
|
||||
}
|
||||
@@ -624,7 +627,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
|
||||
let mut kill = false;
|
||||
let session = { self.sessions.read().unwrap().get(token).cloned() };
|
||||
if let Some(session) = session {
|
||||
if let Some(session) = session.clone() {
|
||||
let mut s = session.lock().unwrap();
|
||||
match s.readable(io, &self.info.read().unwrap()) {
|
||||
Err(e) => {
|
||||
@@ -656,11 +659,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
}
|
||||
for p in ready_data {
|
||||
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
||||
h.connected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token);
|
||||
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token);
|
||||
}
|
||||
if let Some((p, packet_id, data)) = packet_data {
|
||||
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
||||
h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]);
|
||||
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]);
|
||||
}
|
||||
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
|
||||
}
|
||||
@@ -718,6 +721,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
|
||||
let mut failure_id = None;
|
||||
let mut deregister = false;
|
||||
let mut expired_session = None;
|
||||
match token {
|
||||
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
|
||||
let handshakes = self.handshakes.write().unwrap();
|
||||
@@ -733,6 +737,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
FIRST_SESSION ... LAST_SESSION => {
|
||||
let sessions = self.sessions.write().unwrap();
|
||||
if let Some(session) = sessions.get(token).cloned() {
|
||||
expired_session = Some(session.clone());
|
||||
let mut s = session.lock().unwrap();
|
||||
if !s.expired() {
|
||||
if s.is_ready() {
|
||||
@@ -757,7 +762,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
}
|
||||
for p in to_disconnect {
|
||||
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
||||
h.disconnected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token);
|
||||
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token);
|
||||
}
|
||||
if deregister {
|
||||
io.deregister_stream(token).expect("Error deregistering stream");
|
||||
|
||||
Reference in New Issue
Block a user