diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 12db882f0..ef1bd8742 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -35,6 +35,7 @@ use std::collections::{HashMap, HashSet}; use std::mem; use std::sync::Arc; +use std::time::{Instant, Duration}; use ethcore::encoded; use light::client::{AsLightClient, LightChainClient}; @@ -57,6 +58,13 @@ mod sync_round; #[cfg(test)] mod tests; +// Base number of milliseconds for the header request timeout. +const REQ_TIMEOUT_MILLISECS_BASE: u64 = 7000; +// Additional number of milliseconds for each requested header. +// If we request N headers, then the timeout will be: +// REQ_TIMEOUT_MILLISECS_BASE + N * REQ_TIMEOUT_MILLISECS_PER_HEADER +const REQ_TIMEOUT_MILLISECS_PER_HEADER: u64 = 10; + /// Peer chain info. #[derive(Debug, Clone, PartialEq, Eq)] struct ChainInfo { @@ -224,12 +232,18 @@ pub struct LightSync { start_block_number: u64, best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. - pending_reqs: Mutex>, // requests from this handler. + pending_reqs: Mutex>, // requests from this handler client: Arc, rng: Mutex, state: Mutex, } +#[derive(Debug, Clone)] +struct PendingReq { + started: Instant, + timeout: Duration, +} + impl Handler for LightSync { fn on_connect( &self, @@ -352,7 +366,7 @@ impl Handler for LightSync { return } - if !self.pending_reqs.lock().remove(&req_id) { + if self.pending_reqs.lock().remove(&req_id).is_none() { return } @@ -411,7 +425,7 @@ impl LightSync { *state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number)); } - // handles request dispatch, block import, and state machine transitions. + // handles request dispatch, block import, state machine transitions, and timeouts. fn maintain_sync(&self, ctx: &BasicContext) { const DRAIN_AMOUNT: usize = 128; @@ -501,6 +515,32 @@ impl LightSync { } } + // handle requests timeouts + { + let mut pending_reqs = self.pending_reqs.lock(); + let mut unfulfilled = Vec::new(); + for (req_id, info) in pending_reqs.iter() { + if info.started.elapsed() >= info.timeout { + debug!(target: "sync", "{} timed out", req_id); + unfulfilled.push(req_id.clone()); + } + } + + if !unfulfilled.is_empty() { + for unfulfilled in unfulfilled.iter() { + pending_reqs.remove(unfulfilled); + } + drop(pending_reqs); + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Idle => SyncState::Idle, + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)), + SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)), + }; + } + } + // allow dispatching of requests. { let peers = self.peers.read(); @@ -534,7 +574,12 @@ impl LightSync { if requested_from.contains(peer) { continue } match ctx.request_from(*peer, request.clone()) { Ok(id) => { - self.pending_reqs.lock().insert(id.clone()); + let timeout_ms = REQ_TIMEOUT_MILLISECS_BASE + + req.max * REQ_TIMEOUT_MILLISECS_PER_HEADER; + self.pending_reqs.lock().insert(id.clone(), PendingReq { + started: Instant::now(), + timeout: Duration::from_millis(timeout_ms), + }); requested_from.insert(peer.clone()); return Some(id) @@ -570,7 +615,7 @@ impl LightSync { start_block_number: client.as_light_client().chain_info().best_block_number, best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), - pending_reqs: Mutex::new(HashSet::new()), + pending_reqs: Mutex::new(HashMap::new()), client: client, rng: Mutex::new(OsRng::new()?), state: Mutex::new(SyncState::Idle), diff --git a/sync/src/light_sync/sync_round.rs b/sync/src/light_sync/sync_round.rs index 4fa7a528b..d477ecc81 100644 --- a/sync/src/light_sync/sync_round.rs +++ b/sync/src/light_sync/sync_round.rs @@ -179,7 +179,7 @@ impl Fetcher { }; trace!(target: "sync", "Received response for subchain ({} -> {})", - request.subchain_parent.0 + 1, request.subchain_end.0); + request.subchain_parent.0, request.subchain_end.0); let headers = ctx.data(); @@ -241,6 +241,8 @@ impl Fetcher { } fn requests_abandoned(mut self, abandoned: &[ReqId]) -> SyncRound { + trace!(target: "sync", "Abandonned requests {:?}", abandoned); + for abandoned in abandoned { match self.pending.remove(abandoned) { None => {}, @@ -258,12 +260,14 @@ impl Fetcher { while let Some(pending_req) = self.requests.pop() { match dispatcher(pending_req.headers_request.clone()) { Some(req_id) => { - trace!(target: "sync", "Assigned request for subchain ({} -> {})", - pending_req.subchain_parent.0 + 1, pending_req.subchain_end.0); + trace!(target: "sync", "Assigned request {} for subchain ({} -> {})", + req_id, pending_req.subchain_parent.0, pending_req.subchain_end.0); self.pending.insert(req_id, pending_req); } None => { + trace!(target: "sync", "Failed to assign request for subchain ({} -> {})", + pending_req.subchain_parent.0, pending_req.subchain_end.0); self.requests.push(pending_req); break; }