diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 7ed8c3c18..d7bfe5284 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -207,6 +207,12 @@ pub struct EncryptedConnection { } impl EncryptedConnection { + + /// Get socket token + pub fn token(&self) -> StreamToken { + self.connection.token + } + /// Create an encrypted connection out of the handshake. Consumes a handshake object. pub fn new(mut handshake: Handshake) -> Result { let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public)); diff --git a/util/src/network/error.rs b/util/src/network/error.rs index f03e128e6..78f015c37 100644 --- a/util/src/network/error.rs +++ b/util/src/network/error.rs @@ -15,7 +15,7 @@ pub enum DisconnectReason _ClientQuit, _UnexpectedIdentity, _LocalIdentity, - _PingTimeout, + PingTimeout, } #[derive(Debug)] diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 7fe4937e4..01df8bc41 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -346,6 +346,7 @@ impl Host where Message: Send + Sync + Clone { } fn maintain_network(&self, io: &IoContext>) { + self.keep_alive(io); self.connect_peers(io); } @@ -357,6 +358,21 @@ impl Host where Message: Send + Sync + Clone { self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } + fn keep_alive(&self, io: &IoContext>) { + let mut to_kill = Vec::new(); + for e in self.connections.write().unwrap().iter_mut() { + if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() { + if !s.keep_alive() { + s.disconnect(DisconnectReason::PingTimeout); + to_kill.push(s.token()); + } + } + } + for p in to_kill { + self.kill_connection(p, io); + } + } + fn connect_peers(&self, io: &IoContext>) { struct NodeInfo { id: NodeId, diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 20a9c9a48..41e8e9c5d 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -4,10 +4,14 @@ use rlp::*; use network::connection::{EncryptedConnection, Packet}; use network::handshake::Handshake; use error::*; -use io::{IoContext}; +use io::{IoContext, StreamToken}; use network::error::{NetworkError, DisconnectReason}; use network::host::*; use network::node::NodeId; +use time; + +const PING_TIMEOUT_SEC: u64 = 30; +const PING_INTERVAL_SEC: u64 = 30; /// Peer session over encrypted connection. /// When created waits for Hello packet exchange and signals ready state. @@ -19,6 +23,8 @@ pub struct Session { connection: EncryptedConnection, /// Session ready flag. Set after successfull Hello packet exchange had_hello: bool, + ping_time_ns: u64, + pong_time_ns: Option, } /// Structure used to report various session events. @@ -47,6 +53,8 @@ pub struct SessionInfo { pub protocol_version: u32, /// Peer protocol capabilities capabilities: Vec, + /// Peer ping delay in milliseconds + pub ping_ms: Option, } #[derive(Debug, PartialEq, Eq)] @@ -95,10 +103,13 @@ impl Session { client_version: String::new(), protocol_version: 0, capabilities: Vec::new(), + ping_ms: None, }, + ping_time_ns: 0, + pong_time_ns: None, }; try!(session.write_hello(host)); - try!(session.write_ping()); + try!(session.send_ping()); Ok(session) } @@ -141,7 +152,7 @@ impl Session { while protocol != self.info.capabilities[i].protocol { i += 1; if i == self.info.capabilities.len() { - debug!(target: "net", "Unkown protocol: {:?}", protocol); + debug!(target: "net", "Unknown protocol: {:?}", protocol); return Ok(()) } } @@ -152,6 +163,26 @@ impl Session { self.connection.send_packet(&rlp.out()) } + /// Keep this session alive. Returns false if ping timeout happened + pub fn keep_alive(&mut self) -> bool { + let timed_out = if let Some(pong) = self.pong_time_ns { + pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000 + } else { + time::precise_time_ns() - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000 + }; + + if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 { + if let Err(e) = self.send_ping() { + debug!("Error sending ping message: {:?}", e); + } + } + !timed_out + } + + pub fn token(&self) -> StreamToken { + self.connection.token() + } + fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result { if packet.data.len() < 2 { return Err(From::from(NetworkError::BadProtocol)); @@ -168,7 +199,12 @@ impl Session { }, PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))), PACKET_PING => { - try!(self.write_pong()); + try!(self.send_pong()); + Ok(SessionData::None) + }, + PACKET_PONG => { + self.pong_time_ns = Some(time::precise_time_ns()); + self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000); Ok(SessionData::None) }, PACKET_GET_PEERS => Ok(SessionData::None), //TODO; @@ -178,7 +214,7 @@ impl Session { while packet_id < self.info.capabilities[i].id_offset { i += 1; if i == self.info.capabilities.len() { - debug!(target: "net", "Unkown packet: {:?}", packet_id); + debug!(target: "net", "Unknown packet: {:?}", packet_id); return Ok(SessionData::None) } } @@ -189,7 +225,7 @@ impl Session { Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } ) }, _ => { - debug!(target: "net", "Unkown packet: {:?}", packet_id); + debug!(target: "net", "Unknown packet: {:?}", packet_id); Ok(SessionData::None) } } @@ -255,11 +291,15 @@ impl Session { Ok(()) } - fn write_ping(&mut self) -> Result<(), UtilError> { - self.send(try!(Session::prepare(PACKET_PING))) + /// Senf ping packet + pub fn send_ping(&mut self) -> Result<(), UtilError> { + try!(self.send(try!(Session::prepare(PACKET_PING)))); + self.ping_time_ns = time::precise_time_ns(); + self.pong_time_ns = None; + Ok(()) } - fn write_pong(&mut self) -> Result<(), UtilError> { + fn send_pong(&mut self) -> Result<(), UtilError> { self.send(try!(Session::prepare(PACKET_PONG))) }