From 54058e3712f65dbb5931b1974005586123067614 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 9 Jan 2017 11:29:06 +0100 Subject: [PATCH] ensure failed requests considered unfulfilled --- ethcore/light/src/net/mod.rs | 128 ++++++++++++++++++++++++++++------- 1 file changed, 102 insertions(+), 26 deletions(-) diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index e92e5f475..654e87a76 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -40,6 +40,7 @@ use self::buffer_flow::{Buffer, FlowParams}; use self::context::{Ctx, TickCtx}; use self::error::Punishment; use self::request_set::RequestSet; +use self::id_guard::IdGuard; mod buffer_flow; mod context; @@ -131,8 +132,9 @@ struct PendingPeer { last_update: SteadyTime, } -// data about each peer. -struct Peer { +/// Relevant data to each peer. Not accessible publicly, only `pub` due to +/// limitations of the privacy system. +pub struct Peer { local_buffer: Buffer, // their buffer relative to us status: Status, capabilities: Capabilities, @@ -140,6 +142,7 @@ struct Peer { sent_head: H256, // last chain head we've given them. last_update: SteadyTime, pending_requests: RequestSet, + failed_requests: Vec, } impl Peer { @@ -213,6 +216,57 @@ pub struct Params { pub capabilities: Capabilities, } +/// Type alias for convenience. +pub type PeerMap = HashMap>; + +mod id_guard { + + use network::PeerId; + use util::RwLockReadGuard; + + use super::{PeerMap, ReqId}; + + // Guards success or failure of given request. + // On drop, inserts the req_id into the "failed requests" + // set for the peer unless defused. In separate module to enforce correct usage. + pub struct IdGuard<'a> { + peers: RwLockReadGuard<'a, PeerMap>, + peer_id: PeerId, + req_id: ReqId, + active: bool, + } + + impl<'a> IdGuard<'a> { + /// Create a new `IdGuard`, which will prevent access of the inner ReqId + /// (for forming responses, triggering handlers) until defused + pub fn new(peers: RwLockReadGuard<'a, PeerMap>, peer_id: PeerId, req_id: ReqId) -> Self { + IdGuard { + peers: peers, + peer_id: peer_id, + req_id: req_id, + active: true, + } + } + + /// Defuse the guard, signalling that the request has been successfully decoded. + pub fn defuse(mut self) -> ReqId { + // can't use the mem::forget trick here since we need the + // read guard to drop. + self.active = false; + self.req_id + } + } + + impl<'a> Drop for IdGuard<'a> { + fn drop(&mut self) { + if !self.active { return } + if let Some(p) = self.peers.get(&self.peer_id) { + p.lock().failed_requests.push(self.req_id); + } + } + } +} + /// This is an implementation of the light ethereum network protocol, abstracted /// over a `Provider` of data and a p2p network. /// @@ -228,7 +282,7 @@ pub struct LightProtocol { genesis_hash: H256, network_id: u64, pending_peers: RwLock>, - peers: RwLock>>, + peers: RwLock, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. handlers: Vec>, @@ -384,36 +438,49 @@ impl LightProtocol { // - check whether peer exists // - check whether request was made // - check whether request kinds match - fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { + fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result { let req_id = ReqId(raw.val_at(0)?); let cur_buffer: U256 = raw.val_at(1)?; trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind); + let mut had_req = false; let peers = self.peers.read(); - match peers.get(peer) { + let maybe_err = match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); + let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now()); + let flow_info = peer_info.remote_flow.as_mut(); + + match (req_info, flow_info) { + (Some(request), Some(flow_info)) => { + had_req = true; + + let &mut (ref mut buf, ref mut flow) = flow_info; + let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); + buf.update_to(actual_buffer); - match peer_info.pending_requests.remove(&req_id, SteadyTime::now()) { - None => return Err(Error::UnsolicitedResponse), - Some(request) => { if request.kind() != kind { - return Err(Error::UnsolicitedResponse) + Some(Error::UnsolicitedResponse) + } else { + None } } + (None, _) => Some(Error::UnsolicitedResponse), + (_, None) => Some(Error::NotServer), // really should be impossible. } - - match peer_info.remote_flow.as_mut() { - Some(&mut (ref mut buf, ref mut flow)) => { - let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); - buf.update_to(actual_buffer) - } - None => return Err(Error::NotServer), // this really should be impossible. - } - Ok(req_id) } - None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. + None => Some(Error::UnknownPeer), // probably only occurs in a race of some kind. + }; + + if had_req { + let id_guard = IdGuard::new(peers, *peer, req_id); + match maybe_err { + Some(err) => Err(err), + None => Ok(id_guard) + } + } else { + Err(maybe_err.expect("every branch without a request leads to error; qed")) } } @@ -542,7 +609,9 @@ impl LightProtocol { self.pending_peers.write().remove(&peer); if let Some(peer_info) = self.peers.write().remove(&peer) { - let unfulfilled: Vec<_> = peer_info.into_inner().pending_requests.collect_ids(); + let peer_info = peer_info.into_inner(); + let mut unfulfilled: Vec<_> = peer_info.pending_requests.collect_ids(); + unfulfilled.extend(peer_info.failed_requests); for handler in &self.handlers { handler.on_disconnect(&Ctx { @@ -585,6 +654,7 @@ impl LightProtocol { sent_head: pending.sent_head, last_update: pending.last_update, pending_requests: RequestSet::default(), + failed_requests: Vec::new(), })); for handler in &self.handlers { @@ -699,9 +769,10 @@ impl LightProtocol { // Receive a response for block headers. fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Headers, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Headers, &raw)?; let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_block_headers(&Ctx { peer: *peer, @@ -764,9 +835,10 @@ impl LightProtocol { // Receive a response for block bodies. fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?; let raw_bodies: Vec = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_block_bodies(&Ctx { peer: *peer, @@ -826,12 +898,13 @@ impl LightProtocol { // Receive a response for receipts. fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?; let raw_receipts: Vec> = raw.at(2)? .iter() .map(|x| x.as_val()) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_receipts(&Ctx { peer: *peer, @@ -899,12 +972,13 @@ impl LightProtocol { // Receive a response for proofs. fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?; let raw_proofs: Vec> = raw.at(2)?.iter() .map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect()) .collect(); + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_state_proofs(&Ctx { peer: *peer, @@ -970,12 +1044,13 @@ impl LightProtocol { // Receive a response for contract code. fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> { - let req_id = self.pre_verify_response(peer, request::Kind::Codes, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::Codes, &raw)?; let raw_code: Vec = raw.at(2)?.iter() .map(|x| x.as_val()) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_code(&Ctx { peer: *peer, @@ -1049,11 +1124,12 @@ impl LightProtocol { )) } - let req_id = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?; + let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?; let raw_proofs: Vec<_> = raw.at(2)?.iter() .map(decode_res) .collect::>()?; + let req_id = id_guard.defuse(); for handler in &self.handlers { handler.on_header_proofs(&Ctx { peer: *peer,