handle abandoned requests

This commit is contained in:
Robert Habermeier 2016-12-15 16:19:28 +01:00
parent ec88a992e3
commit 71e96aca10

View File

@ -183,8 +183,6 @@ impl Fetcher {
let subchain_parent = request.subchain_parent.1; let subchain_parent = request.subchain_parent.1;
// TODO: check subchain parent and punish peers who did framing
// if it's inaccurate.
if request.headers_request.max == 0 { if request.headers_request.max == 0 {
if parent_hash.map_or(true, |hash| hash != subchain_parent) { if parent_hash.map_or(true, |hash| hash != subchain_parent) {
let abort = AbortReason::BadScaffold(self.scaffold_contributors); let abort = AbortReason::BadScaffold(self.scaffold_contributors);
@ -199,6 +197,18 @@ impl Fetcher {
} }
} }
} }
fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound {
for abandoned in abandoned {
match self.pending.remove(abandoned) {
None => {},
Some(req) => self.requests.push(req),
}
}
// TODO: track failure rate and potentially abort.
SyncRound::Fetch(self)
}
} }
/// Round started: get stepped header chain. /// Round started: get stepped header chain.
@ -223,6 +233,22 @@ impl RoundStart {
} }
} }
// called on failed attempt. may trigger a transition.
fn failed_attempt(mut self) -> SyncRound {
self.attempt += 1;
if self.attempt >= SCAFFOLD_ATTEMPTS {
if self.sparse_headers.len() > 1 {
let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
SyncRound::Fetch(fetcher)
} else {
SyncRound::Abort(AbortReason::NoResponses)
}
} else {
SyncRound::Start(self)
}
}
fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound { fn process_response<R: ResponseContext>(mut self, ctx: &R) -> SyncRound {
let req = match self.pending_req.take() { let req = match self.pending_req.take() {
Some((id, ref req)) if ctx.req_id() == id => { req.clone() } Some((id, ref req)) if ctx.req_id() == id => { req.clone() }
@ -232,7 +258,6 @@ impl RoundStart {
} }
}; };
self.attempt += 1;
match response::decode_and_verify(ctx.data(), &req) { match response::decode_and_verify(ctx.data(), &req) {
Ok(headers) => { Ok(headers) => {
self.contributors.insert(ctx.responder()); self.contributors.insert(ctx.responder());
@ -252,17 +277,23 @@ impl RoundStart {
} }
}; };
if self.attempt >= SCAFFOLD_ATTEMPTS { self.failed_attempt()
if self.sparse_headers.len() > 1 {
let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
SyncRound::Fetch(fetcher)
} else {
SyncRound::Abort(AbortReason::NoResponses)
} }
fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound {
match self.pending_req.take() {
Some((id, req)) => {
if abandoned.iter().any(|r| r == &id) {
self.pending_req = None;
self.failed_attempt()
} else { } else {
self.pending_req = Some((id, req));
SyncRound::Start(self) SyncRound::Start(self)
} }
} }
None => SyncRound::Start(self),
}
}
} }
/// Sync round state machine. /// Sync round state machine.
@ -293,7 +324,11 @@ impl SyncRound {
/// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored.
pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self { pub fn requests_abandoned(self, abandoned: &[ReqId]) -> Self {
unimplemented!() match self {
SyncRound::Start(round_start) => round_start.requests_abandoned(abandoned),
SyncRound::Fetch(fetcher) => fetcher.requests_abandoned(abandoned),
other => other,
}
} }
/// Dispatch pending requests. The dispatcher provided will attempt to /// Dispatch pending requests. The dispatcher provided will attempt to