light: specialize Downloader to SyncRound

This commit is contained in:
Robert Habermeier 2016-12-14 23:25:51 +01:00
parent 91b8fa7039
commit 1bcfc9348d
1 changed files with 54 additions and 29 deletions

View File

@ -21,6 +21,7 @@ use std::mem;
use ethcore::header::Header; use ethcore::header::Header;
use light::client::LightChainClient;
use light::net::{EventContext, ReqId}; use light::net::{EventContext, ReqId};
use light::request::Headers as HeadersRequest; use light::request::Headers as HeadersRequest;
@ -36,11 +37,20 @@ use super::response::{self, Constraint};
const ROUND_SKIP: usize = 255; const ROUND_SKIP: usize = 255;
// amount of scaffold frames: these are the blank spaces in "X___X___X" // amount of scaffold frames: these are the blank spaces in "X___X___X"
const ROUND_FRAMES: u64 = 255; const ROUND_FRAMES: usize = 255;
// number of attempts to make to get a full scaffold for a sync round. // number of attempts to make to get a full scaffold for a sync round.
const SCAFFOLD_ATTEMPTS: usize = 3; const SCAFFOLD_ATTEMPTS: usize = 3;
/// Reasons for sync round abort.
#[derive(Debug, Clone, Copy)]
pub enum AbortReason {
/// Bad chain downloaded.
BadChain,
/// No incoming data.
NoResponses,
}
// A request for headers with a known starting header // A request for headers with a known starting header
// and a known parent hash for the last block. // and a known parent hash for the last block.
struct Request { struct Request {
@ -86,7 +96,7 @@ impl Fetcher {
} }
} }
fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) {
unimplemented!() unimplemented!()
} }
} }
@ -98,7 +108,7 @@ struct RoundStart {
start_block: (u64, H256), start_block: (u64, H256),
pending_req: Option<(ReqId, HeadersRequest)>, pending_req: Option<(ReqId, HeadersRequest)>,
sparse_headers: Vec<Header>, sparse_headers: Vec<Header>,
attempt: 0, attempt: usize,
} }
impl RoundStart { impl RoundStart {
@ -107,65 +117,80 @@ impl RoundStart {
start_block: start.clone(), start_block: start.clone(),
pending_req: None, pending_req: None,
sparse_headers: Vec::new(), sparse_headers: Vec::new(),
attempt: 0,
} }
} }
fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) {
let req = match self.pending_req.take() { let req = match self.pending_req.take() {
Some((id, req)) if req_id == id { req.clone() } Some((id, ref req)) if req_id == id => { req.clone() }
other => { other => {
self.pending_req = other; self.pending_req = other;
return (self, Ok(())) return (SyncRound::Start(self), Ok(()))
} }
}; };
self.attempt += 1; self.attempt += 1;
let headers = match response::decode_and_verify(headers, &req) { let res = match response::decode_and_verify(headers, &req) {
Ok(headers) => { Ok(headers) => {
self.sparse_headers.extend(headers); self.sparse_headers.extend(headers);
if self.sparse_headers.len() == ROUND_FRAMES + 1 if self.sparse_headers.len() == ROUND_FRAMES + 1 {
|| self.attempt >= SCAFFOLD_ATTEMPTS trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers",
{ self.sparse_headers.len());
let fetcher = Fetcher::new(self.sparse_headers); let fetcher = Fetcher::new(self.sparse_headers);
(Downloader::Fetch(fetcher), Ok(())) return (SyncRound::Fetch(fetcher), Ok(()));
} }
Ok(())
} }
Err(e) => Err(e),
};
if self.attempt >= SCAFFOLD_ATTEMPTS {
(SyncRound::Abort(AbortReason::NoResponses), res.map_err(Into::into))
} else {
(SyncRound::Start(self), res.map_err(Into::into))
} }
} }
} }
/// Downloader state machine. /// Sync round state machine.
pub enum Downloader { pub enum SyncRound {
/// Waiting for peers.
Nothing,
/// Searching for common block with best chain.
SearchCommon,
/// Beginning a sync round. /// Beginning a sync round.
RoundStart(RoundStart), Start(RoundStart),
/// Fetching intermediate blocks during a sync round. /// Fetching intermediate blocks during a sync round.
Fetch(Fetcher), Fetch(Fetcher),
/// Aborted.
Abort(AbortReason),
} }
impl Downloader { impl SyncRound {
// Process an answer to a request. Unknown requests will be ignored. fn abort(reason: AbortReason) -> Self {
fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { trace!(target: "sync", "Aborting sync round: {:?}", reason);
SyncRound::Abort(reason)
}
/// Process an answer to a request. Unknown requests will be ignored.
pub fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) {
match self { match self {
Downloader::RoundStart(round_start) => round_start.process_response(req_id, headers), SyncRound::Start(round_start) => round_start.process_response(req_id, headers),
Downloader::Fetch(fetcher) => fetcher.process_response(req_id, headers), SyncRound::Fetch(fetcher) => fetcher.process_response(req_id, headers),
other => (other, Ok(())), other => (other, Ok(())),
} }
} }
// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored.
fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { pub fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) {
unimplemented!()
} }
// Dispatch pending requests. The dispatcher provided will attempt to /// Dispatch pending requests. The dispatcher provided will attempt to
// find a suitable peer to serve the request. /// find a suitable peer to serve the request.
// TODO: have dispatcher take capabilities argument? // TODO: have dispatcher take capabilities argument?
fn dispatch_requests<D>(self, dispatcher: D) -> (Self, Result<(), Error>) pub fn dispatch_requests<D>(self, dispatcher: D) -> (Self, Result<(), Error>)
where D: Fn(HeadersRequest) -> Option<ReqId> where D: Fn(HeadersRequest) -> Option<ReqId>
{ {
unimplemented!() unimplemented!()