Report whether a peer was kept from Handler::on_connect

This commit is contained in:
Vurich 2017-06-30 10:58:48 +02:00
parent a3e693d5c3
commit 5eba9078fc
4 changed files with 117 additions and 26 deletions

View File

@ -66,6 +66,8 @@ pub enum Error {
BadProtocolVersion, BadProtocolVersion,
/// Peer is overburdened. /// Peer is overburdened.
Overburdened, Overburdened,
/// No handler kept the peer.
RejectedByHandlers,
} }
impl Error { impl Error {
@ -85,6 +87,7 @@ impl Error {
Error::UnsupportedProtocolVersion(_) => Punishment::Disable, Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
Error::BadProtocolVersion => Punishment::Disable, Error::BadProtocolVersion => Punishment::Disable,
Error::Overburdened => Punishment::None, Error::Overburdened => Punishment::None,
Error::RejectedByHandlers => Punishment::Disconnect,
} }
} }
} }
@ -117,6 +120,7 @@ impl fmt::Display for Error {
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv), Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"), Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
Error::Overburdened => write!(f, "Peer overburdened"), Error::Overburdened => write!(f, "Peer overburdened"),
Error::RejectedByHandlers => write!(f, "No handler kept this peer"),
} }
} }
} }

View File

@ -31,6 +31,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::ops::{BitOr, BitAnd, Not};
use provider::Provider; use provider::Provider;
use request::{Request, NetworkRequests as Requests, Response}; use request::{Request, NetworkRequests as Requests, Response};
@ -157,6 +158,54 @@ pub struct Peer {
awaiting_acknowledge: Option<(SteadyTime, Arc<FlowParams>)>, awaiting_acknowledge: Option<(SteadyTime, Arc<FlowParams>)>,
} }
/// Whether or not a peer was kept by a handler
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerStatus {
/// The peer was kept
Kept,
/// The peer was not kept
Unkept,
}
impl Not for PeerStatus {
type Output = Self;
fn not(self) -> Self {
use self::PeerStatus::*;
match self {
Kept => Unkept,
Unkept => Kept,
}
}
}
impl BitAnd for PeerStatus {
type Output = Self;
fn bitand(self, other: Self) -> Self {
use self::PeerStatus::*;
match (self, other) {
(Kept, Kept) => Kept,
_ => Unkept,
}
}
}
impl BitOr for PeerStatus {
type Output = Self;
fn bitor(self, other: Self) -> Self {
use self::PeerStatus::*;
match (self, other) {
(_, Kept) | (Kept, _) => Kept,
_ => Unkept,
}
}
}
/// A light protocol event handler. /// A light protocol event handler.
/// ///
/// Each handler function takes a context which describes the relevant peer /// Each handler function takes a context which describes the relevant peer
@ -168,7 +217,12 @@ pub struct Peer {
/// that relevant data will be stored by interested handlers. /// that relevant data will be stored by interested handlers.
pub trait Handler: Send + Sync { pub trait Handler: Send + Sync {
/// Called when a peer connects. /// Called when a peer connects.
fn on_connect(&self, _ctx: &EventContext, _status: &Status, _capabilities: &Capabilities) { } fn on_connect(
&self,
_ctx: &EventContext,
_status: &Status,
_capabilities: &Capabilities
) -> PeerStatus { PeerStatus::Kept }
/// Called when a peer disconnects, with a list of unfulfilled request IDs as /// Called when a peer disconnects, with a list of unfulfilled request IDs as
/// of yet. /// of yet.
fn on_disconnect(&self, _ctx: &EventContext, _unfulfilled: &[ReqId]) { } fn on_disconnect(&self, _ctx: &EventContext, _unfulfilled: &[ReqId]) { }
@ -766,16 +820,21 @@ impl LightProtocol {
awaiting_acknowledge: None, awaiting_acknowledge: None,
})); }));
for handler in &self.handlers { let any_kept = self.handlers.iter().map(
|handler|
handler.on_connect(&Ctx { handler.on_connect(&Ctx {
peer: *peer, peer: *peer,
io: io, io: io,
proto: self, proto: self,
}, &status, &capabilities) }, &status, &capabilities)
} ).fold(PeerStatus::Kept, PeerStatus::bitor);
if any_kept == PeerStatus::Unkept {
Err(Error::RejectedByHandlers)
} else {
Ok(()) Ok(())
} }
}
// Handle an announcement. // Handle an announcement.
fn announcement(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> { fn announcement(&self, peer: &PeerId, io: &IoContext, data: UntrustedRlp) -> Result<(), Error> {

View File

@ -29,7 +29,17 @@ use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
use network::PeerId; use network::PeerId;
use util::{RwLock, Mutex}; use util::{RwLock, Mutex};
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{
self,
Handler,
PeerStatus,
Status,
Capabilities,
Announcement,
EventContext,
BasicContext,
ReqId,
};
use cache::Cache; use cache::Cache;
use request::{self as basic_request, Request as NetworkRequest}; use request::{self as basic_request, Request as NetworkRequest};
use self::request::CheckedRequest; use self::request::CheckedRequest;
@ -402,9 +412,18 @@ impl OnDemand {
} }
impl Handler for OnDemand { impl Handler for OnDemand {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { fn on_connect(
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); &self,
ctx: &EventContext,
status: &Status,
capabilities: &Capabilities
) -> PeerStatus {
self.peers.write().insert(
ctx.peer(),
Peer { status: status.clone(), capabilities: capabilities.clone() }
);
self.attempt_dispatch(ctx.as_basic()); self.attempt_dispatch(ctx.as_basic());
PeerStatus::Kept
} }
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {

View File

@ -39,8 +39,9 @@ use std::sync::Arc;
use ethcore::encoded; use ethcore::encoded;
use light::client::{AsLightClient, LightChainClient}; use light::client::{AsLightClient, LightChainClient};
use light::net::{ use light::net::{
Announcement, Handler, BasicContext, EventContext, PeerStatus, Announcement, Handler, BasicContext,
Capabilities, ReqId, Status, Error as NetError, EventContext, Capabilities, ReqId, Status,
Error as NetError,
}; };
use light::request::{self, CompleteHeadersRequest as HeadersRequest}; use light::request::{self, CompleteHeadersRequest as HeadersRequest};
use network::PeerId; use network::PeerId;
@ -229,13 +230,13 @@ pub struct LightSync<L: AsLightClient> {
} }
impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> { impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { fn on_connect(
if !capabilities.serve_headers { &self,
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); ctx: &EventContext,
ctx.disconnect_peer(ctx.peer()); status: &Status,
return; capabilities: &Capabilities
} ) -> PeerStatus {
if capabilities.serve_headers {
let chain_info = ChainInfo { let chain_info = ChainInfo {
head_td: status.head_td, head_td: status.head_td,
head_hash: status.head_hash, head_hash: status.head_hash,
@ -249,6 +250,14 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info))); self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
self.maintain_sync(ctx.as_basic()); self.maintain_sync(ctx.as_basic());
PeerStatus::Kept
} else {
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
ctx.disconnect_peer(ctx.peer());
PeerStatus::Unkept
}
} }
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {