Merge pull request #4093 from ethcore/better-timeouts
LES: Better timeouts + Track failed requests
This commit is contained in:
commit
3040a1c83e
@ -40,10 +40,13 @@ use request::{self, HashOrNumber, Request};
|
|||||||
use self::buffer_flow::{Buffer, FlowParams};
|
use self::buffer_flow::{Buffer, FlowParams};
|
||||||
use self::context::{Ctx, TickCtx};
|
use self::context::{Ctx, TickCtx};
|
||||||
use self::error::Punishment;
|
use self::error::Punishment;
|
||||||
|
use self::request_set::RequestSet;
|
||||||
|
use self::id_guard::IdGuard;
|
||||||
|
|
||||||
mod context;
|
mod context;
|
||||||
mod error;
|
mod error;
|
||||||
mod status;
|
mod status;
|
||||||
|
mod request_set;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
@ -121,7 +124,7 @@ mod timeout {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A request id.
|
/// A request id.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
|
||||||
pub struct ReqId(usize);
|
pub struct ReqId(usize);
|
||||||
|
|
||||||
impl fmt::Display for ReqId {
|
impl fmt::Display for ReqId {
|
||||||
@ -137,15 +140,17 @@ struct PendingPeer {
|
|||||||
last_update: SteadyTime,
|
last_update: SteadyTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
// data about each peer.
|
/// Relevant data to each peer. Not accessible publicly, only `pub` due to
|
||||||
struct Peer {
|
/// limitations of the privacy system.
|
||||||
|
pub struct Peer {
|
||||||
local_buffer: Buffer, // their buffer relative to us
|
local_buffer: Buffer, // their buffer relative to us
|
||||||
status: Status,
|
status: Status,
|
||||||
capabilities: Capabilities,
|
capabilities: Capabilities,
|
||||||
remote_flow: Option<(Buffer, FlowParams)>,
|
remote_flow: Option<(Buffer, FlowParams)>,
|
||||||
sent_head: H256, // last chain head we've given them.
|
sent_head: H256, // last chain head we've given them.
|
||||||
last_update: SteadyTime,
|
last_update: SteadyTime,
|
||||||
idle: bool, // make into a current percentage of max buffer being requested?
|
pending_requests: RequestSet,
|
||||||
|
failed_requests: Vec<ReqId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
@ -209,13 +214,6 @@ pub trait Handler: Send + Sync {
|
|||||||
fn on_abort(&self) { }
|
fn on_abort(&self) { }
|
||||||
}
|
}
|
||||||
|
|
||||||
// a request, the peer who it was made to, and the time it was made.
|
|
||||||
struct Requested {
|
|
||||||
request: Request,
|
|
||||||
timestamp: SteadyTime,
|
|
||||||
peer_id: PeerId,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Protocol parameters.
|
/// Protocol parameters.
|
||||||
pub struct Params {
|
pub struct Params {
|
||||||
/// Network id.
|
/// Network id.
|
||||||
@ -226,6 +224,57 @@ pub struct Params {
|
|||||||
pub capabilities: Capabilities,
|
pub capabilities: Capabilities,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Type alias for convenience.
|
||||||
|
pub type PeerMap = HashMap<PeerId, Mutex<Peer>>;
|
||||||
|
|
||||||
|
mod id_guard {
|
||||||
|
|
||||||
|
use network::PeerId;
|
||||||
|
use util::RwLockReadGuard;
|
||||||
|
|
||||||
|
use super::{PeerMap, ReqId};
|
||||||
|
|
||||||
|
// Guards success or failure of given request.
|
||||||
|
// On drop, inserts the req_id into the "failed requests"
|
||||||
|
// set for the peer unless defused. In separate module to enforce correct usage.
|
||||||
|
pub struct IdGuard<'a> {
|
||||||
|
peers: RwLockReadGuard<'a, PeerMap>,
|
||||||
|
peer_id: PeerId,
|
||||||
|
req_id: ReqId,
|
||||||
|
active: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> IdGuard<'a> {
|
||||||
|
/// Create a new `IdGuard`, which will prevent access of the inner ReqId
|
||||||
|
/// (for forming responses, triggering handlers) until defused
|
||||||
|
pub fn new(peers: RwLockReadGuard<'a, PeerMap>, peer_id: PeerId, req_id: ReqId) -> Self {
|
||||||
|
IdGuard {
|
||||||
|
peers: peers,
|
||||||
|
peer_id: peer_id,
|
||||||
|
req_id: req_id,
|
||||||
|
active: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Defuse the guard, signalling that the request has been successfully decoded.
|
||||||
|
pub fn defuse(mut self) -> ReqId {
|
||||||
|
// can't use the mem::forget trick here since we need the
|
||||||
|
// read guard to drop.
|
||||||
|
self.active = false;
|
||||||
|
self.req_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Drop for IdGuard<'a> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if !self.active { return }
|
||||||
|
if let Some(p) = self.peers.get(&self.peer_id) {
|
||||||
|
p.lock().failed_requests.push(self.req_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// This is an implementation of the light ethereum network protocol, abstracted
|
/// This is an implementation of the light ethereum network protocol, abstracted
|
||||||
/// over a `Provider` of data and a p2p network.
|
/// over a `Provider` of data and a p2p network.
|
||||||
///
|
///
|
||||||
@ -241,8 +290,7 @@ pub struct LightProtocol {
|
|||||||
genesis_hash: H256,
|
genesis_hash: H256,
|
||||||
network_id: u64,
|
network_id: u64,
|
||||||
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
|
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
|
||||||
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
|
peers: RwLock<PeerMap>,
|
||||||
pending_requests: RwLock<HashMap<usize, Requested>>,
|
|
||||||
capabilities: RwLock<Capabilities>,
|
capabilities: RwLock<Capabilities>,
|
||||||
flow_params: FlowParams, // assumed static and same for every peer.
|
flow_params: FlowParams, // assumed static and same for every peer.
|
||||||
handlers: Vec<Arc<Handler>>,
|
handlers: Vec<Arc<Handler>>,
|
||||||
@ -261,7 +309,6 @@ impl LightProtocol {
|
|||||||
network_id: params.network_id,
|
network_id: params.network_id,
|
||||||
pending_peers: RwLock::new(HashMap::new()),
|
pending_peers: RwLock::new(HashMap::new()),
|
||||||
peers: RwLock::new(HashMap::new()),
|
peers: RwLock::new(HashMap::new()),
|
||||||
pending_requests: RwLock::new(HashMap::new()),
|
|
||||||
capabilities: RwLock::new(params.capabilities),
|
capabilities: RwLock::new(params.capabilities),
|
||||||
flow_params: params.flow_params,
|
flow_params: params.flow_params,
|
||||||
handlers: Vec::new(),
|
handlers: Vec::new(),
|
||||||
@ -275,17 +322,11 @@ impl LightProtocol {
|
|||||||
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
||||||
self.peers.read().get(&peer).and_then(|peer| {
|
self.peers.read().get(&peer).and_then(|peer| {
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
let idle = peer.idle;
|
|
||||||
match peer.remote_flow {
|
match peer.remote_flow {
|
||||||
Some((ref mut buf, ref flow)) => {
|
Some((ref mut buf, ref flow)) => {
|
||||||
flow.recharge(buf);
|
flow.recharge(buf);
|
||||||
|
|
||||||
if !idle {
|
|
||||||
Some(0)
|
|
||||||
} else {
|
|
||||||
Some(flow.max_amount(&*buf, kind))
|
Some(flow.max_amount(&*buf, kind))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
None => None,
|
None => None,
|
||||||
}
|
}
|
||||||
}).unwrap_or(0)
|
}).unwrap_or(0)
|
||||||
@ -302,8 +343,6 @@ impl LightProtocol {
|
|||||||
let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?;
|
let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?;
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
|
|
||||||
if !peer.idle { return Err(Error::Overburdened) }
|
|
||||||
|
|
||||||
match peer.remote_flow {
|
match peer.remote_flow {
|
||||||
Some((ref mut buf, ref flow)) => {
|
Some((ref mut buf, ref flow)) => {
|
||||||
flow.recharge(buf);
|
flow.recharge(buf);
|
||||||
@ -329,12 +368,7 @@ impl LightProtocol {
|
|||||||
|
|
||||||
io.send(*peer_id, packet_id, packet_data);
|
io.send(*peer_id, packet_id, packet_data);
|
||||||
|
|
||||||
peer.idle = false;
|
peer.pending_requests.insert(ReqId(req_id), request, SteadyTime::now());
|
||||||
self.pending_requests.write().insert(req_id, Requested {
|
|
||||||
request: request,
|
|
||||||
timestamp: SteadyTime::now(),
|
|
||||||
peer_id: *peer_id,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(ReqId(req_id))
|
Ok(ReqId(req_id))
|
||||||
}
|
}
|
||||||
@ -402,11 +436,9 @@ impl LightProtocol {
|
|||||||
// acquire in order and hold.
|
// acquire in order and hold.
|
||||||
let mut pending_peers = self.pending_peers.write();
|
let mut pending_peers = self.pending_peers.write();
|
||||||
let mut peers = self.peers.write();
|
let mut peers = self.peers.write();
|
||||||
let mut pending_requests = self.pending_requests.write();
|
|
||||||
|
|
||||||
pending_peers.clear();
|
pending_peers.clear();
|
||||||
peers.clear();
|
peers.clear();
|
||||||
pending_requests.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Does the common pre-verification of responses before the response itself
|
// Does the common pre-verification of responses before the response itself
|
||||||
@ -414,37 +446,49 @@ impl LightProtocol {
|
|||||||
// - check whether peer exists
|
// - check whether peer exists
|
||||||
// - check whether request was made
|
// - check whether request was made
|
||||||
// - check whether request kinds match
|
// - check whether request kinds match
|
||||||
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<ReqId, Error> {
|
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<IdGuard, Error> {
|
||||||
let req_id: usize = raw.val_at(0)?;
|
let req_id = ReqId(raw.val_at(0)?);
|
||||||
let cur_buffer: U256 = raw.val_at(1)?;
|
let cur_buffer: U256 = raw.val_at(1)?;
|
||||||
|
|
||||||
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
||||||
|
|
||||||
match self.pending_requests.write().remove(&req_id) {
|
let mut had_req = false;
|
||||||
None => return Err(Error::UnsolicitedResponse),
|
|
||||||
Some(requested) => {
|
|
||||||
if requested.peer_id != *peer || requested.request.kind() != kind {
|
|
||||||
return Err(Error::UnsolicitedResponse)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let peers = self.peers.read();
|
let peers = self.peers.read();
|
||||||
match peers.get(peer) {
|
let maybe_err = match peers.get(peer) {
|
||||||
Some(peer_info) => {
|
Some(peer_info) => {
|
||||||
let mut peer_info = peer_info.lock();
|
let mut peer_info = peer_info.lock();
|
||||||
peer_info.idle = true;
|
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
|
||||||
|
let flow_info = peer_info.remote_flow.as_mut();
|
||||||
|
|
||||||
match peer_info.remote_flow.as_mut() {
|
match (req_info, flow_info) {
|
||||||
Some(&mut (ref mut buf, ref mut flow)) => {
|
(Some(request), Some(flow_info)) => {
|
||||||
|
had_req = true;
|
||||||
|
|
||||||
|
let &mut (ref mut buf, ref mut flow) = flow_info;
|
||||||
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
||||||
buf.update_to(actual_buffer)
|
buf.update_to(actual_buffer);
|
||||||
|
|
||||||
|
if request.kind() != kind {
|
||||||
|
Some(Error::UnsolicitedResponse)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
}
|
}
|
||||||
None => return Err(Error::NotServer), // this really should be impossible.
|
|
||||||
}
|
}
|
||||||
Ok(ReqId(req_id))
|
(None, _) => Some(Error::UnsolicitedResponse),
|
||||||
|
(_, None) => Some(Error::NotServer), // really should be impossible.
|
||||||
}
|
}
|
||||||
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
|
}
|
||||||
|
None => Some(Error::UnknownPeer), // probably only occurs in a race of some kind.
|
||||||
|
};
|
||||||
|
|
||||||
|
if had_req {
|
||||||
|
let id_guard = IdGuard::new(peers, *peer, req_id);
|
||||||
|
match maybe_err {
|
||||||
|
Some(err) => Err(err),
|
||||||
|
None => Ok(id_guard)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(maybe_err.expect("every branch without a request leads to error; qed"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -491,6 +535,38 @@ impl LightProtocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check timeouts and punish peers.
|
||||||
|
fn timeout_check(&self, io: &IoContext) {
|
||||||
|
let now = SteadyTime::now();
|
||||||
|
|
||||||
|
// handshake timeout
|
||||||
|
{
|
||||||
|
let mut pending = self.pending_peers.write();
|
||||||
|
let slowpokes: Vec<_> = pending.iter()
|
||||||
|
.filter(|&(_, ref peer)| {
|
||||||
|
peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now
|
||||||
|
})
|
||||||
|
.map(|(&p, _)| p)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for slowpoke in slowpokes {
|
||||||
|
debug!(target: "les", "Peer {} handshake timed out", slowpoke);
|
||||||
|
pending.remove(&slowpoke);
|
||||||
|
io.disconnect_peer(slowpoke);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// request timeouts
|
||||||
|
{
|
||||||
|
for (peer_id, peer) in self.peers.read().iter() {
|
||||||
|
if peer.lock().pending_requests.check_timeout(now) {
|
||||||
|
debug!(target: "les", "Peer {} request timeout", peer_id);
|
||||||
|
io.disconnect_peer(*peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// called when a peer connects.
|
/// called when a peer connects.
|
||||||
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
|
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
|
||||||
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
|
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
|
||||||
@ -530,21 +606,11 @@ impl LightProtocol {
|
|||||||
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
|
pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
|
||||||
trace!(target: "les", "Peer {} disconnecting", peer);
|
trace!(target: "les", "Peer {} disconnecting", peer);
|
||||||
|
|
||||||
|
|
||||||
self.pending_peers.write().remove(&peer);
|
self.pending_peers.write().remove(&peer);
|
||||||
if self.peers.write().remove(&peer).is_some() {
|
if let Some(peer_info) = self.peers.write().remove(&peer) {
|
||||||
let unfulfilled: Vec<_> = self.pending_requests.read()
|
let peer_info = peer_info.into_inner();
|
||||||
.iter()
|
let mut unfulfilled: Vec<_> = peer_info.pending_requests.collect_ids();
|
||||||
.filter(|&(_, r)| r.peer_id == peer)
|
unfulfilled.extend(peer_info.failed_requests);
|
||||||
.map(|(&id, _)| ReqId(id))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut pending = self.pending_requests.write();
|
|
||||||
for &ReqId(ref inner) in &unfulfilled {
|
|
||||||
pending.remove(inner);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_disconnect(&Ctx {
|
handler.on_disconnect(&Ctx {
|
||||||
@ -556,54 +622,6 @@ impl LightProtocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check timeouts and punish peers.
|
|
||||||
fn timeout_check(&self, io: &IoContext) {
|
|
||||||
let now = SteadyTime::now();
|
|
||||||
|
|
||||||
// handshake timeout
|
|
||||||
{
|
|
||||||
let mut pending = self.pending_peers.write();
|
|
||||||
let slowpokes: Vec<_> = pending.iter()
|
|
||||||
.filter(|&(_, ref peer)| {
|
|
||||||
peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now
|
|
||||||
})
|
|
||||||
.map(|(&p, _)| p)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for slowpoke in slowpokes {
|
|
||||||
debug!(target: "les", "Peer {} handshake timed out", slowpoke);
|
|
||||||
pending.remove(&slowpoke);
|
|
||||||
io.disconnect_peer(slowpoke);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// request timeouts
|
|
||||||
{
|
|
||||||
for r in self.pending_requests.read().values() {
|
|
||||||
let kind_timeout = match r.request.kind() {
|
|
||||||
request::Kind::Headers => timeout::HEADERS,
|
|
||||||
request::Kind::Bodies => timeout::BODIES,
|
|
||||||
request::Kind::Receipts => timeout::RECEIPTS,
|
|
||||||
request::Kind::StateProofs => timeout::PROOFS,
|
|
||||||
request::Kind::Codes => timeout::CONTRACT_CODES,
|
|
||||||
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
|
|
||||||
};
|
|
||||||
|
|
||||||
if r.timestamp + Duration::milliseconds(kind_timeout) <= now {
|
|
||||||
debug!(target: "les", "Request for {:?} from peer {} timed out",
|
|
||||||
r.request.kind(), r.peer_id);
|
|
||||||
|
|
||||||
// keep the request in the `pending` set for now so
|
|
||||||
// on_disconnect will pass unfulfilled ReqIds to handlers.
|
|
||||||
// in the case that a response is received after this, the
|
|
||||||
// disconnect won't be cancelled but the ReqId won't be
|
|
||||||
// marked as abandoned.
|
|
||||||
io.disconnect_peer(r.peer_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute the given closure with a basic context derived from the I/O context.
|
/// Execute the given closure with a basic context derived from the I/O context.
|
||||||
pub fn with_context<F, T>(&self, io: &IoContext, f: F) -> T
|
pub fn with_context<F, T>(&self, io: &IoContext, f: F) -> T
|
||||||
where F: FnOnce(&BasicContext) -> T
|
where F: FnOnce(&BasicContext) -> T
|
||||||
@ -655,7 +673,8 @@ impl LightProtocol {
|
|||||||
remote_flow: remote_flow,
|
remote_flow: remote_flow,
|
||||||
sent_head: pending.sent_head,
|
sent_head: pending.sent_head,
|
||||||
last_update: pending.last_update,
|
last_update: pending.last_update,
|
||||||
idle: true,
|
pending_requests: RequestSet::default(),
|
||||||
|
failed_requests: Vec::new(),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
@ -770,9 +789,10 @@ impl LightProtocol {
|
|||||||
|
|
||||||
// Receive a response for block headers.
|
// Receive a response for block headers.
|
||||||
fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::Headers, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::Headers, &raw)?;
|
||||||
let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_block_headers(&Ctx {
|
handler.on_block_headers(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
@ -835,9 +855,10 @@ impl LightProtocol {
|
|||||||
|
|
||||||
// Receive a response for block bodies.
|
// Receive a response for block bodies.
|
||||||
fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?;
|
||||||
let raw_bodies: Vec<Bytes> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
let raw_bodies: Vec<Bytes> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_block_bodies(&Ctx {
|
handler.on_block_bodies(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
@ -897,12 +918,13 @@ impl LightProtocol {
|
|||||||
|
|
||||||
// Receive a response for receipts.
|
// Receive a response for receipts.
|
||||||
fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?;
|
||||||
let raw_receipts: Vec<Vec<Receipt>> = raw.at(2)?
|
let raw_receipts: Vec<Vec<Receipt>> = raw.at(2)?
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| x.as_val())
|
.map(|x| x.as_val())
|
||||||
.collect::<Result<_,_>>()?;
|
.collect::<Result<_,_>>()?;
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_receipts(&Ctx {
|
handler.on_receipts(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
@ -970,12 +992,13 @@ impl LightProtocol {
|
|||||||
|
|
||||||
// Receive a response for proofs.
|
// Receive a response for proofs.
|
||||||
fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?;
|
||||||
|
|
||||||
let raw_proofs: Vec<Vec<Bytes>> = raw.at(2)?.iter()
|
let raw_proofs: Vec<Vec<Bytes>> = raw.at(2)?.iter()
|
||||||
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
|
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_state_proofs(&Ctx {
|
handler.on_state_proofs(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
@ -1041,12 +1064,13 @@ impl LightProtocol {
|
|||||||
|
|
||||||
// Receive a response for contract code.
|
// Receive a response for contract code.
|
||||||
fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::Codes, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::Codes, &raw)?;
|
||||||
|
|
||||||
let raw_code: Vec<Bytes> = raw.at(2)?.iter()
|
let raw_code: Vec<Bytes> = raw.at(2)?.iter()
|
||||||
.map(|x| x.as_val())
|
.map(|x| x.as_val())
|
||||||
.collect::<Result<_,_>>()?;
|
.collect::<Result<_,_>>()?;
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_code(&Ctx {
|
handler.on_code(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
@ -1120,11 +1144,12 @@ impl LightProtocol {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
let req_id = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
|
let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
|
||||||
let raw_proofs: Vec<_> = raw.at(2)?.iter()
|
let raw_proofs: Vec<_> = raw.at(2)?.iter()
|
||||||
.map(decode_res)
|
.map(decode_res)
|
||||||
.collect::<Result<_,_>>()?;
|
.collect::<Result<_,_>>()?;
|
||||||
|
|
||||||
|
let req_id = id_guard.defuse();
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
handler.on_header_proofs(&Ctx {
|
handler.on_header_proofs(&Ctx {
|
||||||
peer: *peer,
|
peer: *peer,
|
||||||
|
140
ethcore/light/src/net/request_set.rs
Normal file
140
ethcore/light/src/net/request_set.rs
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Pending request set.
|
||||||
|
//!
|
||||||
|
//! Stores pending requests and does timeout computation according to the rule
|
||||||
|
//! that only the earliest submitted request within the structure may time out.
|
||||||
|
//!
|
||||||
|
//! Whenever a request becomes the earliest, its timeout period begins at that moment.
|
||||||
|
|
||||||
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
use std::iter::FromIterator;
|
||||||
|
|
||||||
|
use request::{self, Request};
|
||||||
|
use net::{timeout, ReqId};
|
||||||
|
|
||||||
|
use time::{Duration, SteadyTime};
|
||||||
|
|
||||||
|
/// Request set.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RequestSet {
|
||||||
|
counter: u64,
|
||||||
|
base: Option<SteadyTime>,
|
||||||
|
ids: HashMap<ReqId, u64>,
|
||||||
|
reqs: BTreeMap<u64, Request>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RequestSet {
|
||||||
|
fn default() -> Self {
|
||||||
|
RequestSet {
|
||||||
|
counter: 0,
|
||||||
|
base: None,
|
||||||
|
ids: HashMap::new(),
|
||||||
|
reqs: BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestSet {
|
||||||
|
/// Push a request onto the stack.
|
||||||
|
pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) {
|
||||||
|
let counter = self.counter;
|
||||||
|
self.ids.insert(req_id, counter);
|
||||||
|
self.reqs.insert(counter, req);
|
||||||
|
|
||||||
|
if self.reqs.keys().next().map_or(true, |x| *x == counter) {
|
||||||
|
self.base = Some(now);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.counter += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a request from the stack.
|
||||||
|
pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Request> {
|
||||||
|
let id = match self.ids.remove(&req_id) {
|
||||||
|
Some(id) => id,
|
||||||
|
None => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
||||||
|
|
||||||
|
match self.reqs.keys().next() {
|
||||||
|
Some(k) if *k > id => self.base = Some(now),
|
||||||
|
None => self.base = None,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check for timeout against the given time. Returns true if
|
||||||
|
/// has timed out, false otherwise.
|
||||||
|
pub fn check_timeout(&self, now: SteadyTime) -> bool {
|
||||||
|
let base = match self.base.as_ref().cloned() {
|
||||||
|
Some(base) => base,
|
||||||
|
None => return false,
|
||||||
|
};
|
||||||
|
|
||||||
|
let kind = self.reqs.values()
|
||||||
|
.next()
|
||||||
|
.map(|r| r.kind())
|
||||||
|
.expect("base time implies `reqs` non-empty; qed");
|
||||||
|
|
||||||
|
let kind_timeout = match kind {
|
||||||
|
request::Kind::Headers => timeout::HEADERS,
|
||||||
|
request::Kind::Bodies => timeout::BODIES,
|
||||||
|
request::Kind::Receipts => timeout::RECEIPTS,
|
||||||
|
request::Kind::StateProofs => timeout::PROOFS,
|
||||||
|
request::Kind::Codes => timeout::CONTRACT_CODES,
|
||||||
|
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
|
||||||
|
};
|
||||||
|
|
||||||
|
base + Duration::milliseconds(kind_timeout) <= now
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collect all pending request ids.
|
||||||
|
pub fn collect_ids<F>(&self) -> F where F: FromIterator<ReqId> {
|
||||||
|
self.ids.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use net::{timeout, ReqId};
|
||||||
|
use request::{Request, Receipts};
|
||||||
|
use time::{SteadyTime, Duration};
|
||||||
|
use super::RequestSet;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn multi_timeout() {
|
||||||
|
let test_begin = SteadyTime::now();
|
||||||
|
let mut req_set = RequestSet::default();
|
||||||
|
|
||||||
|
let the_req = Request::Receipts(Receipts { block_hashes: Vec::new() });
|
||||||
|
req_set.insert(ReqId(0), the_req.clone(), test_begin);
|
||||||
|
req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1));
|
||||||
|
|
||||||
|
assert_eq!(req_set.base, Some(test_begin));
|
||||||
|
|
||||||
|
let test_end = test_begin + Duration::milliseconds(timeout::RECEIPTS);
|
||||||
|
assert!(req_set.check_timeout(test_end));
|
||||||
|
|
||||||
|
req_set.remove(&ReqId(0), test_begin + Duration::seconds(1)).unwrap();
|
||||||
|
assert!(!req_set.check_timeout(test_end));
|
||||||
|
assert!(req_set.check_timeout(test_end + Duration::seconds(1)));
|
||||||
|
}
|
||||||
|
}
|
@ -27,7 +27,7 @@ use network::{PeerId, NodeId};
|
|||||||
use net::buffer_flow::FlowParams;
|
use net::buffer_flow::FlowParams;
|
||||||
use net::context::IoContext;
|
use net::context::IoContext;
|
||||||
use net::status::{Capabilities, Status, write_handshake};
|
use net::status::{Capabilities, Status, write_handshake};
|
||||||
use net::{encode_request, LightProtocol, Params, packet};
|
use net::{encode_request, LightProtocol, Params, packet, Peer};
|
||||||
use provider::Provider;
|
use provider::Provider;
|
||||||
use request::{self, Request, Headers};
|
use request::{self, Request, Headers};
|
||||||
|
|
||||||
@ -487,3 +487,81 @@ fn get_contract_code() {
|
|||||||
let expected = Expect::Respond(packet::CONTRACT_CODES, response);
|
let expected = Expect::Respond(packet::CONTRACT_CODES, response);
|
||||||
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body);
|
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn id_guard() {
|
||||||
|
use super::request_set::RequestSet;
|
||||||
|
use super::ReqId;
|
||||||
|
|
||||||
|
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
|
||||||
|
let capabilities = capabilities();
|
||||||
|
|
||||||
|
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
|
||||||
|
|
||||||
|
let req_id_1 = ReqId(5143);
|
||||||
|
let req_id_2 = ReqId(1111);
|
||||||
|
let req = Request::Headers(request::Headers {
|
||||||
|
start: 5u64.into(),
|
||||||
|
max: 100,
|
||||||
|
skip: 0,
|
||||||
|
reverse: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
let peer_id = 9876;
|
||||||
|
|
||||||
|
let mut pending_requests = RequestSet::default();
|
||||||
|
|
||||||
|
pending_requests.insert(req_id_1, req.clone(), ::time::SteadyTime::now());
|
||||||
|
pending_requests.insert(req_id_2, req, ::time::SteadyTime::now());
|
||||||
|
|
||||||
|
proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer {
|
||||||
|
local_buffer: flow_params.create_buffer(),
|
||||||
|
status: status(provider.client.chain_info()),
|
||||||
|
capabilities: capabilities.clone(),
|
||||||
|
remote_flow: Some((flow_params.create_buffer(), flow_params)),
|
||||||
|
sent_head: provider.client.chain_info().best_block_hash,
|
||||||
|
last_update: ::time::SteadyTime::now(),
|
||||||
|
pending_requests: pending_requests,
|
||||||
|
failed_requests: Vec::new(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
// first, supply wrong request type.
|
||||||
|
{
|
||||||
|
let mut stream = RlpStream::new_list(3);
|
||||||
|
stream.append(&req_id_1.0);
|
||||||
|
stream.append(&4_000_000usize);
|
||||||
|
stream.begin_list(0);
|
||||||
|
|
||||||
|
let packet = stream.out();
|
||||||
|
assert!(proto.block_bodies(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
// next, do an unexpected response.
|
||||||
|
{
|
||||||
|
let mut stream = RlpStream::new_list(3);
|
||||||
|
stream.append(&10000usize);
|
||||||
|
stream.append(&3_000_000usize);
|
||||||
|
stream.begin_list(0);
|
||||||
|
|
||||||
|
let packet = stream.out();
|
||||||
|
assert!(proto.receipts(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
// lastly, do a valid (but empty) response.
|
||||||
|
{
|
||||||
|
let mut stream = RlpStream::new_list(3);
|
||||||
|
stream.append(&req_id_2.0);
|
||||||
|
stream.append(&3_000_000usize);
|
||||||
|
stream.begin_list(0);
|
||||||
|
|
||||||
|
let packet = stream.out();
|
||||||
|
assert!(proto.block_headers(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
let peers = proto.peers.read();
|
||||||
|
if let Some(ref peer_info) = peers.get(&peer_id) {
|
||||||
|
let peer_info = peer_info.lock();
|
||||||
|
assert!(peer_info.pending_requests.collect_ids::<Vec<_>>().is_empty());
|
||||||
|
assert_eq!(peer_info.failed_requests, &[req_id_1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user