Merge pull request #5958 from Vurich/peer-status
Report whether a peer was kept from `Handler::on_connect`
This commit is contained in:
commit
cc718bb108
@ -66,6 +66,8 @@ pub enum Error {
|
||||
BadProtocolVersion,
|
||||
/// Peer is overburdened.
|
||||
Overburdened,
|
||||
/// No handler kept the peer.
|
||||
RejectedByHandlers,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
@ -85,6 +87,7 @@ impl Error {
|
||||
Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
|
||||
Error::BadProtocolVersion => Punishment::Disable,
|
||||
Error::Overburdened => Punishment::None,
|
||||
Error::RejectedByHandlers => Punishment::Disconnect,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -117,6 +120,7 @@ impl fmt::Display for Error {
|
||||
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
|
||||
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
|
||||
Error::Overburdened => write!(f, "Peer overburdened"),
|
||||
Error::RejectedByHandlers => write!(f, "No handler kept this peer"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::ops::{BitOr, BitAnd, Not};
|
||||
|
||||
use provider::Provider;
|
||||
use request::{Request, NetworkRequests as Requests, Response};
|
||||
@ -162,6 +163,54 @@ pub struct Peer {
|
||||
awaiting_acknowledge: Option<(SteadyTime, Arc<FlowParams>)>,
|
||||
}
|
||||
|
||||
/// Whether or not a peer was kept by a handler
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PeerStatus {
|
||||
/// The peer was kept
|
||||
Kept,
|
||||
/// The peer was not kept
|
||||
Unkept,
|
||||
}
|
||||
|
||||
impl Not for PeerStatus {
|
||||
type Output = Self;
|
||||
|
||||
fn not(self) -> Self {
|
||||
use self::PeerStatus::*;
|
||||
|
||||
match self {
|
||||
Kept => Unkept,
|
||||
Unkept => Kept,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BitAnd for PeerStatus {
|
||||
type Output = Self;
|
||||
|
||||
fn bitand(self, other: Self) -> Self {
|
||||
use self::PeerStatus::*;
|
||||
|
||||
match (self, other) {
|
||||
(Kept, Kept) => Kept,
|
||||
_ => Unkept,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BitOr for PeerStatus {
|
||||
type Output = Self;
|
||||
|
||||
fn bitor(self, other: Self) -> Self {
|
||||
use self::PeerStatus::*;
|
||||
|
||||
match (self, other) {
|
||||
(_, Kept) | (Kept, _) => Kept,
|
||||
_ => Unkept,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A light protocol event handler.
|
||||
///
|
||||
/// Each handler function takes a context which describes the relevant peer
|
||||
@ -173,7 +222,12 @@ pub struct Peer {
|
||||
/// that relevant data will be stored by interested handlers.
|
||||
pub trait Handler: Send + Sync {
|
||||
/// Called when a peer connects.
|
||||
fn on_connect(&self, _ctx: &EventContext, _status: &Status, _capabilities: &Capabilities) { }
|
||||
fn on_connect(
|
||||
&self,
|
||||
_ctx: &EventContext,
|
||||
_status: &Status,
|
||||
_capabilities: &Capabilities
|
||||
) -> PeerStatus { PeerStatus::Kept }
|
||||
/// Called when a peer disconnects, with a list of unfulfilled request IDs as
|
||||
/// of yet.
|
||||
fn on_disconnect(&self, _ctx: &EventContext, _unfulfilled: &[ReqId]) { }
|
||||
@ -777,15 +831,23 @@ impl LightProtocol {
|
||||
awaiting_acknowledge: None,
|
||||
}));
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_connect(&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
}, &status, &capabilities)
|
||||
}
|
||||
let any_kept = self.handlers.iter().map(
|
||||
|handler| handler.on_connect(
|
||||
&Ctx {
|
||||
peer: *peer,
|
||||
io: io,
|
||||
proto: self,
|
||||
},
|
||||
&status,
|
||||
&capabilities
|
||||
)
|
||||
).fold(PeerStatus::Kept, PeerStatus::bitor);
|
||||
|
||||
Ok(())
|
||||
if any_kept == PeerStatus::Unkept {
|
||||
Err(Error::RejectedByHandlers)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Handle an announcement.
|
||||
|
@ -29,7 +29,10 @@ use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
|
||||
use network::PeerId;
|
||||
use util::{RwLock, Mutex};
|
||||
|
||||
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
|
||||
use net::{
|
||||
self, Handler, PeerStatus, Status, Capabilities,
|
||||
Announcement, EventContext, BasicContext, ReqId,
|
||||
};
|
||||
use cache::Cache;
|
||||
use request::{self as basic_request, Request as NetworkRequest};
|
||||
use self::request::CheckedRequest;
|
||||
@ -402,9 +405,18 @@ impl OnDemand {
|
||||
}
|
||||
|
||||
impl Handler for OnDemand {
|
||||
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
|
||||
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
|
||||
fn on_connect(
|
||||
&self,
|
||||
ctx: &EventContext,
|
||||
status: &Status,
|
||||
capabilities: &Capabilities
|
||||
) -> PeerStatus {
|
||||
self.peers.write().insert(
|
||||
ctx.peer(),
|
||||
Peer { status: status.clone(), capabilities: capabilities.clone() }
|
||||
);
|
||||
self.attempt_dispatch(ctx.as_basic());
|
||||
PeerStatus::Kept
|
||||
}
|
||||
|
||||
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
|
||||
|
@ -39,8 +39,9 @@ use std::sync::Arc;
|
||||
use ethcore::encoded;
|
||||
use light::client::{AsLightClient, LightChainClient};
|
||||
use light::net::{
|
||||
Announcement, Handler, BasicContext, EventContext,
|
||||
Capabilities, ReqId, Status, Error as NetError,
|
||||
PeerStatus, Announcement, Handler, BasicContext,
|
||||
EventContext, Capabilities, ReqId, Status,
|
||||
Error as NetError,
|
||||
};
|
||||
use light::request::{self, CompleteHeadersRequest as HeadersRequest};
|
||||
use network::PeerId;
|
||||
@ -229,26 +230,33 @@ pub struct LightSync<L: AsLightClient> {
|
||||
}
|
||||
|
||||
impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
|
||||
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
|
||||
if !capabilities.serve_headers {
|
||||
trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer());
|
||||
ctx.disconnect_peer(ctx.peer());
|
||||
return;
|
||||
fn on_connect(
|
||||
&self,
|
||||
ctx: &EventContext,
|
||||
status: &Status,
|
||||
capabilities: &Capabilities
|
||||
) -> PeerStatus {
|
||||
use std::cmp;
|
||||
|
||||
if capabilities.serve_headers {
|
||||
let chain_info = ChainInfo {
|
||||
head_td: status.head_td,
|
||||
head_hash: status.head_hash,
|
||||
head_num: status.head_num,
|
||||
};
|
||||
|
||||
{
|
||||
let mut best = self.best_seen.lock();
|
||||
*best = cmp::max(best.clone(), Some(chain_info.clone()));
|
||||
}
|
||||
|
||||
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
|
||||
self.maintain_sync(ctx.as_basic());
|
||||
|
||||
PeerStatus::Kept
|
||||
} else {
|
||||
PeerStatus::Unkept
|
||||
}
|
||||
|
||||
let chain_info = ChainInfo {
|
||||
head_td: status.head_td,
|
||||
head_hash: status.head_hash,
|
||||
head_num: status.head_num,
|
||||
};
|
||||
|
||||
{
|
||||
let mut best = self.best_seen.lock();
|
||||
*best = ::std::cmp::max(best.clone(), Some(chain_info.clone()));
|
||||
}
|
||||
|
||||
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
|
||||
self.maintain_sync(ctx.as_basic());
|
||||
}
|
||||
|
||||
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
|
||||
|
Loading…
Reference in New Issue
Block a user