From 359d433292815862165c4d1be63b9be574b9f184 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 13 Dec 2016 22:26:06 +0100 Subject: [PATCH] light: pass incoming responses to peer state machine --- sync/src/light_sync/mod.rs | 80 +++++++++++++++++++++++++++++++++++--- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 15e4d72d4..faae5830f 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -24,7 +24,7 @@ //! This is written assuming that the client and sync service are running //! in the same binary; unlike a full node -use std::collections::{BinaryHeap, HashMap}; +use std::collections::HashMap; use std::fmt; use std::sync::Arc; @@ -168,7 +168,7 @@ impl UnconfirmedPeer { /// Once that's found, we can sync to this peer. enum Peer { // Searching for a common ancestor. - SearchCommon(Mutex), + SearchCommon(UnconfirmedPeer), // A peer we can sync to. SyncTo(ChainInfo), } @@ -176,7 +176,7 @@ enum Peer { /// Light client synchronization manager. See module docs for more details. pub struct LightSync { best_seen: Mutex>, // best seen block on the network. - peers: RwLock>, // peers which are relevant to synchronization. + peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, } @@ -194,6 +194,8 @@ impl Handler for LightSync { head_hash: status.head_hash, head_num: status.head_num, }; + + trace!(target: "sync", "Beginning search for common ancestor with peer {}", ctx.peer()); let unconfirmed = match UnconfirmedPeer::create(ctx, chain_info, our_best) { Ok(unconfirmed) => unconfirmed, Err(e) => { @@ -202,7 +204,75 @@ impl Handler for LightSync { } }; - self.peers.write().insert(ctx.peer(), Peer::SearchCommon(Mutex::new(unconfirmed))); + self.peers.write().insert(ctx.peer(), Mutex::new(Peer::SearchCommon(unconfirmed))); + } + + fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) { + let peer = ctx.peer(); + + match self.peers.write().remove(&peer).map(|peer_data| peer_data.into_inner()) { + None => {} + Some(Peer::SearchCommon(_)) => { + // unfulfilled requests are unimportant since they are only + // relevant to searching for a common ancestor. + trace!(target: "sync", "Unconfirmed peer {} disconnect", ctx.peer()); + } + Some(Peer::SyncTo(_)) => { + trace!(target: "sync", "") + // in this case we may want to reasssign all unfulfilled requests. + // (probably just by pushing them back into the current downloader's priority queue.) + } + } + } + + fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { + // restart search for common ancestor if necessary. + // restart download if necessary. + // if this is a peer we found irrelevant earlier, we may want to + // re-evaluate their usefulness. + if !self.peers.read().contains_key(&ctx.peer()) { return } + + trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}", + ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth); + + } + + fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) { + let peer = ctx.peer(); + match self.peers.read().get(&peer) { + None => {}, + Some(peer_data) => { + let mut peer_data = peer_data.lock(); + let new_peer = match *peer_data { + Peer::SearchCommon(ref mut unconfirmed) => { + if unconfirmed.req_id != req_id { + trace!(target: "sync", "Ignoring irrelevant response from peer {}", peer); + return; + } + match unconfirmed.check_batch(ctx, &self.client, headers) { + Ok(None) => { + trace!(target: "sync", "Continuing to search for common ancestor with peer {}", peer); + return; + } + Ok(Some(common)) => { + trace!(target: "sync", "Found common ancestor {} with peer {}", peer, common); + let chain_info = unconfirmed.chain_info.clone(); + Peer::SyncTo(chain_info) + } + Err(e) => { + trace!(target: "sync", "Failed to find common ancestor with peer {}: {}", peer, e); + return; + } + } + } + Peer::SyncTo(_) => { + trace!(target: "sync", "Incoming response from peer being synced to."); + }, + }; + + *peer_data = new_peer; + } + } } } @@ -211,7 +281,7 @@ impl LightSync { /// Create a new instance of `LightSync`. /// /// This won't do anything until registered as a handler - /// so it can receive + /// so it can act on events. pub fn new(client: Arc) -> Self { LightSync { best_seen: Mutex::new(None),