Peer sync timeouts

This commit is contained in:
arkpar 2016-02-02 14:54:46 +01:00
parent bd684e3732
commit ce0cc11c56
9 changed files with 74 additions and 14 deletions

View File

@ -13,4 +13,5 @@ ethcore = { path = ".." }
clippy = "0.0.37" clippy = "0.0.37"
log = "0.3" log = "0.3"
env_logger = "0.3" env_logger = "0.3"
time = "0.1.34"

View File

@ -22,6 +22,7 @@ use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*; use ethcore::error::*;
use ethcore::block::Block; use ethcore::block::Block;
use io::SyncIo; use io::SyncIo;
use time;
impl ToUsize for BlockNumber { impl ToUsize for BlockNumber {
fn to_usize(&self) -> usize { fn to_usize(&self) -> usize {
@ -61,6 +62,8 @@ const RECEIPTS_PACKET: u8 = 0x10;
const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent
const CONNECTION_TIMEOUT_SEC: f64 = 30f64;
struct Header { struct Header {
/// Header data /// Header data
data: Bytes, data: Bytes,
@ -138,6 +141,8 @@ struct PeerInfo {
asking: PeerAsking, asking: PeerAsking,
/// A set of block numbers being requested /// A set of block numbers being requested
asking_blocks: Vec<BlockNumber>, asking_blocks: Vec<BlockNumber>,
/// Request timestamp
ask_time: f64,
} }
/// Blockchain sync handler. /// Blockchain sync handler.
@ -250,6 +255,7 @@ impl ChainSync {
genesis: try!(r.val_at(4)), genesis: try!(r.val_at(4)),
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
ask_time: 0f64,
}; };
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);
@ -803,6 +809,7 @@ impl ChainSync {
Ok(_) => { Ok(_) => {
let mut peer = self.peers.get_mut(&peer_id).unwrap(); let mut peer = self.peers.get_mut(&peer_id).unwrap();
peer.asking = asking; peer.asking = asking;
peer.ask_time = time::precise_time_s();
} }
} }
} }
@ -977,6 +984,16 @@ impl ChainSync {
}) })
} }
/// Handle peer timeouts
pub fn maintain_peers(&self, io: &mut SyncIo) {
let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
io.disconnect_peer(*peer_id);
}
}
}
/// Maintain other peers. Send out any new blocks and transactions /// Maintain other peers. Send out any new blocks and transactions
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
} }

View File

@ -9,6 +9,8 @@ use ethcore::service::SyncMessage;
pub trait SyncIo { pub trait SyncIo {
/// Disable a peer /// Disable a peer
fn disable_peer(&mut self, peer_id: PeerId); fn disable_peer(&mut self, peer_id: PeerId);
/// Disconnect peer
fn disconnect_peer(&mut self, peer_id: PeerId);
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet. /// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>; fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
/// Send a packet to a peer. /// Send a packet to a peer.
@ -42,6 +44,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
self.network.disable_peer(peer_id); self.network.disable_peer(peer_id);
} }
fn disconnect_peer(&mut self, peer_id: PeerId) {
self.network.disconnect_peer(peer_id);
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>{ fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>{
self.network.respond(packet_id, data) self.network.respond(packet_id, data)
} }

View File

@ -33,11 +33,13 @@ extern crate log;
extern crate ethcore_util as util; extern crate ethcore_util as util;
extern crate ethcore; extern crate ethcore;
extern crate env_logger; extern crate env_logger;
extern crate time;
use std::ops::*; use std::ops::*;
use std::sync::*; use std::sync::*;
use ethcore::client::Client; use ethcore::client::Client;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::io::TimerToken;
use chain::ChainSync; use chain::ChainSync;
use ethcore::service::SyncMessage; use ethcore::service::SyncMessage;
use io::NetSyncIo; use io::NetSyncIo;
@ -87,7 +89,8 @@ impl EthSync {
} }
impl NetworkProtocolHandler<SyncMessage> for EthSync { impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn initialize(&self, _io: &NetworkContext<SyncMessage>) { fn initialize(&self, io: &NetworkContext<SyncMessage>) {
io.register_timer(0, 1000).expect("Error registering sync timer");
} }
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
@ -101,6 +104,10 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) { fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
} }
fn timeout(&self, io: &NetworkContext<SyncMessage>, _timer: TimerToken) {
self.sync.write().unwrap().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref()));
}
} }

