From f776f480238ef92f7813e03b0b064c0427df18d2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 15 Dec 2016 17:33:25 +0100 Subject: [PATCH] drain prepared headers from sync round --- sync/src/light_sync/sync_round.rs | 139 +++++++++++++++++++++++++++--- 1 file changed, 127 insertions(+), 12 deletions(-) diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 83b27b046..0442cc5ce 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -33,8 +33,8 @@ use util::{Bytes, H256, Mutex}; use super::response; // amount of blocks between each scaffold entry. -// TODO: move these into paraeters for `RoundStart::new`? -const ROUND_SKIP: usize = 255; +// TODO: move these into parameters for `RoundStart::new`? +const ROUND_SKIP: u64 = 255; // amount of scaffold frames: these are the blank spaces in "X___X___X" const ROUND_FRAMES: usize = 255; @@ -94,13 +94,16 @@ pub struct Fetcher { complete_requests: HashMap, pending: HashMap, scaffold_contributors: Vec, + ready: VecDeque
, + end: (u64, H256), } impl Fetcher { // Produce a new fetcher given a sparse headerchain, in ascending order along // with a list of peers who helped produce the chain. - // The headers must be valid RLP at this point. - fn new(sparse_headers: Vec
, contributors: Vec) -> Self { + // The headers must be valid RLP at this point and must have a consistent + // non-zero gap between them. Will abort the round if found wrong. + fn new(sparse_headers: Vec
, contributors: Vec) -> SyncRound { let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1); for pair in sparse_headers.windows(2) { @@ -128,12 +131,44 @@ impl Fetcher { }); } - Fetcher { + let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) { + Some(end) => end, + None => return SyncRound::abort(AbortReason::BadScaffold(contributors)), + }; + + SyncRound::Fetch(Fetcher { sparse: sparse_headers.into(), requests: requests, complete_requests: HashMap::new(), pending: HashMap::new(), scaffold_contributors: contributors, + ready: VecDeque::new(), + end: end, + }) + } + + // collect complete requests and their subchain from the sparse header chain + // into the ready set in order. + fn collect_ready(&mut self) { + loop { + let start_hash = match self.sparse.front() { + Some(first) => first.hash(), + None => break, + }; + + match self.complete_requests.remove(&start_hash) { + None => break, + Some(complete_req) => { + self.ready.push_back(self.sparse.pop_front().expect("first known to exist; qed")); + self.ready.extend(complete_req.downloaded); + } + } + } + + // frames are between two sparse headers and keyed by subchain parent, so the last + // remaining will be the last header. + if self.sparse.len() == 1 { + self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed")) } } @@ -209,6 +244,40 @@ impl Fetcher { // TODO: track failure rate and potentially abort. SyncRound::Fetch(self) } + + fn dispatch_requests(mut self, dispatcher: D) -> SyncRound + where D: Fn(HeadersRequest) -> Option + { + while let Some(pending_req) = self.requests.pop() { + match dispatcher(pending_req.headers_request.clone()) { + Some(req_id) => { + trace!(target: "sync", "Assigned request for subchain ({} -> {})", + pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0); + + self.pending.insert(req_id, pending_req); + } + None => { + self.requests.push(pending_req); + break; + } + } + } + + SyncRound::Fetch(self) + } + + fn drain(mut self, headers: &mut Vec
, max: usize) -> SyncRound { + self.collect_ready(); + + let max = ::std::cmp::min(max, self.ready.len()); + headers.extend(self.ready.drain(0..max)); + + if self.sparse.is_empty() && self.ready.is_empty() { + SyncRound::Start(RoundStart::new(self.end)) + } else { + SyncRound::Fetch(self) + } + } } /// Round started: get stepped header chain. @@ -233,14 +302,16 @@ impl RoundStart { } } - // called on failed attempt. may trigger a transition. + // called on failed attempt. may trigger a transition after a number of attempts. + // a failed attempt is defined as: + // - any time we try to make a request to a peer and fail + // - any time a peer returns invalid or incomplete response fn failed_attempt(mut self) -> SyncRound { self.attempt += 1; if self.attempt >= SCAFFOLD_ATTEMPTS { if self.sparse_headers.len() > 1 { - let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); - SyncRound::Fetch(fetcher) + Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()) } else { SyncRound::Abort(AbortReason::NoResponses) } @@ -260,6 +331,12 @@ impl RoundStart { match response::decode_and_verify(ctx.data(), &req) { Ok(headers) => { + if self.sparse_headers.len() == 0 + && headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) { + trace!(target: "sync", "Wrong parent for first header in round"); + ctx.punish_responder(); // or should we reset? + } + self.contributors.insert(ctx.responder()); self.sparse_headers.extend(headers); @@ -267,8 +344,7 @@ impl RoundStart { trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", self.sparse_headers.len()); - let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); - return SyncRound::Fetch(fetcher); + return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect()); } } Err(e) => { @@ -294,6 +370,31 @@ impl RoundStart { None => SyncRound::Start(self), } } + + fn dispatch_requests(mut self, dispatcher: D) -> SyncRound + where D: Fn(HeadersRequest) -> Option + { + if self.pending_req.is_none() { + + // beginning offset + first block expected after last header we have. + let start = (self.start_block.0 + 1) + + self.sparse_headers.len() as u64 * (ROUND_SKIP + 1); + + let headers_request = HeadersRequest { + start: start.into(), + max: (ROUND_FRAMES - 1) - self.sparse_headers.len(), + skip: ROUND_SKIP, + reverse: false, + }; + + match dispatcher(headers_request.clone()) { + Some(req_id) => self.pending_req = Some((req_id, headers_request)), + None => return self.failed_attempt(), + } + } + + SyncRound::Start(self) + } } /// Sync round state machine. @@ -333,10 +434,24 @@ impl SyncRound { /// Dispatch pending requests. The dispatcher provided will attempt to /// find a suitable peer to serve the request. - // TODO: have dispatcher take capabilities argument? + // TODO: have dispatcher take capabilities argument? and return an error as + // to why no suitable peer can be found? (no buffer, no chain heads that high, etc) pub fn dispatch_requests(self, dispatcher: D) -> Self where D: Fn(HeadersRequest) -> Option { - unimplemented!() + match self { + SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher), + SyncRound::Fetch(fetcher) => fetcher.dispatch_requests(dispatcher), + other => other, + } + } + + /// Drain up to a maximum number of headers (continuous, starting with a child of + /// the round start block) from the round, starting a new one once finished. + pub fn drain(self, v: &mut Vec
, max: usize) -> Self { + match self { + SyncRound::Fetch(fetcher) => fetcher.drain(v, max), + other => other, + } } }