diff --git a/sync/src/light_sync/downloader.rs b/sync/src/light_sync/downloader.rs new file mode 100644 index 000000000..dd78a3144 --- /dev/null +++ b/sync/src/light_sync/downloader.rs @@ -0,0 +1,173 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Header download state machine. + +use std::collections::{HashMap, VecDeque}; +use std::mem; + +use ethcore::header::Header; + +use light::net::{EventContext, ReqId}; +use light::request::Headers as HeadersRequest; + +use network::PeerId; +use rlp::{UntrustedRlp, View}; +use util::{Bytes, H256, Mutex}; + +use super::{Error, Peer}; +use super::response::{self, Constraint}; + +// amount of blocks between each scaffold entry. +// TODO: move these into paraeters for `RoundStart::new`? +const ROUND_SKIP: usize = 255; + +// amount of scaffold frames: these are the blank spaces in "X___X___X" +const ROUND_FRAMES: u64 = 255; + +// number of attempts to make to get a full scaffold for a sync round. +const SCAFFOLD_ATTEMPTS: usize = 3; + +// A request for headers with a known starting header +// and a known parent hash for the last block. +struct Request { + headers: HeadersRequest, + end_parent: H256, +} + +pub struct Fetcher { + sparse: Vec
, // sparse header chain. + requests: VecDeque, + pending: HashMap, +} + +impl Fetcher { + // Produce a new fetcher given a sparse headerchain, in ascending order. + // The headers must be valid RLP at this point. + fn new(sparse_headers: Vec
) -> Self { + let mut requests = VecDeque::with_capacity(sparse_headers.len() - 1); + for pair in sparse_headers.windows(2) { + let low_rung = &pair[0]; + let high_rung = &pair[1]; + + let diff = high_rung.number() - low_rung.number(); + if diff < 2 { continue } // these headers are already adjacent. + + let needed_headers = HeadersRequest { + start: high_rung.parent_hash().clone().into(), + max: diff as usize - 1, + skip: 0, + reverse: true, + }; + + requests.push_back(Request { + headers: needed_headers, + end_parent: low_rung.hash(), + }); + } + + Fetcher { + sparse: sparse_headers, + requests: requests, + pending: HashMap::new(), + } + } + + fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { + unimplemented!() + } +} + +// Round started: get stepped header chain. +// from a start block with number X we request 256 headers stepped by 256 from +// block X + 1. +struct RoundStart { + start_block: (u64, H256), + pending_req: Option<(ReqId, HeadersRequest)>, + sparse_headers: Vec
, + attempt: 0, +} + +impl RoundStart { + fn new(start: (u64, H256)) -> Self { + RoundStart { + start_block: start.clone(), + pending_req: None, + sparse_headers: Vec::new(), + } + } + + fn process_response(mut self, req_id: ReqId, headers: &[Bytes]) -> (Downloader, Result<(), Error>) { + let req = match self.pending_req.take() { + Some((id, req)) if req_id == id { req.clone() } + other => { + self.pending_req = other; + return (self, Ok(())) + } + }; + + self.attempt += 1; + let headers = match response::decode_and_verify(headers, &req) { + Ok(headers) => { + self.sparse_headers.extend(headers); + + if self.sparse_headers.len() == ROUND_FRAMES + 1 + || self.attempt >= SCAFFOLD_ATTEMPTS + { + let fetcher = Fetcher::new(self.sparse_headers); + (Downloader::Fetch(fetcher), Ok(())) + } + } + } + } +} + +/// Downloader state machine. +pub enum Downloader { + /// Waiting for peers. + Nothing, + /// Searching for common block with best chain. + SearchCommon, + /// Beginning a sync round. + RoundStart(RoundStart), + /// Fetching intermediate blocks during a sync round. + Fetch(Fetcher), +} + +impl Downloader { + // Process an answer to a request. Unknown requests will be ignored. + fn process_response(self, req_id: ReqId, headers: &[Bytes]) -> (Self, Result<(), Error>) { + match self { + Downloader::RoundStart(round_start) => round_start.process_response(req_id, headers), + Downloader::Fetch(fetcher) => fetcher.process_response(req_id, headers), + other => (other, Ok(())), + } + } + + // Return unfulfilled requests from disconnected peer. Unknown requests will be ignored. + fn requests_abandoned(self, abandoned: &[ReqId]) -> (Self, Result<(), Error>) { + + } + + // Dispatch pending requests. The dispatcher provided will attempt to + // find a suitable peer to serve the request. + // TODO: have dispatcher take capabilities argument? + fn dispatch_requests(self, dispatcher: D) -> (Self, Result<(), Error>) + where D: Fn(HeadersRequest) -> Option + { + unimplemented!() + } +}