diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 18425152c..c5071cc02 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -40,10 +40,13 @@ use request::{self, HashOrNumber, Request}; use self::buffer_flow::{Buffer, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; +use self::request_set::RequestSet; +use self::id_guard::IdGuard; mod context; mod error; mod status; +mod request_set; #[cfg(test)] mod tests; @@ -121,7 +124,7 @@ mod timeout { } /// A request id. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct ReqId(usize); impl fmt::Display for ReqId { @@ -137,15 +140,17 @@ struct PendingPeer { last_update: SteadyTime, } -// data about each peer. -struct Peer { +/// Relevant data to each peer. Not accessible publicly, only `pub` due to +/// limitations of the privacy system. +pub struct Peer { local_buffer: Buffer, // their buffer relative to us status: Status, capabilities: Capabilities, remote_flow: Option<(Buffer, FlowParams)>, sent_head: H256, // last chain head we've given them. last_update: SteadyTime, - idle: bool, // make into a current percentage of max buffer being requested? + pending_requests: RequestSet, + failed_requests: Vec, } impl Peer { @@ -209,13 +214,6 @@ pub trait Handler: Send + Sync { fn on_abort(&self) { } } -// a request, the peer who it was made to, and the time it was made. -struct Requested { - request: Request, - timestamp: SteadyTime, - peer_id: PeerId, -} - /// Protocol parameters. pub struct Params { /// Network id. @@ -226,6 +224,57 @@ pub struct Params { pub capabilities: Capabilities, } +/// Type alias for convenience. +pub type PeerMap = HashMap>; + +mod id_guard { + + use network::PeerId; + use util::RwLockReadGuard; + + use super::{PeerMap, ReqId}; + + // Guards success or failure of given request. + // On drop, inserts the req_id into the "failed requests" + // set for the peer unless defused. In separate module to enforce correct usage. + pub struct IdGuard<'a> { + peers: RwLockReadGuard<'a, PeerMap>, + peer_id: PeerId, + req_id: ReqId, + active: bool, + } + + impl<'a> IdGuard<'a> { + /// Create a new `IdGuard`, which will prevent access of the inner ReqId + /// (for forming responses, triggering handlers) until defused + pub fn new(peers: RwLockReadGuard<'a, PeerMap>, peer_id: PeerId, req_id: ReqId) -> Self { + IdGuard { + peers: peers, + peer_id: peer_id, + req_id: req_id, + active: true, + } + } + + /// Defuse the guard, signalling that the request has been successfully decoded. + pub fn defuse(mut self) -> ReqId { + // can't use the mem::forget trick here since we need the + // read guard to drop. + self.active = false; + self.req_id + } + } + + impl<'a> Drop for IdGuard<'a> { + fn drop(&mut self) { + if !self.active { return } + if let Some(p) = self.peers.get(&self.peer_id) { + p.lock().failed_requests.push(self.req_id); + } + } + } +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -241,8 +290,7 @@ pub struct LightProtocol { genesis_hash: H256, network_id: u64, pending_peers: RwLock>, - peers: RwLock>>, - pending_requests: RwLock>, + peers: RwLock, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. handlers: Vec>, @@ -261,7 +309,6 @@ impl LightProtocol { network_id: params.network_id, pending_peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()), - pending_requests: RwLock::new(HashMap::new()), capabilities: RwLock::new(params.capabilities), flow_params: params.flow_params, handlers: Vec::new(), @@ -275,16 +322,10 @@ impl LightProtocol { fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize { self.peers.read().get(&peer).and_then(|peer| { let mut peer = peer.lock(); - let idle = peer.idle; match peer.remote_flow { Some((ref mut buf, ref flow)) => { flow.recharge(buf); - - if !idle { - Some(0) - } else { - Some(flow.max_amount(&*buf, kind)) - } + Some(flow.max_amount(&*buf, kind)) } None => None, } @@ -302,8 +343,6 @@ impl LightProtocol { let peer = peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)?; let mut peer = peer.lock(); - if !peer.idle { return Err(Error::Overburdened) } - match peer.remote_flow { Some((ref mut buf, ref flow)) => { flow.recharge(buf); @@ -329,12 +368,7 @@ impl LightProtocol { io.send(*peer_id, packet_id, packet_data); - peer.idle = false; - self.pending_requests.write().insert(req_id, Requested { - request: request, - timestamp: SteadyTime::now(), - peer_id: *peer_id, - }); + peer.pending_requests.insert(ReqId(req_id), request, SteadyTime::now()); Ok(ReqId(req_id)) } @@ -402,11 +436,9 @@ impl LightProtocol { // acquire in order and hold. let mut pending_peers = self.pending_peers.write(); let mut peers = self.peers.write(); - let mut pending_requests = self.pending_requests.write(); pending_peers.clear(); peers.clear(); - pending_requests.clear(); } // Does the common pre-verification of responses before the response itself @@ -414,37 +446,49 @@ impl LightProtocol { // - check whether peer exists // - check whether request was made // - check whether request kinds match - fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { - let req_id: usize = raw.val_at(0)?; + fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { + let req_id = ReqId(raw.val_at(0)?); let cur_buffer: U256 = raw.val_at(1)?; trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind); - match self.pending_requests.write().remove(&req_id) { - None => return Err(Error::UnsolicitedResponse), - Some(requested) => { - if requested.peer_id != *peer || requested.request.kind() != kind { - return Err(Error::UnsolicitedResponse) - } - } - } - + let mut had_req = false; let peers = self.peers.read(); - match peers.get(peer) { + let maybe_err = match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); - peer_info.idle = true; + let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now()); + let flow_info = peer_info.remote_flow.as_mut(); - match peer_info.remote_flow.as_mut() { - Some(&mut (ref mut buf, ref mut flow)) => { + match (req_info, flow_info) { + (Some(request), Some(flow_info)) => { + had_req = true; + + let &mut (ref mut buf, ref mut flow) = flow_info; let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); - buf.update_to(actual_buffer) + buf.update_to(actual_buffer); + + if request.kind() != kind { + Some(Error::UnsolicitedResponse) + } else { + None + } } - None => return Err(Error::NotServer), // this really should be impossible. + (None, _) => Some(Error::UnsolicitedResponse), + (_, None) => Some(Error::NotServer), // really should be impossible. } - Ok(ReqId(req_id)) } - None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. + None => Some(Error::UnknownPeer), // probably only occurs in a race of some kind. + }; + + if had_req { + let id_guard = IdGuard::new(peers, *peer, req_id); + match maybe_err { + Some(err) => Err(err), + None => Ok(id_guard) + } + } else { + Err(maybe_err.expect("every branch without a request leads to error; qed")) } } @@ -491,7 +535,39 @@ impl LightProtocol { } } - /// called when a peer connects. + // check timeouts and punish peers. + fn timeout_check(&self, io: &IoContext) { + let now = SteadyTime::now(); + + // handshake timeout + { + let mut pending = self.pending_peers.write(); + let slowpokes: Vec<_> = pending.iter() + .filter(|&(_, ref peer)| { + peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now + }) + .map(|(&p, _)| p) + .collect(); + + for slowpoke in slowpokes { + debug!(target: "les", "Peer {} handshake timed out", slowpoke); + pending.remove(&slowpoke); + io.disconnect_peer(slowpoke); + } + } + + // request timeouts + { + for (peer_id, peer) in self.peers.read().iter() { + if peer.lock().pending_requests.check_timeout(now) { + debug!(target: "les", "Peer {} request timeout", peer_id); + io.disconnect_peer(*peer_id); + } + } + } + } + + /// called when a peer connects. pub fn on_connect(&self, peer: &PeerId, io: &IoContext) { let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { Ok(pv) => pv, @@ -530,21 +606,11 @@ impl LightProtocol { pub fn on_disconnect(&self, peer: PeerId, io: &IoContext) { trace!(target: "les", "Peer {} disconnecting", peer); - self.pending_peers.write().remove(&peer); - if self.peers.write().remove(&peer).is_some() { - let unfulfilled: Vec<_> = self.pending_requests.read() - .iter() - .filter(|&(_, r)| r.peer_id == peer) - .map(|(&id, _)| ReqId(id)) - .collect(); - - { - let mut pending = self.pending_requests.write(); - for &ReqId(ref inner) in &unfulfilled { - pending.remove(inner); - } - } + if let Some(peer_info) = self.peers.write().remove(&peer) { + let peer_info = peer_info.into_inner(); + let mut unfulfilled: Vec<_> = peer_info.pending_requests.collect_ids(); + unfulfilled.extend(peer_info.failed_requests); for handler in &self.handlers { handler.on_disconnect(&Ctx { @@ -556,54 +622,6 @@ impl LightProtocol { } } - // check timeouts and punish peers. - fn timeout_check(&self, io: &IoContext) { - let now = SteadyTime::now(); - - // handshake timeout - { - let mut pending = self.pending_peers.write(); - let slowpokes: Vec<_> = pending.iter() - .filter(|&(_, ref peer)| { - peer.last_update + Duration::milliseconds(timeout::HANDSHAKE) <= now - }) - .map(|(&p, _)| p) - .collect(); - - for slowpoke in slowpokes { - debug!(target: "les", "Peer {} handshake timed out", slowpoke); - pending.remove(&slowpoke); - io.disconnect_peer(slowpoke); - } - } - - // request timeouts - { - for r in self.pending_requests.read().values() { - let kind_timeout = match r.request.kind() { - request::Kind::Headers => timeout::HEADERS, - request::Kind::Bodies => timeout::BODIES, - request::Kind::Receipts => timeout::RECEIPTS, - request::Kind::StateProofs => timeout::PROOFS, - request::Kind::Codes => timeout::CONTRACT_CODES, - request::Kind::HeaderProofs => timeout::HEADER_PROOFS, - }; - - if r.timestamp + Duration::milliseconds(kind_timeout) <= now { - debug!(target: "les", "Request for {:?} from peer {} timed out", - r.request.kind(), r.peer_id); - - // keep the request in the `pending` set for now so - // on_disconnect will pass unfulfilled ReqIds to handlers. - // in the case that a response is received after this, the - // disconnect won't be cancelled but the ReqId won't be - // marked as abandoned. - io.disconnect_peer(r.peer_id); - } - } - } - } - /// Execute the given closure with a basic context derived from the I/O context. pub fn with_context(&self, io: &IoContext, f: F) -> T where F: FnOnce(&BasicContext) -> T @@ -655,7 +673,8 @@ impl LightProtocol { remote_flow: remote_flow, sent_head: pending.sent_head, last_update: pending.last_update, - idle: true, + pending_requests: RequestSet::default(), + failed_requests: Vec::new(), })); for handler in &self.handlers { @@ -770,9 +789,10 @@ impl LightProtocol { // Receive a response for block headers. fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Headers, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Headers, &raw)?; let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_block_headers(&Ctx { peer: *peer, @@ -835,9 +855,10 @@ impl LightProtocol { // Receive a response for block bodies. fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?; let raw_bodies: Vec = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_block_bodies(&Ctx { peer: *peer, @@ -897,12 +918,13 @@ impl LightProtocol { // Receive a response for receipts. fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?; let raw_receipts: Vec> = raw.at(2)? .iter() .map(|x| x.as_val()) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_receipts(&Ctx { peer: *peer, @@ -970,12 +992,13 @@ impl LightProtocol { // Receive a response for proofs. fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?; let raw_proofs: Vec> = raw.at(2)?.iter() .map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect()) .collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_state_proofs(&Ctx { peer: *peer, @@ -1041,12 +1064,13 @@ impl LightProtocol { // Receive a response for contract code. fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Codes, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Codes, &raw)?; let raw_code: Vec = raw.at(2)?.iter() .map(|x| x.as_val()) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_code(&Ctx { peer: *peer, @@ -1120,11 +1144,12 @@ impl LightProtocol { )) } - let req_id = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?; let raw_proofs: Vec<_> = raw.at(2)?.iter() .map(decode_res) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_header_proofs(&Ctx { peer: *peer, diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs new file mode 100644 index 000000000..506affa63 --- /dev/null +++ b/ethcore/light/src/net/request_set.rs @@ -0,0 +1,140 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Pending request set. +//! +//! Stores pending requests and does timeout computation according to the rule +//! that only the earliest submitted request within the structure may time out. +//! +//! Whenever a request becomes the earliest, its timeout period begins at that moment. + +use std::collections::{BTreeMap, HashMap}; +use std::iter::FromIterator; + +use request::{self, Request}; +use net::{timeout, ReqId}; + +use time::{Duration, SteadyTime}; + +/// Request set. +#[derive(Debug)] +pub struct RequestSet { + counter: u64, + base: Option, + ids: HashMap, + reqs: BTreeMap, +} + +impl Default for RequestSet { + fn default() -> Self { + RequestSet { + counter: 0, + base: None, + ids: HashMap::new(), + reqs: BTreeMap::new(), + } + } +} + +impl RequestSet { + /// Push a request onto the stack. + pub fn insert(&mut self, req_id: ReqId, req: Request, now: SteadyTime) { + let counter = self.counter; + self.ids.insert(req_id, counter); + self.reqs.insert(counter, req); + + if self.reqs.keys().next().map_or(true, |x| *x == counter) { + self.base = Some(now); + } + + self.counter += 1; + } + + /// Remove a request from the stack. + pub fn remove(&mut self, req_id: &ReqId, now: SteadyTime) -> Option { + let id = match self.ids.remove(&req_id) { + Some(id) => id, + None => return None, + }; + + let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed"); + + match self.reqs.keys().next() { + Some(k) if *k > id => self.base = Some(now), + None => self.base = None, + _ => {} + } + + Some(req) + } + + /// Check for timeout against the given time. Returns true if + /// has timed out, false otherwise. + pub fn check_timeout(&self, now: SteadyTime) -> bool { + let base = match self.base.as_ref().cloned() { + Some(base) => base, + None => return false, + }; + + let kind = self.reqs.values() + .next() + .map(|r| r.kind()) + .expect("base time implies `reqs` non-empty; qed"); + + let kind_timeout = match kind { + request::Kind::Headers => timeout::HEADERS, + request::Kind::Bodies => timeout::BODIES, + request::Kind::Receipts => timeout::RECEIPTS, + request::Kind::StateProofs => timeout::PROOFS, + request::Kind::Codes => timeout::CONTRACT_CODES, + request::Kind::HeaderProofs => timeout::HEADER_PROOFS, + }; + + base + Duration::milliseconds(kind_timeout) <= now + } + + /// Collect all pending request ids. + pub fn collect_ids(&self) -> F where F: FromIterator { + self.ids.keys().cloned().collect() + } +} + +#[cfg(test)] +mod tests { + use net::{timeout, ReqId}; + use request::{Request, Receipts}; + use time::{SteadyTime, Duration}; + use super::RequestSet; + + #[test] + fn multi_timeout() { + let test_begin = SteadyTime::now(); + let mut req_set = RequestSet::default(); + + let the_req = Request::Receipts(Receipts { block_hashes: Vec::new() }); + req_set.insert(ReqId(0), the_req.clone(), test_begin); + req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1)); + + assert_eq!(req_set.base, Some(test_begin)); + + let test_end = test_begin + Duration::milliseconds(timeout::RECEIPTS); + assert!(req_set.check_timeout(test_end)); + + req_set.remove(&ReqId(0), test_begin + Duration::seconds(1)).unwrap(); + assert!(!req_set.check_timeout(test_end)); + assert!(req_set.check_timeout(test_end + Duration::seconds(1))); + } +} diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index a0a9feee4..8a55c5ee6 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -27,7 +27,7 @@ use network::{PeerId, NodeId}; use net::buffer_flow::FlowParams; use net::context::IoContext; use net::status::{Capabilities, Status, write_handshake}; -use net::{encode_request, LightProtocol, Params, packet}; +use net::{encode_request, LightProtocol, Params, packet, Peer}; use provider::Provider; use request::{self, Request, Headers}; @@ -487,3 +487,81 @@ fn get_contract_code() { let expected = Expect::Respond(packet::CONTRACT_CODES, response); proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body); } + +#[test] +fn id_guard() { + use super::request_set::RequestSet; + use super::ReqId; + + let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); + let capabilities = capabilities(); + + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + + let req_id_1 = ReqId(5143); + let req_id_2 = ReqId(1111); + let req = Request::Headers(request::Headers { + start: 5u64.into(), + max: 100, + skip: 0, + reverse: false, + }); + + let peer_id = 9876; + + let mut pending_requests = RequestSet::default(); + + pending_requests.insert(req_id_1, req.clone(), ::time::SteadyTime::now()); + pending_requests.insert(req_id_2, req, ::time::SteadyTime::now()); + + proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer { + local_buffer: flow_params.create_buffer(), + status: status(provider.client.chain_info()), + capabilities: capabilities.clone(), + remote_flow: Some((flow_params.create_buffer(), flow_params)), + sent_head: provider.client.chain_info().best_block_hash, + last_update: ::time::SteadyTime::now(), + pending_requests: pending_requests, + failed_requests: Vec::new(), + })); + + // first, supply wrong request type. + { + let mut stream = RlpStream::new_list(3); + stream.append(&req_id_1.0); + stream.append(&4_000_000usize); + stream.begin_list(0); + + let packet = stream.out(); + assert!(proto.block_bodies(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err()); + } + + // next, do an unexpected response. + { + let mut stream = RlpStream::new_list(3); + stream.append(&10000usize); + stream.append(&3_000_000usize); + stream.begin_list(0); + + let packet = stream.out(); + assert!(proto.receipts(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_err()); + } + + // lastly, do a valid (but empty) response. + { + let mut stream = RlpStream::new_list(3); + stream.append(&req_id_2.0); + stream.append(&3_000_000usize); + stream.begin_list(0); + + let packet = stream.out(); + assert!(proto.block_headers(&peer_id, &Expect::Nothing, UntrustedRlp::new(&packet)).is_ok()); + } + + let peers = proto.peers.read(); + if let Some(ref peer_info) = peers.get(&peer_id) { + let peer_info = peer_info.lock(); + assert!(peer_info.pending_requests.collect_ids::>().is_empty()); + assert_eq!(peer_info.failed_requests, &[req_id_1]); + } +}