Add a timeout for light client sync requests (#7848)

* Add a timeout for light client sync requests

* Adjusting timeout to number of headers
This commit is contained in:
Pierre Krieger 2018-02-14 11:31:14 +01:00 committed by Afri Schoedon
parent ebb92947a3
commit bf57eb8978
2 changed files with 57 additions and 8 deletions

View File

@ -35,6 +35,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Instant, Duration};
use ethcore::encoded; use ethcore::encoded;
use light::client::{AsLightClient, LightChainClient}; use light::client::{AsLightClient, LightChainClient};
@ -57,6 +58,13 @@ mod sync_round;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
// Base number of milliseconds for the header request timeout.
// Applied to every pending request regardless of its size.
const REQ_TIMEOUT_MILLISECS_BASE: u64 = 7000;
// Additional number of milliseconds for each requested header.
// If we request N headers, then the timeout will be:
// REQ_TIMEOUT_MILLISECS_BASE + N * REQ_TIMEOUT_MILLISECS_PER_HEADER
const REQ_TIMEOUT_MILLISECS_PER_HEADER: u64 = 10;
/// Peer chain info. /// Peer chain info.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
struct ChainInfo { struct ChainInfo {
@ -224,12 +232,18 @@ pub struct LightSync<L: AsLightClient> {
start_block_number: u64, start_block_number: u64,
best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network. best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization. peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
pending_reqs: Mutex<HashSet<ReqId>>, // requests from this handler. pending_reqs: Mutex<HashMap<ReqId, PendingReq>>, // requests from this handler
client: Arc<L>, client: Arc<L>,
rng: Mutex<OsRng>, rng: Mutex<OsRng>,
state: Mutex<SyncState>, state: Mutex<SyncState>,
} }
/// Bookkeeping for a single in-flight header request, used to detect timeouts.
/// A request is considered timed out once `started.elapsed() >= timeout`.
#[derive(Debug, Clone)]
struct PendingReq {
	// Moment the request was dispatched (set to `Instant::now()` on dispatch).
	started: Instant,
	// Per-request deadline: base timeout plus a per-header increment scaled
	// by the number of headers requested.
	timeout: Duration,
}
impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> { impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
fn on_connect( fn on_connect(
&self, &self,
@ -352,7 +366,7 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
return return
} }
if !self.pending_reqs.lock().remove(&req_id) { if self.pending_reqs.lock().remove(&req_id).is_none() {
return return
} }
@ -411,7 +425,7 @@ impl<L: AsLightClient> LightSync<L> {
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
} }
// handles request dispatch, block import, and state machine transitions. // handles request dispatch, block import, state machine transitions, and timeouts.
fn maintain_sync(&self, ctx: &BasicContext) { fn maintain_sync(&self, ctx: &BasicContext) {
const DRAIN_AMOUNT: usize = 128; const DRAIN_AMOUNT: usize = 128;
@ -501,6 +515,32 @@ impl<L: AsLightClient> LightSync<L> {
} }
} }
// handle requests timeouts
{
let mut pending_reqs = self.pending_reqs.lock();
let mut unfulfilled = Vec::new();
for (req_id, info) in pending_reqs.iter() {
if info.started.elapsed() >= info.timeout {
debug!(target: "sync", "{} timed out", req_id);
unfulfilled.push(req_id.clone());
}
}
if !unfulfilled.is_empty() {
for unfulfilled in unfulfilled.iter() {
pending_reqs.remove(unfulfilled);
}
drop(pending_reqs);
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)),
};
}
}
// allow dispatching of requests. // allow dispatching of requests.
{ {
let peers = self.peers.read(); let peers = self.peers.read();
@ -534,7 +574,12 @@ impl<L: AsLightClient> LightSync<L> {
if requested_from.contains(peer) { continue } if requested_from.contains(peer) { continue }
match ctx.request_from(*peer, request.clone()) { match ctx.request_from(*peer, request.clone()) {
Ok(id) => { Ok(id) => {
self.pending_reqs.lock().insert(id.clone()); let timeout_ms = REQ_TIMEOUT_MILLISECS_BASE +
req.max * REQ_TIMEOUT_MILLISECS_PER_HEADER;
self.pending_reqs.lock().insert(id.clone(), PendingReq {
started: Instant::now(),
timeout: Duration::from_millis(timeout_ms),
});
requested_from.insert(peer.clone()); requested_from.insert(peer.clone());
return Some(id) return Some(id)
@ -570,7 +615,7 @@ impl<L: AsLightClient> LightSync<L> {
start_block_number: client.as_light_client().chain_info().best_block_number, start_block_number: client.as_light_client().chain_info().best_block_number,
best_seen: Mutex::new(None), best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
pending_reqs: Mutex::new(HashSet::new()), pending_reqs: Mutex::new(HashMap::new()),
client: client, client: client,
rng: Mutex::new(OsRng::new()?), rng: Mutex::new(OsRng::new()?),
state: Mutex::new(SyncState::Idle), state: Mutex::new(SyncState::Idle),

View File

@ -179,7 +179,7 @@ impl Fetcher {
}; };
trace!(target: "sync", "Received response for subchain ({} -> {})", trace!(target: "sync", "Received response for subchain ({} -> {})",
request.subchain_parent.0 + 1, request.subchain_end.0); request.subchain_parent.0, request.subchain_end.0);
let headers = ctx.data(); let headers = ctx.data();
@ -241,6 +241,8 @@ impl Fetcher {
} }
fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound { fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound {
trace!(target: "sync", "Abandonned requests {:?}", abandoned);
for abandoned in abandoned { for abandoned in abandoned {
match self.pending.remove(abandoned) { match self.pending.remove(abandoned) {
None => {}, None => {},
@ -258,12 +260,14 @@ impl Fetcher {
while let Some(pending_req) = self.requests.pop() { while let Some(pending_req) = self.requests.pop() {
match dispatcher(pending_req.headers_request.clone()) { match dispatcher(pending_req.headers_request.clone()) {
Some(req_id) => { Some(req_id) => {
trace!(target: "sync", "Assigned request for subchain ({} -> {})", trace!(target: "sync", "Assigned request {} for subchain ({} -> {})",
pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0); req_id, pending_req.subchain_parent.0, pending_req.subchain_end.0);
self.pending.insert(req_id, pending_req); self.pending.insert(req_id, pending_req);
} }
None => { None => {
trace!(target: "sync", "Failed to assign request for subchain ({} -> {})",
pending_req.subchain_parent.0, pending_req.subchain_end.0);
self.requests.push(pending_req); self.requests.push(pending_req);
break; break;
} }