Fix light client informant while syncing (#9932)

* Add `is_idle` to LightSync to check importing status

* Use SyncStateWrapper to make sure is_idle gets updates

* Update is_major_import to use verified queue size as well

* Add comment for `is_idle`

* Add Debug to `SyncStateWrapper`

* `fn get` -> `fn into_inner`
This commit is contained in:
Nicolas Gotchac 2018-11-26 12:05:02 +01:00 committed by Andronik Ordian
parent 832c4a7565
commit 0d593199d0
2 changed files with 80 additions and 48 deletions

View File

@ -34,6 +34,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::mem; use std::mem;
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
@ -213,6 +214,44 @@ enum SyncState {
Rounds(SyncRound), Rounds(SyncRound),
} }
/// A wrapper around the SyncState that makes sure to
/// update the giving reference to `is_idle`
#[derive(Debug)]
struct SyncStateWrapper {
state: SyncState,
}
impl SyncStateWrapper {
/// Create a new wrapper for SyncState::Idle
pub fn idle() -> Self {
SyncStateWrapper {
state: SyncState::Idle,
}
}
/// Set the new state's value, making sure `is_idle` gets updated
pub fn set(&mut self, state: SyncState, is_idle_handle: &mut bool) {
*is_idle_handle = match state {
SyncState::Idle => true,
_ => false,
};
self.state = state;
}
/// Returns the internal state's value
pub fn into_inner(self) -> SyncState {
self.state
}
}
impl Deref for SyncStateWrapper {
type Target = SyncState;
fn deref(&self) -> &SyncState {
&self.state
}
}
struct ResponseCtx<'a> { struct ResponseCtx<'a> {
peer: PeerId, peer: PeerId,
req_id: ReqId, req_id: ReqId,
@ -235,7 +274,9 @@ pub struct LightSync<L: AsLightClient> {
pending_reqs: Mutex<HashMap<ReqId, PendingReq>>, // requests from this handler pending_reqs: Mutex<HashMap<ReqId, PendingReq>>, // requests from this handler
client: Arc<L>, client: Arc<L>,
rng: Mutex<OsRng>, rng: Mutex<OsRng>,
state: Mutex<SyncState>, state: Mutex<SyncStateWrapper>,
// We duplicate this state tracking to avoid deadlocks in `is_major_importing`.
is_idle: Mutex<bool>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -309,16 +350,17 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
if new_best.is_none() { if new_best.is_none() {
debug!(target: "sync", "No peers remain. Reverting to idle"); debug!(target: "sync", "No peers remain. Reverting to idle");
*self.state.lock() = SyncState::Idle; self.set_state(&mut self.state.lock(), SyncState::Idle);
} else { } else {
let mut state = self.state.lock(); let mut state = self.state.lock();
*state = match mem::replace(&mut *state, SyncState::Idle) { let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle, SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)), SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)), SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
}; };
self.set_state(&mut state, next_state);
} }
self.maintain_sync(ctx.as_basic()); self.maintain_sync(ctx.as_basic());
@ -390,12 +432,13 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
data: headers, data: headers,
}; };
*state = match mem::replace(&mut *state, SyncState::Idle) { let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle, SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)), SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)),
SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)), SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)),
}; };
self.set_state(&mut state, next_state);
} }
self.maintain_sync(ctx.as_basic()); self.maintain_sync(ctx.as_basic());
@ -408,12 +451,18 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
// private helpers // private helpers
impl<L: AsLightClient> LightSync<L> { impl<L: AsLightClient> LightSync<L> {
/// Sets the LightSync's state, and update
/// `is_idle`
fn set_state(&self, state: &mut SyncStateWrapper, next_state: SyncState) {
state.set(next_state, &mut self.is_idle.lock());
}
// Begins a search for the common ancestor and our best block. // Begins a search for the common ancestor and our best block.
// does not lock state, instead has a mutable reference to it passed. // does not lock state, instead has a mutable reference to it passed.
fn begin_search(&self, state: &mut SyncState) { fn begin_search(&self, state: &mut SyncStateWrapper) {
if let None = *self.best_seen.lock() { if let None = *self.best_seen.lock() {
// no peers. // no peers.
*state = SyncState::Idle; self.set_state(state, SyncState::Idle);
return; return;
} }
@ -422,7 +471,8 @@ impl<L: AsLightClient> LightSync<L> {
trace!(target: "sync", "Beginning search for common ancestor from {:?}", trace!(target: "sync", "Beginning search for common ancestor from {:?}",
(chain_info.best_block_number, chain_info.best_block_hash)); (chain_info.best_block_number, chain_info.best_block_hash));
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); let next_state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
self.set_state(state, next_state);
} }
// handles request dispatch, block import, state machine transitions, and timeouts. // handles request dispatch, block import, state machine transitions, and timeouts.
@ -435,7 +485,7 @@ impl<L: AsLightClient> LightSync<L> {
let chain_info = client.chain_info(); let chain_info = client.chain_info();
let mut state = self.state.lock(); let mut state = self.state.lock();
debug!(target: "sync", "Maintaining sync ({:?})", &*state); debug!(target: "sync", "Maintaining sync ({:?})", **state);
// drain any pending blocks into the queue. // drain any pending blocks into the queue.
{ {
@ -445,11 +495,12 @@ impl<L: AsLightClient> LightSync<L> {
loop { loop {
if client.queue_info().is_full() { break } if client.queue_info().is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) { let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(round) SyncState::Rounds(round)
=> SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))), => SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))),
other => other, other => other,
}; };
self.set_state(&mut state, next_state);
if sink.is_empty() { break } if sink.is_empty() { break }
trace!(target: "sync", "Drained {} headers to import", sink.len()); trace!(target: "sync", "Drained {} headers to import", sink.len());
@ -483,15 +534,15 @@ impl<L: AsLightClient> LightSync<L> {
let network_score = other.as_ref().map(|target| target.head_td); let network_score = other.as_ref().map(|target| target.head_td);
trace!(target: "sync", "No target to sync to. Network score: {:?}, Local score: {:?}", trace!(target: "sync", "No target to sync to. Network score: {:?}, Local score: {:?}",
network_score, best_td); network_score, best_td);
*state = SyncState::Idle; self.set_state(&mut state, SyncState::Idle);
return; return;
} }
}; };
match mem::replace(&mut *state, SyncState::Idle) { match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(SyncRound::Abort(reason, remaining)) => { SyncState::Rounds(SyncRound::Abort(reason, remaining)) => {
if remaining.len() > 0 { if remaining.len() > 0 {
*state = SyncState::Rounds(SyncRound::Abort(reason, remaining)); self.set_state(&mut state, SyncState::Rounds(SyncRound::Abort(reason, remaining)));
return; return;
} }
@ -505,7 +556,7 @@ impl<L: AsLightClient> LightSync<L> {
AbortReason::NoResponses => {} AbortReason::NoResponses => {}
AbortReason::TargetReached => { AbortReason::TargetReached => {
debug!(target: "sync", "Sync target reached. Going idle"); debug!(target: "sync", "Sync target reached. Going idle");
*state = SyncState::Idle; self.set_state(&mut state, SyncState::Idle);
return; return;
} }
} }
@ -514,15 +565,15 @@ impl<L: AsLightClient> LightSync<L> {
self.begin_search(&mut state); self.begin_search(&mut state);
} }
SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => { SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
*state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target)); self.set_state(&mut state, SyncState::Rounds(SyncRound::begin((num, hash), sync_target)));
} }
SyncState::AncestorSearch(AncestorSearch::Genesis) => { SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here. // Same here.
let g_hash = chain_info.genesis_hash; let g_hash = chain_info.genesis_hash;
*state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target)); self.set_state(&mut state, SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target)));
} }
SyncState::Idle => self.begin_search(&mut state), SyncState::Idle => self.begin_search(&mut state),
other => *state = other, // restore displaced state. other => self.set_state(&mut state, other), // restore displaced state.
} }
} }
@ -543,12 +594,13 @@ impl<L: AsLightClient> LightSync<L> {
} }
drop(pending_reqs); drop(pending_reqs);
*state = match mem::replace(&mut *state, SyncState::Idle) { let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle, SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)), SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)), SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)),
}; };
self.set_state(&mut state, next_state);
} }
} }
@ -605,34 +657,14 @@ impl<L: AsLightClient> LightSync<L> {
None None
}; };
*state = match mem::replace(&mut *state, SyncState::Idle) { let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(round) => SyncState::Rounds(round) =>
SyncState::Rounds(round.dispatch_requests(dispatcher)), SyncState::Rounds(round.dispatch_requests(dispatcher)),
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.dispatch_request(dispatcher)), SyncState::AncestorSearch(search.dispatch_request(dispatcher)),
other => other, other => other,
}; };
} self.set_state(&mut state, next_state);
}
fn is_major_importing_do_wait(&self, wait: bool) -> bool {
const EMPTY_QUEUE: usize = 3;
if self.client.as_light_client().queue_info().unverified_queue_size > EMPTY_QUEUE {
return true;
}
let mg_state = if wait {
self.state.lock()
} else {
if let Some(mg_state) = self.state.try_lock() {
mg_state
} else {
return false;
}
};
match *mg_state {
SyncState::Idle => false,
_ => true,
} }
} }
} }
@ -651,7 +683,8 @@ impl<L: AsLightClient> LightSync<L> {
pending_reqs: Mutex::new(HashMap::new()), pending_reqs: Mutex::new(HashMap::new()),
client: client, client: client,
rng: Mutex::new(OsRng::new()?), rng: Mutex::new(OsRng::new()?),
state: Mutex::new(SyncState::Idle), state: Mutex::new(SyncStateWrapper::idle()),
is_idle: Mutex::new(true),
}) })
} }
} }
@ -666,9 +699,6 @@ pub trait SyncInfo {
/// Whether major sync is underway. /// Whether major sync is underway.
fn is_major_importing(&self) -> bool; fn is_major_importing(&self) -> bool;
/// Whether major sync is underway, skipping some synchronization.
fn is_major_importing_no_sync(&self) -> bool;
} }
impl<L: AsLightClient> SyncInfo for LightSync<L> { impl<L: AsLightClient> SyncInfo for LightSync<L> {
@ -681,11 +711,13 @@ impl<L: AsLightClient> SyncInfo for LightSync<L> {
} }
fn is_major_importing(&self) -> bool { fn is_major_importing(&self) -> bool {
self.is_major_importing_do_wait(true) const EMPTY_QUEUE: usize = 3;
}
fn is_major_importing_no_sync(&self) -> bool { let queue_info = self.client.as_light_client().queue_info();
self.is_major_importing_do_wait(false) let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > EMPTY_QUEUE;
let is_syncing = !*self.is_idle.lock();
is_verifying || is_syncing
} }
} }

View File

@ -184,7 +184,7 @@ impl InformantData for LightNodeInformantData {
fn executes_transactions(&self) -> bool { false } fn executes_transactions(&self) -> bool { false }
fn is_major_importing(&self) -> bool { fn is_major_importing(&self) -> bool {
self.sync.is_major_importing_no_sync() self.sync.is_major_importing()
} }
fn report(&self) -> Report { fn report(&self) -> Report {