From ce0cc11c56db7981ca9a04ac3403ce817d38a426 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 2 Feb 2016 14:54:46 +0100 Subject: [PATCH] Peer sync timeouts --- sync/Cargo.toml | 1 + sync/src/chain.rs | 17 +++++++++++++++++ sync/src/io.rs | 6 ++++++ sync/src/lib.rs | 9 ++++++++- sync/src/tests.rs | 3 +++ util/src/network/error.rs | 20 ++++++++++---------- util/src/network/host.rs | 27 ++++++++++++++++++++++++++- util/src/network/service.rs | 2 +- util/src/network/session.rs | 3 ++- 9 files changed, 74 insertions(+), 14 deletions(-) diff --git a/sync/Cargo.toml b/sync/Cargo.toml index c3ae470fd..5f098bc26 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -13,4 +13,5 @@ ethcore = { path = ".." } clippy = "0.0.37" log = "0.3" env_logger = "0.3" +time = "0.1.34" diff --git a/sync/src/chain.rs b/sync/src/chain.rs index dcc5c52e3..9752a5013 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -22,6 +22,7 @@ use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; use ethcore::block::Block; use io::SyncIo; +use time; impl ToUsize for BlockNumber { fn to_usize(&self) -> usize { @@ -61,6 +62,8 @@ const RECEIPTS_PACKET: u8 = 0x10; const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent +const CONNECTION_TIMEOUT_SEC: f64 = 30f64; + struct Header { /// Header data data: Bytes, @@ -138,6 +141,8 @@ struct PeerInfo { asking: PeerAsking, /// A set of block numbers being requested asking_blocks: Vec, + /// Request timestamp + ask_time: f64, } /// Blockchain sync handler. @@ -250,6 +255,7 @@ impl ChainSync { genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), + ask_time: 0f64, }; trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); @@ -803,6 +809,7 @@ impl ChainSync { Ok(_) => { let mut peer = self.peers.get_mut(&peer_id).unwrap(); peer.asking = asking; + peer.ask_time = time::precise_time_s(); } } } @@ -977,6 +984,16 @@ impl ChainSync { }) } + /// Handle peer timeouts + pub fn maintain_peers(&self, io: &mut SyncIo) { + let tick = time::precise_time_s(); + for (peer_id, peer) in &self.peers { + if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC { + io.disconnect_peer(*peer_id); + } + } + } + /// Maintain other peers. Send out any new blocks and transactions pub fn _maintain_sync(&mut self, _io: &mut SyncIo) { } diff --git a/sync/src/io.rs b/sync/src/io.rs index 4425a2555..8f415f582 100644 --- a/sync/src/io.rs +++ b/sync/src/io.rs @@ -9,6 +9,8 @@ use ethcore::service::SyncMessage; pub trait SyncIo { /// Disable a peer fn disable_peer(&mut self, peer_id: PeerId); + /// Disconnect peer + fn disconnect_peer(&mut self, peer_id: PeerId); /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>; /// Send a packet to a peer. @@ -42,6 +44,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { self.network.disable_peer(peer_id); } + fn disconnect_peer(&mut self, peer_id: PeerId) { + self.network.disconnect_peer(peer_id); + } + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError>{ self.network.respond(packet_id, data) } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 09f3eb521..40b67dc5b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -33,11 +33,13 @@ extern crate log; extern crate ethcore_util as util; extern crate ethcore; extern crate env_logger; +extern crate time; use std::ops::*; use std::sync::*; use ethcore::client::Client; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; +use util::io::TimerToken; use chain::ChainSync; use ethcore::service::SyncMessage; use io::NetSyncIo; @@ -87,7 +89,8 @@ impl EthSync { } impl NetworkProtocolHandler for EthSync { - fn initialize(&self, _io: &NetworkContext) { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(0, 1000).expect("Error registering sync timer"); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -101,6 +104,10 @@ impl NetworkProtocolHandler for EthSync { fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); } + + fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + self.sync.write().unwrap().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); + } } diff --git a/sync/src/tests.rs b/sync/src/tests.rs index 41516ef60..5b796e6f1 100644 --- a/sync/src/tests.rs +++ b/sync/src/tests.rs @@ -209,6 +209,9 @@ impl<'p> SyncIo for TestIo<'p> { fn disable_peer(&mut self, _peer_id: PeerId) { } + fn disconnect_peer(&mut self, _peer_id: PeerId) { + } + fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { self.queue.push_back(TestPacket { data: data, diff --git a/util/src/network/error.rs b/util/src/network/error.rs index b9dfdc892..f03e128e6 100644 --- a/util/src/network/error.rs +++ b/util/src/network/error.rs @@ -5,17 +5,17 @@ use rlp::*; pub enum DisconnectReason { DisconnectRequested, - //TCPError, - //BadProtocol, + _TCPError, + _BadProtocol, UselessPeer, - //TooManyPeers, - //DuplicatePeer, - //IncompatibleProtocol, - //NullIdentity, - //ClientQuit, - //UnexpectedIdentity, - //LocalIdentity, - //PingTimeout, + _TooManyPeers, + _DuplicatePeer, + _IncompatibleProtocol, + _NullIdentity, + _ClientQuit, + _UnexpectedIdentity, + _LocalIdentity, + _PingTimeout, } #[derive(Debug)] diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 95b1e3668..5e44c5854 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -18,6 +18,7 @@ use io::*; use network::NetworkProtocolHandler; use network::node::*; use network::stats::NetworkStats; +use network::error::DisconnectReason; type Slab = ::slab::Slab; @@ -107,6 +108,11 @@ pub enum NetworkIoMessage where Message: Send + Sync + Clone { /// Timer delay in milliseconds. delay: u64, }, + /// Disconnect a peer + Disconnect { + /// Peer Id + peer: PeerId, + }, /// User message User(Message), } @@ -180,8 +186,16 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone } /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. - pub fn disable_peer(&self, _peer: PeerId) { + pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left + self.disconnect_peer(peer); + } + + /// Disconnect peer. Reconnect can be attempted later. + pub fn disconnect_peer(&self, peer: PeerId) { + self.io.message(NetworkIoMessage::Disconnect { + peer: peer, + }); } /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. @@ -683,6 +697,17 @@ impl IoHandler> for Host where Messa self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token }); io.register_timer(handler_token, *delay).expect("Error registering timer"); }, + NetworkIoMessage::Disconnect { + ref peer, + } => { + if let Some(connection) = self.connections.read().unwrap().get(*peer).cloned() { + match *connection.lock().unwrap().deref_mut() { + ConnectionEntry::Handshake(_) => {}, + ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); } + } + } + self.kill_connection(*peer, io); + }, NetworkIoMessage::User(ref message) => { for (p, h) in self.handlers.read().unwrap().iter() { h.message(&NetworkContext::new(io, p, None, self.connections.clone()), &message); diff --git a/util/src/network/service.rs b/util/src/network/service.rs index cbf400872..41a23cda6 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -21,7 +21,7 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat let host = Arc::new(Host::new(config)); let stats = host.stats().clone(); let host_info = host.client_version(); - info!("NetworkService::start(): id={:?}", host.client_id()); + info!("Host ID={:?}", host.client_id()); try!(io_service.register_handler(host)); Ok(NetworkService { io_service: io_service, diff --git a/util/src/network/session.rs b/util/src/network/session.rs index fb385b487..20a9c9a48 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -263,7 +263,8 @@ impl Session { self.send(try!(Session::prepare(PACKET_PONG))) } - fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { + /// Disconnect this session + pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { let mut rlp = RlpStream::new(); rlp.append(&(PACKET_DISCONNECT as u32)); rlp.begin_list(1);