use request set to provide better timeouts
This commit is contained in:
parent
0b9a0b138b
commit
04282be721
@ -39,11 +39,13 @@ use request::{self, HashOrNumber, Request};
|
||||
use self::buffer_flow::{Buffer, FlowParams};
|
||||
use self::context::{Ctx, TickCtx};
|
||||
use self::error::Punishment;
|
||||
use self::request_set::RequestSet;
|
||||
|
||||
mod buffer_flow;
|
||||
mod context;
|
||||
mod error;
|
||||
mod status;
|
||||
mod request_set;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@ -119,7 +121,7 @@ mod timeout {
|
||||
}
|
||||
|
||||
/// A request id.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
|
||||
pub struct ReqId(usize);
|
||||
|
||||
// A pending peer: one we've sent our status to but
|
||||
@ -137,6 +139,7 @@ struct Peer {
|
||||
remote_flow: Option<(Buffer, FlowParams)>,
|
||||
sent_head: H256, // last chain head we've given them.
|
||||
last_update: SteadyTime,
|
||||
pending_requests: RequestSet,
|
||||
idle: bool, // make into a current percentage of max buffer being requested?
|
||||
}
|
||||
|
||||
@ -201,13 +204,6 @@ pub trait Handler: Send + Sync {
|
||||
fn on_abort(&self) { }
|
||||
}
|
||||
|
||||
// a request, the peer who it was made to, and the time it was made.
|
||||
struct Requested {
|
||||
request: Request,
|
||||
timestamp: SteadyTime,
|
||||
peer_id: PeerId,
|
||||
}
|
||||
|
||||
/// Protocol parameters.
|
||||
pub struct Params {
|
||||
/// Network id.
|
||||
@ -234,7 +230,6 @@ pub struct LightProtocol {
|
||||
network_id: u64,
|
||||
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
|
||||
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
|
||||
pending_requests: RwLock<HashMap<usize, Requested>>,
|
||||
capabilities: RwLock<Capabilities>,
|
||||
flow_params: FlowParams, // assumed static and same for every peer.
|
||||
handlers: Vec<Box<Handler>>,
|
||||
@ -253,7 +248,6 @@ impl LightProtocol {
|
||||
network_id: params.network_id,
|
||||
pending_peers: RwLock::new(HashMap::new()),
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
pending_requests: RwLock::new(HashMap::new()),
|
||||
capabilities: RwLock::new(params.capabilities),
|
||||
flow_params: params.flow_params,
|
||||
handlers: Vec::new(),
|
||||
@ -322,11 +316,7 @@ impl LightProtocol {
|
||||
io.send(*peer_id, packet_id, packet_data);
|
||||
|
||||
peer.idle = false;
|
||||
self.pending_requests.write().insert(req_id, Requested {
|
||||
request: request,
|
||||
timestamp: SteadyTime::now(),
|
||||
peer_id: *peer_id,
|
||||
});
|
||||
peer.pending_requests.insert(ReqId(req_id), request, SteadyTime::now());
|
||||
|
||||
Ok(ReqId(req_id))
|
||||
}
|
||||
@ -394,11 +384,9 @@ impl LightProtocol {
|
||||
// acquire in order and hold.
|
||||
let mut pending_peers = self.pending_peers.write();
|
||||
let mut peers = self.peers.write();
|
||||
let mut pending_requests = self.pending_requests.write();
|
||||
|
||||
pending_peers.clear();
|
||||
peers.clear();
|
||||
pending_requests.clear();
|
||||
}
|
||||
|
||||
// Does the common pre-verification of responses before the response itself
|
||||
@ -407,26 +395,26 @@ impl LightProtocol {
|
||||
// - check whether request was made
|
||||
// - check whether request kinds match
|
||||
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<ReqId, Error> {
|
||||
let req_id: usize = raw.val_at(0)?;
|
||||
let req_id = ReqId(raw.val_at(0)?);
|
||||
let cur_buffer: U256 = raw.val_at(1)?;
|
||||
|
||||
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
|
||||
|
||||
match self.pending_requests.write().remove(&req_id) {
|
||||
None => return Err(Error::UnsolicitedResponse),
|
||||
Some(requested) => {
|
||||
if requested.peer_id != *peer || requested.request.kind() != kind {
|
||||
return Err(Error::UnsolicitedResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let peers = self.peers.read();
|
||||
match peers.get(peer) {
|
||||
Some(peer_info) => {
|
||||
let mut peer_info = peer_info.lock();
|
||||
peer_info.idle = true;
|
||||
|
||||
match peer_info.pending_requests.remove(&req_id, SteadyTime::now()) {
|
||||
None => return Err(Error::UnsolicitedResponse),
|
||||
Some(request) => {
|
||||
if request.kind() != kind {
|
||||
return Err(Error::UnsolicitedResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match peer_info.remote_flow.as_mut() {
|
||||
Some(&mut (ref mut buf, ref mut flow)) => {
|
||||
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
||||
@ -434,7 +422,7 @@ impl LightProtocol {
|
||||
}
|
||||
None => return Err(Error::NotServer), // this really should be impossible.
|
||||
}
|
||||
Ok(ReqId(req_id))
|
||||
Ok(req_id)
|
||||
}
|
||||
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
|
||||
}
|
||||
@ -504,26 +492,10 @@ impl LightProtocol {
|
||||
|
||||
// request timeouts
|
||||
{
|
||||
for r in self.pending_requests.read().values() {
|
||||
let kind_timeout = match r.request.kind() {
|
||||
request::Kind::Headers => timeout::HEADERS,
|
||||
request::Kind::Bodies => timeout::BODIES,
|
||||
request::Kind::Receipts => timeout::RECEIPTS,
|
||||
request::Kind::StateProofs => timeout::PROOFS,
|
||||
request::Kind::Codes => timeout::CONTRACT_CODES,
|
||||
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
|
||||
};
|
||||
|
||||
if r.timestamp + Duration::milliseconds(kind_timeout) <= now {
|
||||
debug!(target: "les", "Request for {:?} from peer {} timed out",
|
||||
r.request.kind(), r.peer_id);
|
||||
|
||||
// keep the request in the `pending` set for now so
|
||||
// on_disconnect will pass unfulfilled ReqIds to handlers.
|
||||
// in the case that a response is received after this, the
|
||||
// disconnect won't be cancelled but the ReqId won't be
|
||||
// marked as abandoned.
|
||||
io.disconnect_peer(r.peer_id);
|
||||
for (peer_id, peer) in self.peers.read().iter() {
|
||||
if peer.lock().pending_requests.check_timeout(now) {
|
||||
debug!(target: "les", "Peer {} request timeout", peer_id);
|
||||
io.disconnect_peer(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -579,21 +551,9 @@ impl LightProtocol {
|
||||
fn on_disconnect(&self, peer: PeerId, io: &IoContext) {
|
||||
trace!(target: "les", "Peer {} disconnecting", peer);
|
||||
|
||||
|
||||
self.pending_peers.write().remove(&peer);
|
||||
if self.peers.write().remove(&peer).is_some() {
|
||||
let unfulfilled: Vec<_> = self.pending_requests.read()
|
||||
.iter()
|
||||
.filter(|&(_, r)| r.peer_id == peer)
|
||||
.map(|(&id, _)| ReqId(id))
|
||||
.collect();
|
||||
|
||||
{
|
||||
let mut pending = self.pending_requests.write();
|
||||
for &ReqId(ref inner) in &unfulfilled {
|
||||
pending.remove(inner);
|
||||
}
|
||||
}
|
||||
if let Some(peer_info) = self.peers.write().remove(&peer) {
|
||||
let unfulfilled: Vec<_> = peer_info.into_inner().pending_requests.collect_ids();
|
||||
|
||||
for handler in &self.handlers {
|
||||
handler.on_disconnect(&Ctx {
|
||||
@ -635,6 +595,7 @@ impl LightProtocol {
|
||||
remote_flow: remote_flow,
|
||||
sent_head: pending.sent_head,
|
||||
last_update: pending.last_update,
|
||||
pending_requests: RequestSet::default(),
|
||||
idle: true,
|
||||
}));
|
||||
|
||||
|
113
ethcore/light/src/net/request_set.rs
Normal file
113
ethcore/light/src/net/request_set.rs
Normal file
@ -0,0 +1,113 @@
|
||||
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Pending request set.
|
||||
//!
|
||||
//! Stores pending requests and does timeout computation according to the rule
|
||||
//! that only the earliest submitted request within the structure may time out.
|
||||
//!
|
||||
//! Whenever a request becomes the earliest, its timeout period begins at that moment.
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::iter::FromIterator;
|
||||
|
||||
use request::{self, Request};
|
||||
use net::{timeout, ReqId};
|
||||
|
||||
use time::{Duration, SteadyTime};
|
||||
|
||||
/// Request set.
|
||||
#[derive(Debug)]
|
||||
pub struct RequestSet {
|
||||
counter: u64,
|
||||
base: Option<SteadyTime>,
|
||||
ids: HashMap<ReqId, u64>,
|
||||
reqs: BTreeMap<u64, Request>,
|
||||
}
|
||||
|
||||
impl Default for RequestSet {
|
||||
fn default() -> Self {
|
||||
RequestSet {
|
||||
counter: 0,
|
||||
base: None,
|
||||
ids: HashMap::new(),
|
||||
reqs: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestSet {
|
||||
/// Push a request onto the stack.
|
||||
pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) {
|
||||
let counter = self.counter;
|
||||
self.ids.insert(req_id, counter);
|
||||
self.reqs.insert(counter, req);
|
||||
|
||||
if self.reqs.keys().next().map_or(true, |x| *x == counter) {
|
||||
self.base = Some(now);
|
||||
}
|
||||
|
||||
self.counter += 1;
|
||||
}
|
||||
|
||||
/// Remove a request from the stack.
|
||||
pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option<Request> {
|
||||
let id = match self.ids.remove(&req_id) {
|
||||
Some(id) => id,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
||||
|
||||
match self.reqs.keys().next() {
|
||||
Some(k) if *k > id => self.base = Some(now),
|
||||
None => self.base = None,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Some(req)
|
||||
}
|
||||
|
||||
/// Check for timeout against the given time. Returns true if
|
||||
/// has timed out, false otherwise.
|
||||
pub fn check_timeout(&self, now: SteadyTime) -> bool {
|
||||
let base = match self.base.as_ref().cloned() {
|
||||
Some(base) => base,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
let kind = self.reqs.values()
|
||||
.next()
|
||||
.map(|r| r.kind())
|
||||
.expect("base time implies `reqs` non-empty; qed");
|
||||
|
||||
let kind_timeout = match kind {
|
||||
request::Kind::Headers => timeout::HEADERS,
|
||||
request::Kind::Bodies => timeout::BODIES,
|
||||
request::Kind::Receipts => timeout::RECEIPTS,
|
||||
request::Kind::StateProofs => timeout::PROOFS,
|
||||
request::Kind::Codes => timeout::CONTRACT_CODES,
|
||||
request::Kind::HeaderProofs => timeout::HEADER_PROOFS,
|
||||
};
|
||||
|
||||
base + Duration::milliseconds(kind_timeout) <= now
|
||||
}
|
||||
|
||||
/// Collect all pending request ids.
|
||||
pub fn collect_ids<F>(&self) -> F where F: FromIterator<ReqId> {
|
||||
self.ids.keys().cloned().collect()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user