drain prepared headers from sync round
This commit is contained in:
parent
71e96aca10
commit
f776f48023
@ -33,8 +33,8 @@ use util::{Bytes, H256, Mutex};
|
|||||||
use super::response;
|
use super::response;
|
||||||
|
|
||||||
// amount of blocks between each scaffold entry.
|
// amount of blocks between each scaffold entry.
|
||||||
// TODO: move these into paraeters for `RoundStart::new`?
|
// TODO: move these into parameters for `RoundStart::new`?
|
||||||
const ROUND_SKIP: usize = 255;
|
const ROUND_SKIP: u64 = 255;
|
||||||
|
|
||||||
// amount of scaffold frames: these are the blank spaces in "X___X___X"
|
// amount of scaffold frames: these are the blank spaces in "X___X___X"
|
||||||
const ROUND_FRAMES: usize = 255;
|
const ROUND_FRAMES: usize = 255;
|
||||||
@ -94,13 +94,16 @@ pub struct Fetcher {
|
|||||||
complete_requests: HashMap<H256, SubchainRequest>,
|
complete_requests: HashMap<H256, SubchainRequest>,
|
||||||
pending: HashMap<ReqId, SubchainRequest>,
|
pending: HashMap<ReqId, SubchainRequest>,
|
||||||
scaffold_contributors: Vec<PeerId>,
|
scaffold_contributors: Vec<PeerId>,
|
||||||
|
ready: VecDeque<Header>,
|
||||||
|
end: (u64, H256),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Fetcher {
|
impl Fetcher {
|
||||||
// Produce a new fetcher given a sparse headerchain, in ascending order along
|
// Produce a new fetcher given a sparse headerchain, in ascending order along
|
||||||
// with a list of peers who helped produce the chain.
|
// with a list of peers who helped produce the chain.
|
||||||
// The headers must be valid RLP at this point.
|
// The headers must be valid RLP at this point and must have a consistent
|
||||||
fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>) -> Self {
|
// non-zero gap between them. Will abort the round if found wrong.
|
||||||
|
fn new(sparse_headers: Vec<Header>, contributors: Vec<PeerId>) -> SyncRound {
|
||||||
let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1);
|
let mut requests = BinaryHeap::with_capacity(sparse_headers.len() - 1);
|
||||||
|
|
||||||
for pair in sparse_headers.windows(2) {
|
for pair in sparse_headers.windows(2) {
|
||||||
@ -128,12 +131,44 @@ impl Fetcher {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Fetcher {
|
let end = match sparse_headers.last().map(|h| (h.number(), h.hash())) {
|
||||||
|
Some(end) => end,
|
||||||
|
None => return SyncRound::abort(AbortReason::BadScaffold(contributors)),
|
||||||
|
};
|
||||||
|
|
||||||
|
SyncRound::Fetch(Fetcher {
|
||||||
sparse: sparse_headers.into(),
|
sparse: sparse_headers.into(),
|
||||||
requests: requests,
|
requests: requests,
|
||||||
complete_requests: HashMap::new(),
|
complete_requests: HashMap::new(),
|
||||||
pending: HashMap::new(),
|
pending: HashMap::new(),
|
||||||
scaffold_contributors: contributors,
|
scaffold_contributors: contributors,
|
||||||
|
ready: VecDeque::new(),
|
||||||
|
end: end,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect complete requests and their subchain from the sparse header chain
|
||||||
|
// into the ready set in order.
|
||||||
|
fn collect_ready(&mut self) {
|
||||||
|
loop {
|
||||||
|
let start_hash = match self.sparse.front() {
|
||||||
|
Some(first) => first.hash(),
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.complete_requests.remove(&start_hash) {
|
||||||
|
None => break,
|
||||||
|
Some(complete_req) => {
|
||||||
|
self.ready.push_back(self.sparse.pop_front().expect("first known to exist; qed"));
|
||||||
|
self.ready.extend(complete_req.downloaded);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// frames are between two sparse headers and keyed by subchain parent, so the last
|
||||||
|
// remaining will be the last header.
|
||||||
|
if self.sparse.len() == 1 {
|
||||||
|
self.ready.push_back(self.sparse.pop_back().expect("sparse known to have one entry; qed"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,6 +244,40 @@ impl Fetcher {
|
|||||||
// TODO: track failure rate and potentially abort.
|
// TODO: track failure rate and potentially abort.
|
||||||
SyncRound::Fetch(self)
|
SyncRound::Fetch(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dispatch_requests<D>(mut self, dispatcher: D) -> SyncRound
|
||||||
|
where D: Fn(HeadersRequest) -> Option<ReqId>
|
||||||
|
{
|
||||||
|
while let Some(pending_req) = self.requests.pop() {
|
||||||
|
match dispatcher(pending_req.headers_request.clone()) {
|
||||||
|
Some(req_id) => {
|
||||||
|
trace!(target: "sync", "Assigned request for subchain ({} -> {})",
|
||||||
|
pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0);
|
||||||
|
|
||||||
|
self.pending.insert(req_id, pending_req);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
self.requests.push(pending_req);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncRound::Fetch(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drain(mut self, headers: &mut Vec<Header>, max: usize) -> SyncRound {
|
||||||
|
self.collect_ready();
|
||||||
|
|
||||||
|
let max = ::std::cmp::min(max, self.ready.len());
|
||||||
|
headers.extend(self.ready.drain(0..max));
|
||||||
|
|
||||||
|
if self.sparse.is_empty() && self.ready.is_empty() {
|
||||||
|
SyncRound::Start(RoundStart::new(self.end))
|
||||||
|
} else {
|
||||||
|
SyncRound::Fetch(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Round started: get stepped header chain.
|
/// Round started: get stepped header chain.
|
||||||
@ -233,14 +302,16 @@ impl RoundStart {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// called on failed attempt. may trigger a transition.
|
// called on failed attempt. may trigger a transition after a number of attempts.
|
||||||
|
// a failed attempt is defined as:
|
||||||
|
// - any time we try to make a request to a peer and fail
|
||||||
|
// - any time a peer returns invalid or incomplete response
|
||||||
fn failed_attempt(mut self) -> SyncRound {
|
fn failed_attempt(mut self) -> SyncRound {
|
||||||
self.attempt += 1;
|
self.attempt += 1;
|
||||||
|
|
||||||
if self.attempt >= SCAFFOLD_ATTEMPTS {
|
if self.attempt >= SCAFFOLD_ATTEMPTS {
|
||||||
if self.sparse_headers.len() > 1 {
|
if self.sparse_headers.len() > 1 {
|
||||||
let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
|
Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect())
|
||||||
SyncRound::Fetch(fetcher)
|
|
||||||
} else {
|
} else {
|
||||||
SyncRound::Abort(AbortReason::NoResponses)
|
SyncRound::Abort(AbortReason::NoResponses)
|
||||||
}
|
}
|
||||||
@ -260,6 +331,12 @@ impl RoundStart {
|
|||||||
|
|
||||||
match response::decode_and_verify(ctx.data(), &req) {
|
match response::decode_and_verify(ctx.data(), &req) {
|
||||||
Ok(headers) => {
|
Ok(headers) => {
|
||||||
|
if self.sparse_headers.len() == 0
|
||||||
|
&& headers.get(0).map_or(false, |x| x.parent_hash() != &self.start_block.1) {
|
||||||
|
trace!(target: "sync", "Wrong parent for first header in round");
|
||||||
|
ctx.punish_responder(); // or should we reset?
|
||||||
|
}
|
||||||
|
|
||||||
self.contributors.insert(ctx.responder());
|
self.contributors.insert(ctx.responder());
|
||||||
self.sparse_headers.extend(headers);
|
self.sparse_headers.extend(headers);
|
||||||
|
|
||||||
@ -267,8 +344,7 @@ impl RoundStart {
|
|||||||
trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
|
trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
|
||||||
self.sparse_headers.len());
|
self.sparse_headers.len());
|
||||||
|
|
||||||
let fetcher = Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
|
return Fetcher::new(self.sparse_headers, self.contributors.into_iter().collect());
|
||||||
return SyncRound::Fetch(fetcher);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -294,6 +370,31 @@ impl RoundStart {
|
|||||||
None => SyncRound::Start(self),
|
None => SyncRound::Start(self),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dispatch_requests<D>(mut self, dispatcher: D) -> SyncRound
|
||||||
|
where D: Fn(HeadersRequest) -> Option<ReqId>
|
||||||
|
{
|
||||||
|
if self.pending_req.is_none() {
|
||||||
|
|
||||||
|
// beginning offset + first block expected after last header we have.
|
||||||
|
let start = (self.start_block.0 + 1)
|
||||||
|
+ self.sparse_headers.len() as u64 * (ROUND_SKIP + 1);
|
||||||
|
|
||||||
|
let headers_request = HeadersRequest {
|
||||||
|
start: start.into(),
|
||||||
|
max: (ROUND_FRAMES - 1) - self.sparse_headers.len(),
|
||||||
|
skip: ROUND_SKIP,
|
||||||
|
reverse: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
match dispatcher(headers_request.clone()) {
|
||||||
|
Some(req_id) => self.pending_req = Some((req_id, headers_request)),
|
||||||
|
None => return self.failed_attempt(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncRound::Start(self)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync round state machine.
|
/// Sync round state machine.
|
||||||
@ -333,10 +434,24 @@ impl SyncRound {
|
|||||||
|
|
||||||
/// Dispatch pending requests. The dispatcher provided will attempt to
|
/// Dispatch pending requests. The dispatcher provided will attempt to
|
||||||
/// find a suitable peer to serve the request.
|
/// find a suitable peer to serve the request.
|
||||||
// TODO: have dispatcher take capabilities argument?
|
// TODO: have dispatcher take capabilities argument? and return an error as
|
||||||
|
// to why no suitable peer can be found? (no buffer, no chain heads that high, etc)
|
||||||
pub fn dispatch_requests<D>(self, dispatcher: D) -> Self
|
pub fn dispatch_requests<D>(self, dispatcher: D) -> Self
|
||||||
where D: Fn(HeadersRequest) -> Option<ReqId>
|
where D: Fn(HeadersRequest) -> Option<ReqId>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
match self {
|
||||||
|
SyncRound::Start(round_start) => round_start.dispatch_requests(dispatcher),
|
||||||
|
SyncRound::Fetch(fetcher) => fetcher.dispatch_requests(dispatcher),
|
||||||
|
other => other,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drain up to a maximum number of headers (continuous, starting with a child of
|
||||||
|
/// the round start block) from the round, starting a new one once finished.
|
||||||
|
pub fn drain(self, v: &mut Vec<Header>, max: usize) -> Self {
|
||||||
|
match self {
|
||||||
|
SyncRound::Fetch(fetcher) => fetcher.drain(v, max),
|
||||||
|
other => other,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user