diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index bd62b4c70..3ab2ffe82 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -33,7 +33,7 @@ use ethcore::header::Header; use light::client::LightChainClient; use light::net::{ Announcement, Handler, BasicContext, EventContext, - Capabilities, ReqId, Status + Capabilities, ReqId, Status, }; use light::request; use network::PeerId; @@ -66,10 +66,60 @@ impl Peer { } } -// Search for a common ancestor with the best chain. -struct AncestorSearch { - last_batched: u64, - req_id: Option, +// search for a common ancestor with the best chain. +enum AncestorSearch { + Queued(u64), // queued to search for blocks starting from here. + Awaiting(ReqId, u64, request::Headers), // awaiting response for this request. + Prehistoric, // prehistoric block found. TODO: start to roll back CHTs. + FoundCommon(u64, H256), // common block found. + Genesis, // common ancestor is the genesis. +} + +impl AncestorSearch { + fn begin(best_num: u64) -> Self { + match best_num { + 0 => AncestorSearch::Genesis, + x => AncestorSearch::Queued(best_num), + } + } + + fn process_response(mut self, ctx: &ResponseContext, client: &L) -> AncestorSearch + where L: LightChainClient + { + let first_num = client.chain_info().first_block_number.unwrap_or(0); + match self { + AncestorSearch::Awaiting(id, start, req) => { + if &id == ctx.req_id() { + match response::decode_and_verify(ctx.data(), &req) { + Ok(headers) => { + for header in &headers { + if client.is_known(&header.hash()) { + debug!(target: "sync", "Found common ancestor with best chain"); + return AncestorSearch::FoundCommon(header.number(), header.hash()); + } + + if header.number() <= first_num { + debug!(target: "sync", "Prehistoric common ancestor with best chain."); + return AncestorSearch::Prehistoric; + } + } + + AncestorSearch::Queued(start - headers.len() as u64) + } + Err(e) => { + trace!(target: "sync", "Bad headers response from {}: {}", ctx.responder(), e); + + ctx.punish_responder(); + AncestorSearch::Queued(start) + } + } + } else { + AncestorSearch::Awaiting(id, start, req) + } + } + other => other, + } + } } // synchronization state machine. @@ -218,17 +268,18 @@ impl Handler for LightSync { { let mut state = self.state.lock(); + let ctx = ResponseCtx { + peer: ctx.peer(), + req_id: req_id, + ctx: ctx.as_basic(), + data: headers, + }; + *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Idle => SyncState::Idle, - SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search), - SyncState::Rounds(round) => { - SyncState::Rounds(round.process_response(&ResponseCtx { - peer: ctx.peer(), - req_id: req_id, - ctx: ctx.as_basic(), - data: headers, - })) - } + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)), + SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)), }; } @@ -244,10 +295,17 @@ impl Handler for LightSync { impl LightSync { // Begins a search for the common ancestor and our best block. // does not lock state, instead has a mutable reference to it passed. - fn begin_search(&self, _state: &mut SyncState) { + fn begin_search(&self, state: &mut SyncState) { self.client.clear_queue(); - unimplemented!(); + let chain_info = self.client.chain_info(); + if let None = *self.best_seen.lock() { + // no peers. + *state = SyncState::Idle; + return; + } + + *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); } fn maintain_sync(&self, ctx: &BasicContext) { @@ -283,7 +341,7 @@ impl LightSync { } } - // check for aborted sync round. + // handle state transitions. { match mem::replace(&mut *state, SyncState::Idle) { SyncState::Rounds(SyncRound::Abort(reason)) => { @@ -300,11 +358,24 @@ impl LightSync { debug!(target: "sync", "Beginning search after aborted sync round"); self.begin_search(&mut state); } + SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => { + // TODO: compare to best block and switch to another downloading + // method when close. + *state = SyncState::Rounds(SyncRound::begin(num, hash)); + } + SyncState::AncestorSearch(AncestorSearch::Genesis) => { + // Same here. + let g_hash = self.client.chain_info().genesis_hash; + *state = SyncState::Rounds(SyncRound::begin(0, g_hash)); + } + SyncState::Idle => self.begin_search(&mut state), other => *state = other, // restore displaced state. } } // allow dispatching of requests. + // TODO: maybe wait until the amount of cumulative requests remaining is high enough + // to avoid pumping the failure rate. { *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Rounds(round) diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index fb4f5c4ef..7f86faf2d 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -299,9 +299,7 @@ impl RoundStart { } // called on failed attempt. may trigger a transition after a number of attempts. - // a failed attempt is defined as: - // - any time we try to make a request to a peer and fail - // - any time a peer returns invalid or incomplete response + // a failed attempt is defined as any time a peer returns invalid or incomplete response fn failed_attempt(mut self) -> SyncRound { self.attempt += 1; @@ -371,7 +369,6 @@ impl RoundStart { where D: Fn(HeadersRequest) -> Option { if self.pending_req.is_none() { - // beginning offset + first block expected after last header we have. let start = (self.start_block.0 + 1) + self.sparse_headers.len() as u64 * (ROUND_SKIP + 1); @@ -383,9 +380,8 @@ impl RoundStart { reverse: false, }; - match dispatcher(headers_request.clone()) { - Some(req_id) => self.pending_req = Some((req_id, headers_request)), - None => return self.failed_attempt(), + if let Some(req_id) = dispatcher(headers_request.clone()) { + self.pending_req = Some((req_id, headers_request)); } } @@ -410,6 +406,11 @@ impl SyncRound { SyncRound::Abort(reason) } + /// Begin sync rounds from a starting block. + pub fn begin(num: u64, hash: H256) -> Self { + SyncRound::Start(RoundStart::new((num, hash))) + } + /// Process an answer to a request. Unknown requests will be ignored. pub fn process_response(self, ctx: &R) -> Self { match self {