Peers keep-alive
This commit is contained in:
parent
81e339a77a
commit
05e86ca63c
@ -207,6 +207,12 @@ pub struct EncryptedConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedConnection {
|
impl EncryptedConnection {
|
||||||
|
|
||||||
|
/// Get socket token
|
||||||
|
pub fn token(&self) -> StreamToken {
|
||||||
|
self.connection.token
|
||||||
|
}
|
||||||
|
|
||||||
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
||||||
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
||||||
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
||||||
|
@ -15,7 +15,7 @@ pub enum DisconnectReason
|
|||||||
_ClientQuit,
|
_ClientQuit,
|
||||||
_UnexpectedIdentity,
|
_UnexpectedIdentity,
|
||||||
_LocalIdentity,
|
_LocalIdentity,
|
||||||
_PingTimeout,
|
PingTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -346,6 +346,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
|
self.keep_alive(io);
|
||||||
self.connect_peers(io);
|
self.connect_peers(io);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,6 +358,21 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
|
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
|
let mut to_kill = Vec::new();
|
||||||
|
for e in self.connections.write().unwrap().iter_mut() {
|
||||||
|
if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() {
|
||||||
|
if !s.keep_alive() {
|
||||||
|
s.disconnect(DisconnectReason::PingTimeout);
|
||||||
|
to_kill.push(s.token());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for p in to_kill {
|
||||||
|
self.kill_connection(p, io);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
struct NodeInfo {
|
struct NodeInfo {
|
||||||
id: NodeId,
|
id: NodeId,
|
||||||
|
@ -4,10 +4,14 @@ use rlp::*;
|
|||||||
use network::connection::{EncryptedConnection, Packet};
|
use network::connection::{EncryptedConnection, Packet};
|
||||||
use network::handshake::Handshake;
|
use network::handshake::Handshake;
|
||||||
use error::*;
|
use error::*;
|
||||||
use io::{IoContext};
|
use io::{IoContext, StreamToken};
|
||||||
use network::error::{NetworkError, DisconnectReason};
|
use network::error::{NetworkError, DisconnectReason};
|
||||||
use network::host::*;
|
use network::host::*;
|
||||||
use network::node::NodeId;
|
use network::node::NodeId;
|
||||||
|
use time;
|
||||||
|
|
||||||
|
const PING_TIMEOUT_SEC: u64 = 30;
|
||||||
|
const PING_INTERVAL_SEC: u64 = 30;
|
||||||
|
|
||||||
/// Peer session over encrypted connection.
|
/// Peer session over encrypted connection.
|
||||||
/// When created waits for Hello packet exchange and signals ready state.
|
/// When created waits for Hello packet exchange and signals ready state.
|
||||||
@ -19,6 +23,8 @@ pub struct Session {
|
|||||||
connection: EncryptedConnection,
|
connection: EncryptedConnection,
|
||||||
/// Session ready flag. Set after successfull Hello packet exchange
|
/// Session ready flag. Set after successfull Hello packet exchange
|
||||||
had_hello: bool,
|
had_hello: bool,
|
||||||
|
ping_time_ns: u64,
|
||||||
|
pong_time_ns: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Structure used to report various session events.
|
/// Structure used to report various session events.
|
||||||
@ -47,6 +53,8 @@ pub struct SessionInfo {
|
|||||||
pub protocol_version: u32,
|
pub protocol_version: u32,
|
||||||
/// Peer protocol capabilities
|
/// Peer protocol capabilities
|
||||||
capabilities: Vec<SessionCapabilityInfo>,
|
capabilities: Vec<SessionCapabilityInfo>,
|
||||||
|
/// Peer ping delay in milliseconds
|
||||||
|
pub ping_ms: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@ -95,10 +103,13 @@ impl Session {
|
|||||||
client_version: String::new(),
|
client_version: String::new(),
|
||||||
protocol_version: 0,
|
protocol_version: 0,
|
||||||
capabilities: Vec::new(),
|
capabilities: Vec::new(),
|
||||||
|
ping_ms: None,
|
||||||
},
|
},
|
||||||
|
ping_time_ns: 0,
|
||||||
|
pong_time_ns: None,
|
||||||
};
|
};
|
||||||
try!(session.write_hello(host));
|
try!(session.write_hello(host));
|
||||||
try!(session.write_ping());
|
try!(session.send_ping());
|
||||||
Ok(session)
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,7 +152,7 @@ impl Session {
|
|||||||
while protocol != self.info.capabilities[i].protocol {
|
while protocol != self.info.capabilities[i].protocol {
|
||||||
i += 1;
|
i += 1;
|
||||||
if i == self.info.capabilities.len() {
|
if i == self.info.capabilities.len() {
|
||||||
debug!(target: "net", "Unkown protocol: {:?}", protocol);
|
debug!(target: "net", "Unknown protocol: {:?}", protocol);
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -152,6 +163,26 @@ impl Session {
|
|||||||
self.connection.send_packet(&rlp.out())
|
self.connection.send_packet(&rlp.out())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Keep this session alive. Returns false if ping timeout happened
|
||||||
|
pub fn keep_alive(&mut self) -> bool {
|
||||||
|
let timed_out = if let Some(pong) = self.pong_time_ns {
|
||||||
|
pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
|
||||||
|
} else {
|
||||||
|
time::precise_time_ns() - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
|
||||||
|
};
|
||||||
|
|
||||||
|
if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 {
|
||||||
|
if let Err(e) = self.send_ping() {
|
||||||
|
debug!("Error sending ping message: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
!timed_out
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn token(&self) -> StreamToken {
|
||||||
|
self.connection.token()
|
||||||
|
}
|
||||||
|
|
||||||
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
|
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
|
||||||
if packet.data.len() < 2 {
|
if packet.data.len() < 2 {
|
||||||
return Err(From::from(NetworkError::BadProtocol));
|
return Err(From::from(NetworkError::BadProtocol));
|
||||||
@ -168,7 +199,12 @@ impl Session {
|
|||||||
},
|
},
|
||||||
PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))),
|
PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))),
|
||||||
PACKET_PING => {
|
PACKET_PING => {
|
||||||
try!(self.write_pong());
|
try!(self.send_pong());
|
||||||
|
Ok(SessionData::None)
|
||||||
|
},
|
||||||
|
PACKET_PONG => {
|
||||||
|
self.pong_time_ns = Some(time::precise_time_ns());
|
||||||
|
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
|
||||||
Ok(SessionData::None)
|
Ok(SessionData::None)
|
||||||
},
|
},
|
||||||
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
||||||
@ -178,7 +214,7 @@ impl Session {
|
|||||||
while packet_id < self.info.capabilities[i].id_offset {
|
while packet_id < self.info.capabilities[i].id_offset {
|
||||||
i += 1;
|
i += 1;
|
||||||
if i == self.info.capabilities.len() {
|
if i == self.info.capabilities.len() {
|
||||||
debug!(target: "net", "Unkown packet: {:?}", packet_id);
|
debug!(target: "net", "Unknown packet: {:?}", packet_id);
|
||||||
return Ok(SessionData::None)
|
return Ok(SessionData::None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,7 +225,7 @@ impl Session {
|
|||||||
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
debug!(target: "net", "Unkown packet: {:?}", packet_id);
|
debug!(target: "net", "Unknown packet: {:?}", packet_id);
|
||||||
Ok(SessionData::None)
|
Ok(SessionData::None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -255,11 +291,15 @@ impl Session {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_ping(&mut self) -> Result<(), UtilError> {
|
/// Senf ping packet
|
||||||
self.send(try!(Session::prepare(PACKET_PING)))
|
pub fn send_ping(&mut self) -> Result<(), UtilError> {
|
||||||
|
try!(self.send(try!(Session::prepare(PACKET_PING))));
|
||||||
|
self.ping_time_ns = time::precise_time_ns();
|
||||||
|
self.pong_time_ns = None;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_pong(&mut self) -> Result<(), UtilError> {
|
fn send_pong(&mut self) -> Result<(), UtilError> {
|
||||||
self.send(try!(Session::prepare(PACKET_PONG)))
|
self.send(try!(Session::prepare(PACKET_PONG)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user