diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 12bb6a11e..bd62b4c70 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -38,8 +38,9 @@ use light::net::{ use light::request; use network::PeerId; use util::{Bytes, U256, H256, Mutex, RwLock}; +use rand::{Rng, OsRng}; -use self::sync_round::{SyncRound, ResponseContext}; +use self::sync_round::{AbortReason, SyncRound, ResponseContext}; mod response; mod sync_round; @@ -65,10 +66,10 @@ impl Peer { } } -// Search for a common ancestor. +// Search for a common ancestor with the best chain. struct AncestorSearch { last_batched: u64, - req_id: ReqId, + req_id: Option, } // synchronization state machine. @@ -101,6 +102,7 @@ pub struct LightSync { best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, + rng: OsRng, state: Mutex, } @@ -240,10 +242,76 @@ impl Handler for LightSync { // private helpers impl LightSync { - fn maintain_sync(&self, ctx: &BasicContext) { - const DRAIN_AMOUNT: usize = 256; + // Begins a search for the common ancestor and our best block. + // does not lock state, instead has a mutable reference to it passed. + fn begin_search(&self, _state: &mut SyncState) { + self.client.clear_queue(); - unimplemented!() + unimplemented!(); + } + + fn maintain_sync(&self, ctx: &BasicContext) { + const DRAIN_AMOUNT: usize = 128; + + let mut state = self.state.lock(); + + // drain any pending blocks into the queue. + { + let mut sink = Vec::with_capacity(DRAIN_AMOUNT); + + 'a: + loop { + let queue_info = self.client.queue_info(); + if queue_info.is_full() { break } + + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(round) + => SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))), + other => other, + }; + + if sink.is_empty() { break } + + for header in sink.drain(..) { + if let Err(e) = self.client.queue_header(header) { + debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e); + + self.begin_search(&mut state); + break 'a; + } + } + } + } + + // check for aborted sync round. + { + match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(SyncRound::Abort(reason)) => { + match reason { + AbortReason::BadScaffold(bad_peers) => { + debug!(target: "sync", "Disabling peers responsible for bad scaffold"); + for peer in bad_peers { + ctx.disable_peer(peer); + } + } + AbortReason::NoResponses => {} + } + + debug!(target: "sync", "Beginning search after aborted sync round"); + self.begin_search(&mut state); + } + other => *state = other, // restore displaced state. + } + } + + // allow dispatching of requests. + { + *state = match mem::replace(&mut *state, SyncState::Idle) { + SyncState::Rounds(round) + => SyncState::Rounds(round.dispatch_requests(|_| unimplemented!())), + other => other, + }; + } } } @@ -253,12 +321,13 @@ impl LightSync { /// /// This won't do anything until registered as a handler /// so it can act on events. - pub fn new(client: Arc) -> Self { - LightSync { + pub fn new(client: Arc) -> Result { + Ok(LightSync { best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), client: client, + rng: try!(OsRng::new()), state: Mutex::new(SyncState::Idle), - } + }) } }