ensure failed requests considered unfulfilled

This commit is contained in:
Robert Habermeier 2017-01-09 11:29:06 +01:00
parent f63faea308
commit 54058e3712
1 changed files with 102 additions and 26 deletions

View File

@ -40,6 +40,7 @@ use self::buffer_flow::{Buffer, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
use self::request_set::RequestSet;
use self::id_guard::IdGuard;
mod buffer_flow;
mod context;
@ -131,8 +132,9 @@ struct PendingPeer {
last_update: SteadyTime,
}
// data about each peer.
struct Peer {
/// Relevant data to each peer. Not accessible publicly, only `pub` due to
/// limitations of the privacy system.
pub struct Peer {
local_buffer: Buffer, // their buffer relative to us
status: Status,
capabilities: Capabilities,
@ -140,6 +142,7 @@ struct Peer {
sent_head: H256, // last chain head we've given them.
last_update: SteadyTime,
pending_requests: RequestSet,
failed_requests: Vec<ReqId>,
}
impl Peer {
@ -213,6 +216,57 @@ pub struct Params {
pub capabilities: Capabilities,
}
/// Type alias for convenience.
pub type PeerMap = HashMap<PeerId, Mutex<Peer>>;
mod id_guard {
use network::PeerId;
use util::RwLockReadGuard;
use super::{PeerMap, ReqId};
// Guards success or failure of given request.
// On drop, inserts the req_id into the "failed requests"
// set for the peer unless defused. In separate module to enforce correct usage.
pub struct IdGuard<'a> {
peers: RwLockReadGuard<'a, PeerMap>,
peer_id: PeerId,
req_id: ReqId,
active: bool,
}
impl<'a> IdGuard<'a> {
/// Create a new `IdGuard`, which will prevent access of the inner ReqId
/// (for forming responses, triggering handlers) until defused
pub fn new(peers: RwLockReadGuard<'a, PeerMap>, peer_id: PeerId, req_id: ReqId) -> Self {
IdGuard {
peers: peers,
peer_id: peer_id,
req_id: req_id,
active: true,
}
}
/// Defuse the guard, signalling that the request has been successfully decoded.
pub fn defuse(mut self) -> ReqId {
// can't use the mem::forget trick here since we need the
// read guard to drop.
self.active = false;
self.req_id
}
}
impl<'a> Drop for IdGuard<'a> {
fn drop(&mut self) {
if !self.active { return }
if let Some(p) = self.peers.get(&self.peer_id) {
p.lock().failed_requests.push(self.req_id);
}
}
}
}
/// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network.
///
@ -228,7 +282,7 @@ pub struct LightProtocol {
genesis_hash: H256,
network_id: u64,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
peers: RwLock<PeerMap>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
@ -384,36 +438,49 @@ impl LightProtocol {
// - check whether peer exists
// - check whether request was made
// - check whether request kinds match
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<ReqId, Error> {
fn pre_verify_response(&self, peer: &PeerId, kind: request::Kind, raw: &UntrustedRlp) -> Result<IdGuard, Error> {
let req_id = ReqId(raw.val_at(0)?);
let cur_buffer: U256 = raw.val_at(1)?;
trace!(target: "les", "pre-verifying response from peer {}, kind={:?}", peer, kind);
let mut had_req = false;
let peers = self.peers.read();
match peers.get(peer) {
let maybe_err = match peers.get(peer) {
Some(peer_info) => {
let mut peer_info = peer_info.lock();
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
let flow_info = peer_info.remote_flow.as_mut();
match (req_info, flow_info) {
(Some(request), Some(flow_info)) => {
had_req = true;
let &mut (ref mut buf, ref mut flow) = flow_info;
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
buf.update_to(actual_buffer);
match peer_info.pending_requests.remove(&req_id, SteadyTime::now()) {
None => return Err(Error::UnsolicitedResponse),
Some(request) => {
if request.kind() != kind {
return Err(Error::UnsolicitedResponse)
Some(Error::UnsolicitedResponse)
} else {
None
}
}
(None, _) => Some(Error::UnsolicitedResponse),
(_, None) => Some(Error::NotServer), // really should be impossible.
}
match peer_info.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref mut flow)) => {
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
buf.update_to(actual_buffer)
}
None => return Err(Error::NotServer), // this really should be impossible.
}
Ok(req_id)
}
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
None => Some(Error::UnknownPeer), // probably only occurs in a race of some kind.
};
if had_req {
let id_guard = IdGuard::new(peers, *peer, req_id);
match maybe_err {
Some(err) => Err(err),
None => Ok(id_guard)
}
} else {
Err(maybe_err.expect("every branch without a request leads to error; qed"))
}
}
@ -542,7 +609,9 @@ impl LightProtocol {
self.pending_peers.write().remove(&peer);
if let Some(peer_info) = self.peers.write().remove(&peer) {
let unfulfilled: Vec<_> = peer_info.into_inner().pending_requests.collect_ids();
let peer_info = peer_info.into_inner();
let mut unfulfilled: Vec<_> = peer_info.pending_requests.collect_ids();
unfulfilled.extend(peer_info.failed_requests);
for handler in &self.handlers {
handler.on_disconnect(&Ctx {
@ -585,6 +654,7 @@ impl LightProtocol {
sent_head: pending.sent_head,
last_update: pending.last_update,
pending_requests: RequestSet::default(),
failed_requests: Vec::new(),
}));
for handler in &self.handlers {
@ -699,9 +769,10 @@ impl LightProtocol {
// Receive a response for block headers.
fn block_headers(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = self.pre_verify_response(peer, request::Kind::Headers, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::Headers, &raw)?;
let raw_headers: Vec<_> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_block_headers(&Ctx {
peer: *peer,
@ -764,9 +835,10 @@ impl LightProtocol {
// Receive a response for block bodies.
fn block_bodies(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::Bodies, &raw)?;
let raw_bodies: Vec<Bytes> = raw.at(2)?.iter().map(|x| x.as_raw().to_owned()).collect();
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_block_bodies(&Ctx {
peer: *peer,
@ -826,12 +898,13 @@ impl LightProtocol {
// Receive a response for receipts.
fn receipts(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::Receipts, &raw)?;
let raw_receipts: Vec<Vec<Receipt>> = raw.at(2)?
.iter()
.map(|x| x.as_val())
.collect::<Result<_,_>>()?;
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_receipts(&Ctx {
peer: *peer,
@ -899,12 +972,13 @@ impl LightProtocol {
// Receive a response for proofs.
fn proofs(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::StateProofs, &raw)?;
let raw_proofs: Vec<Vec<Bytes>> = raw.at(2)?.iter()
.map(|x| x.iter().map(|node| node.as_raw().to_owned()).collect())
.collect();
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_state_proofs(&Ctx {
peer: *peer,
@ -970,12 +1044,13 @@ impl LightProtocol {
// Receive a response for contract code.
fn contract_code(&self, peer: &PeerId, io: &IoContext, raw: UntrustedRlp) -> Result<(), Error> {
let req_id = self.pre_verify_response(peer, request::Kind::Codes, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::Codes, &raw)?;
let raw_code: Vec<Bytes> = raw.at(2)?.iter()
.map(|x| x.as_val())
.collect::<Result<_,_>>()?;
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_code(&Ctx {
peer: *peer,
@ -1049,11 +1124,12 @@ impl LightProtocol {
))
}
let req_id = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
let id_guard = self.pre_verify_response(peer, request::Kind::HeaderProofs, &raw)?;
let raw_proofs: Vec<_> = raw.at(2)?.iter()
.map(decode_res)
.collect::<Result<_,_>>()?;
let req_id = id_guard.defuse();
for handler in &self.handlers {
handler.on_header_proofs(&Ctx {
peer: *peer,