diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 20073c0af..fd9295eae 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -39,11 +39,13 @@ use request::{self, HashOrNumber, Request}; use self::buffer_flow::{Buffer, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; +use self::request_set::RequestSet; mod buffer_flow; mod context; mod error; mod status; +mod request_set; #[cfg(test)] mod tests; @@ -119,7 +121,7 @@ mod timeout { } /// A request id. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct ReqId(usize); // A pending peer: one we've sent our status to but @@ -137,6 +139,7 @@ struct Peer { remote_flow: Option<(Buffer, FlowParams)>, sent_head: H256, // last chain head we've given them. last_update: SteadyTime, + pending_requests: RequestSet, idle: bool, // make into a current percentage of max buffer being requested? } @@ -201,13 +204,6 @@ pub trait Handler: Send + Sync { fn on_abort(&self) { } } -// a request, the peer who it was made to, and the time it was made. -struct Requested { - request: Request, - timestamp: SteadyTime, - peer_id: PeerId, -} - /// Protocol parameters. pub struct Params { /// Network id. @@ -234,7 +230,6 @@ pub struct LightProtocol { network_id: u64, pending_peers: RwLock>, peers: RwLock>>, - pending_requests: RwLock>, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. handlers: Vec>, @@ -253,7 +248,6 @@ impl LightProtocol { network_id: params.network_id, pending_peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()), - pending_requests: RwLock::new(HashMap::new()), capabilities: RwLock::new(params.capabilities), flow_params: params.flow_params, handlers: Vec::new(), @@ -322,11 +316,7 @@ impl LightProtocol { io.send(*peer_id, packet_id, packet_data); peer.idle = false; - self.pending_requests.write().insert(req_id, Requested { - request: request, - timestamp: SteadyTime::now(), - peer_id: *peer_id, - }); + peer.pending_requests.insert(ReqId(req_id), request, SteadyTime::now()); Ok(ReqId(req_id)) } @@ -394,11 +384,9 @@ impl LightProtocol { // acquire in order and hold. let mut pending_peers = self.pending_peers.write(); let mut peers = self.peers.write(); - let mut pending_requests = self.pending_requests.write(); pending_peers.clear(); peers.clear(); - pending_requests.clear(); } // Does the common pre-verification of responses before the response itself @@ -407,26 +395,26 @@ impl LightProtocol { // - check whether request was made // - check whether request kinds match fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { - let req_id: usize = raw.val_at(0)?; + let req_id = ReqId(raw.val_at(0)?); let cur_buffer: U256 = raw.val_at(1)?; trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind); - match self.pending_requests.write().remove(&req_id) { - None => return Err(Error::UnsolicitedResponse), - Some(requested) => { - if requested.peer_id != *peer || requested.request.kind() != kind { - return Err(Error::UnsolicitedResponse) - } - } - } - let peers = self.peers.read(); match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); peer_info.idle = true; + match peer_info.pending_requests.remove(&req_id, SteadyTime::now()) { + None => return Err(Error::UnsolicitedResponse), + Some(request) => { + if request.kind() != kind { + return Err(Error::UnsolicitedResponse) + } + } + } + match peer_info.remote_flow.as_mut() { Some(&mut (ref mut buf, ref mut flow)) => { let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); @@ -434,7 +422,7 @@ impl LightProtocol { } None => return Err(Error::NotServer), // this really should be impossible. } - Ok(ReqId(req_id)) + Ok(req_id) } None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. } @@ -504,26 +492,10 @@ impl LightProtocol { // request timeouts { - for r in self.pending_requests.read().values() { - let kind_timeout = match r.request.kind() { - request::Kind::Headers => timeout::HEADERS, - request::Kind::Bodies => timeout::BODIES, - request::Kind::Receipts => timeout::RECEIPTS, - request::Kind::StateProofs => timeout::PROOFS, - request::Kind::Codes => timeout::CONTRACT_CODES, - request::Kind::HeaderProofs => timeout::HEADER_PROOFS, - }; - - if r.timestamp + Duration::milliseconds(kind_timeout) <= now { - debug!(target: "les", "Request for {:?} from peer {} timed out", - r.request.kind(), r.peer_id); - - // keep the request in the `pending` set for now so - // on_disconnect will pass unfulfilled ReqIds to handlers. - // in the case that a response is received after this, the - // disconnect won't be cancelled but the ReqId won't be - // marked as abandoned. - io.disconnect_peer(r.peer_id); + for (peer_id, peer) in self.peers.read().iter() { + if peer.lock().pending_requests.check_timeout(now) { + debug!(target: "les", "Peer {} request timeout", peer_id); + io.disconnect_peer(*peer_id); } } } @@ -579,21 +551,9 @@ impl LightProtocol { fn on_disconnect(&self, peer: PeerId, io: &IoContext) { trace!(target: "les", "Peer {} disconnecting", peer); - self.pending_peers.write().remove(&peer); - if self.peers.write().remove(&peer).is_some() { - let unfulfilled: Vec<_> = self.pending_requests.read() - .iter() - .filter(|&(_, r)| r.peer_id == peer) - .map(|(&id, _)| ReqId(id)) - .collect(); - - { - let mut pending = self.pending_requests.write(); - for &ReqId(ref inner) in &unfulfilled { - pending.remove(inner); - } - } + if let Some(peer_info) = self.peers.write().remove(&peer) { + let unfulfilled: Vec<_> = peer_info.into_inner().pending_requests.collect_ids(); for handler in &self.handlers { handler.on_disconnect(&Ctx { @@ -635,6 +595,7 @@ impl LightProtocol { remote_flow: remote_flow, sent_head: pending.sent_head, last_update: pending.last_update, + pending_requests: RequestSet::default(), idle: true, })); diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs new file mode 100644 index 000000000..409f0c113 --- /dev/null +++ b/ethcore/light/src/net/request_set.rs @@ -0,0 +1,113 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Pending request set. +//! +//! Stores pending requests and does timeout computation according to the rule +//! that only the earliest submitted request within the structure may time out. +//! +//! Whenever a request becomes the earliest, its timeout period begins at that moment. + +use std::collections::{BTreeMap, HashMap}; +use std::iter::FromIterator; + +use request::{self, Request}; +use net::{timeout, ReqId}; + +use time::{Duration, SteadyTime}; + +/// Request set. +#[derive(Debug)] +pub struct RequestSet { + counter: u64, + base: Option, + ids: HashMap, + reqs: BTreeMap, +} + +impl Default for RequestSet { + fn default() -> Self { + RequestSet { + counter: 0, + base: None, + ids: HashMap::new(), + reqs: BTreeMap::new(), + } + } +} + +impl RequestSet { + /// Push a request onto the stack. + pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) { + let counter = self.counter; + self.ids.insert(req_id, counter); + self.reqs.insert(counter, req); + + if self.reqs.keys().next().map_or(true, |x| *x == counter) { + self.base = Some(now); + } + + self.counter += 1; + } + + /// Remove a request from the stack. + pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option { + let id = match self.ids.remove(&req_id) { + Some(id) => id, + None => return None, + }; + + let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed"); + + match self.reqs.keys().next() { + Some(k) if *k > id => self.base = Some(now), + None => self.base = None, + _ => {} + } + + Some(req) + } + + /// Check for timeout against the given time. Returns true if + /// has timed out, false otherwise. + pub fn check_timeout(&self, now: SteadyTime) -> bool { + let base = match self.base.as_ref().cloned() { + Some(base) => base, + None => return false, + }; + + let kind = self.reqs.values() + .next() + .map(|r| r.kind()) + .expect("base time implies `reqs` non-empty; qed"); + + let kind_timeout = match kind { + request::Kind::Headers => timeout::HEADERS, + request::Kind::Bodies => timeout::BODIES, + request::Kind::Receipts => timeout::RECEIPTS, + request::Kind::StateProofs => timeout::PROOFS, + request::Kind::Codes => timeout::CONTRACT_CODES, + request::Kind::HeaderProofs => timeout::HEADER_PROOFS, + }; + + base + Duration::milliseconds(kind_timeout) <= now + } + + /// Collect all pending request ids. + pub fn collect_ids(&self) -> F where F: FromIterator { + self.ids.keys().cloned().collect() + } +}