diff --git a/sync/src/light_sync/downloader.rs b/sync/src/light_sync/sync_round.rs similarity index 65% rename from sync/src/light_sync/downloader.rs rename to sync/src/light_sync/sync_round.rs index dd78a3144..905e8354f 100644 --- a/sync/src/light_sync/downloader.rs +++ b/sync/src/light_sync/sync_round.rs @@ -21,6 +21,7 @@ use std::mem; use ethcore::header::Header; +use light::client::LightChainClient; use light::net::{EventContext, ReqId}; use light::request::Headers as HeadersRequest; @@ -36,11 +37,20 @@ use super::response::{self, Constraint}; const ROUND_SKIP: usize = 255; // amount of scaffold frames: these are the blank spaces in "X___X___X" -const ROUND_FRAMES: u64 = 255; +const ROUND_FRAMES: usize = 255; // number of attempts to make to get a full scaffold for a sync round. const SCAFFOLD_ATTEMPTS: usize = 3; +/// Reasons for sync round abort. +#[derive(Debug, Clone, Copy)] +pub enum AbortReason { + /// Bad chain downloaded. + BadChain, + /// No incoming data. + NoResponses, +} + // A request for headers with a known starting header // and a known parent hash for the last block. struct Request { @@ -86,7 +96,7 @@ impl Fetcher { } } - fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { + fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { unimplemented!() } } @@ -98,7 +108,7 @@ struct RoundStart { start_block: (u64, H256), pending_req: Option<(ReqId, HeadersRequest)>, sparse_headers: Vec
, - attempt: 0, + attempt: usize, } impl RoundStart { @@ -107,65 +117,80 @@ impl RoundStart { start_block: start.clone(), pending_req: None, sparse_headers: Vec::new(), + attempt: 0, } } - fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { + fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (SyncRound, Result<(), Error>) { let req = match self.pending_req.take() { - Some((id, req)) if req_id == id { req.clone() } + Some((id, ref req)) if req_id == id => { req.clone() } other => { self.pending_req = other; - return (self, Ok(())) + return (SyncRound::Start(self), Ok(())) } }; self.attempt += 1; - let headers = match response::decode_and_verify(headers, &req) { + let res = match response::decode_and_verify(headers, &req) { Ok(headers) => { self.sparse_headers.extend(headers); - if self.sparse_headers.len() == ROUND_FRAMES + 1 - || self.attempt >= SCAFFOLD_ATTEMPTS - { + if self.sparse_headers.len() == ROUND_FRAMES + 1 { + trace!(target: "sync", "Beginning fetch of blocks between {} sparse headers", + self.sparse_headers.len()); + let fetcher = Fetcher::new(self.sparse_headers); - (Downloader::Fetch(fetcher), Ok(())) + return (SyncRound::Fetch(fetcher), Ok(())); } + + Ok(()) } + Err(e) => Err(e), + }; + + if self.attempt >= SCAFFOLD_ATTEMPTS { + (SyncRound::Abort(AbortReason::NoResponses), res.map_err(Into::into)) + } else { + (SyncRound::Start(self), res.map_err(Into::into)) } } } -/// Downloader state machine. -pub enum Downloader { - /// Waiting for peers. - Nothing, - /// Searching for common block with best chain. - SearchCommon, +/// Sync round state machine. +pub enum SyncRound { /// Beginning a sync round. - RoundStart(RoundStart), + Start(RoundStart), /// Fetching intermediate blocks during a sync round. Fetch(Fetcher), + /// Aborted. + Abort(AbortReason), } -impl Downloader { - // Process an answer to a request. Unknown requests will be ignored. - fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { +impl SyncRound { + fn abort(reason: AbortReason) -> Self { + trace!(target: "sync", "Aborting sync round: {:?}", reason); + + SyncRound::Abort(reason) + } + + /// Process an answer to a request. Unknown requests will be ignored. + pub fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { match self { - Downloader::RoundStart(round_start) => round_start.process_response(req_id, headers), - Downloader::Fetch(fetcher) => fetcher.process_response(req_id, headers), + SyncRound::Start(round_start) => round_start.process_response(req_id, headers), + SyncRound::Fetch(fetcher) => fetcher.process_response(req_id, headers), other => (other, Ok(())), } } - // Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. - fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { - + /// Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. + pub fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { + unimplemented!() } - // Dispatch pending requests. The dispatcher provided will attempt to - // find a suitable peer to serve the request. + /// Dispatch pending requests. The dispatcher provided will attempt to + /// find a suitable peer to serve the request. // TODO: have dispatcher take capabilities argument? - fn dispatch_requests(self, dispatcher: D) -> (Self, Result<(), Error>) + pub fn dispatch_requests(self, dispatcher: D) -> (Self, Result<(), Error>) where D: Fn(HeadersRequest) -> Option { unimplemented!()