View File

@ -209,6 +209,9 @@ impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, _peer_id: PeerId) { fn disable_peer(&mut self, _peer_id: PeerId) {
} }
fn disconnect_peer(&mut self, _peer_id: PeerId) {
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> { fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
self.queue.push_back(TestPacket { self.queue.push_back(TestPacket {
data: data, data: data,

View File

@ -5,17 +5,17 @@ use rlp::*;
pub enum DisconnectReason pub enum DisconnectReason
{ {
DisconnectRequested, DisconnectRequested,
//TCPError, _TCPError,
//BadProtocol, _BadProtocol,
UselessPeer, UselessPeer,
//TooManyPeers, _TooManyPeers,
//DuplicatePeer, _DuplicatePeer,
//IncompatibleProtocol, _IncompatibleProtocol,
//NullIdentity, _NullIdentity,
//ClientQuit, _ClientQuit,
//UnexpectedIdentity, _UnexpectedIdentity,
//LocalIdentity, _LocalIdentity,
//PingTimeout, _PingTimeout,
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -18,6 +18,7 @@ use io::*;
use network::NetworkProtocolHandler; use network::NetworkProtocolHandler;
use network::node::*; use network::node::*;
use network::stats::NetworkStats; use network::stats::NetworkStats;
use network::error::DisconnectReason;
type Slab<T> = ::slab::Slab<T, usize>; type Slab<T> = ::slab::Slab<T, usize>;
@ -107,6 +108,11 @@ pub enum NetworkIoMessage<Message> where Message: Send + Sync + Clone {
/// Timer delay in milliseconds. /// Timer delay in milliseconds.
delay: u64, delay: u64,
}, },
/// Disconnect a peer
Disconnect {
/// Peer Id
peer: PeerId,
},
/// User message /// User message
User(Message), User(Message),
} }
@ -180,8 +186,16 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
} }
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, _peer: PeerId) { pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left //TODO: remove capability, disconnect if no capabilities left
self.disconnect_peer(peer);
}
/// Disconnect peer. Reconnect can be attempted later.
pub fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect {
peer: peer,
});
} }
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token. /// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
@ -683,6 +697,17 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
io.register_timer(handler_token, *delay).expect("Error registering timer"); io.register_timer(handler_token, *delay).expect("Error registering timer");
}, },
NetworkIoMessage::Disconnect {
ref peer,
} => {
if let Some(connection) = self.connections.read().unwrap().get(*peer).cloned() {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(_) => {},
ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); }
}
}
self.kill_connection(*peer, io);
},
NetworkIoMessage::User(ref message) => { NetworkIoMessage::User(ref message) => {
for (p, h) in self.handlers.read().unwrap().iter() { for (p, h) in self.handlers.read().unwrap().iter() {
h.message(&NetworkContext::new(io, p, None, self.connections.clone()), &message); h.message(&NetworkContext::new(io, p, None, self.connections.clone()), &message);

View File

@ -21,7 +21,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
let host = Arc::new(Host::new(config)); let host = Arc::new(Host::new(config));
let stats = host.stats().clone(); let stats = host.stats().clone();
let host_info = host.client_version(); let host_info = host.client_version();
info!("NetworkService::start(): id={:?}", host.client_id()); info!("Host ID={:?}", host.client_id());
try!(io_service.register_handler(host)); try!(io_service.register_handler(host));
Ok(NetworkService { Ok(NetworkService {
io_service: io_service, io_service: io_service,

View File

@ -263,7 +263,8 @@ impl Session {
self.send(try!(Session::prepare(PACKET_PONG))) self.send(try!(Session::prepare(PACKET_PONG)))
} }
fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { /// Disconnect this session
pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32)); rlp.append(&(PACKET_DISCONNECT as u32));
rlp.begin_list(1); rlp.begin_list(1);