light: pass incoming responses to peer state machine
This commit is contained in:
parent
6fb71527e4
commit
359d433292
@ -24,7 +24,7 @@
|
|||||||
//! This is written assuming that the client and sync service are running
|
//! This is written assuming that the client and sync service are running
|
||||||
//! in the same binary; unlike a full node
|
//! in the same binary; unlike a full node
|
||||||
|
|
||||||
use std::collections::{BinaryHeap, HashMap};
|
use std::collections::HashMap;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@ -168,7 +168,7 @@ impl UnconfirmedPeer {
|
|||||||
/// Once that's found, we can sync to this peer.
|
/// Once that's found, we can sync to this peer.
|
||||||
enum Peer {
|
enum Peer {
|
||||||
// Searching for a common ancestor.
|
// Searching for a common ancestor.
|
||||||
SearchCommon(Mutex<UnconfirmedPeer>),
|
SearchCommon(UnconfirmedPeer),
|
||||||
// A peer we can sync to.
|
// A peer we can sync to.
|
||||||
SyncTo(ChainInfo),
|
SyncTo(ChainInfo),
|
||||||
}
|
}
|
||||||
@ -176,7 +176,7 @@ enum Peer {
|
|||||||
/// Light client synchronization manager. See module docs for more details.
|
/// Light client synchronization manager. See module docs for more details.
|
||||||
pub struct LightSync {
|
pub struct LightSync {
|
||||||
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
|
best_seen: Mutex<Option<(H256, U256)>>, // best seen block on the network.
|
||||||
peers: RwLock<HashMap<PeerId, Peer>>, // peers which are relevant to synchronization.
|
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +194,8 @@ impl Handler for LightSync {
|
|||||||
head_hash: status.head_hash,
|
head_hash: status.head_hash,
|
||||||
head_num: status.head_num,
|
head_num: status.head_num,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
trace!(target: "sync", "Beginning search for common ancestor with peer {}", ctx.peer());
|
||||||
let unconfirmed = match UnconfirmedPeer::create(ctx, chain_info, our_best) {
|
let unconfirmed = match UnconfirmedPeer::create(ctx, chain_info, our_best) {
|
||||||
Ok(unconfirmed) => unconfirmed,
|
Ok(unconfirmed) => unconfirmed,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -202,7 +204,75 @@ impl Handler for LightSync {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.peers.write().insert(ctx.peer(), Peer::SearchCommon(Mutex::new(unconfirmed)));
|
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::SearchCommon(unconfirmed)));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_disconnect(&self, ctx: &EventContext, _unfulfilled: &[ReqId]) {
|
||||||
|
let peer = ctx.peer();
|
||||||
|
|
||||||
|
match self.peers.write().remove(&peer).map(|peer_data| peer_data.into_inner()) {
|
||||||
|
None => {}
|
||||||
|
Some(Peer::SearchCommon(_)) => {
|
||||||
|
// unfulfilled requests are unimportant since they are only
|
||||||
|
// relevant to searching for a common ancestor.
|
||||||
|
trace!(target: "sync", "Unconfirmed peer {} disconnect", ctx.peer());
|
||||||
|
}
|
||||||
|
Some(Peer::SyncTo(_)) => {
|
||||||
|
trace!(target: "sync", "")
|
||||||
|
// in this case we may want to reasssign all unfulfilled requests.
|
||||||
|
// (probably just by pushing them back into the current downloader's priority queue.)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
||||||
|
// restart search for common ancestor if necessary.
|
||||||
|
// restart download if necessary.
|
||||||
|
// if this is a peer we found irrelevant earlier, we may want to
|
||||||
|
// re-evaluate their usefulness.
|
||||||
|
if !self.peers.read().contains_key(&ctx.peer()) { return }
|
||||||
|
|
||||||
|
trace!(target: "sync", "Announcement from peer {}: new chain head {:?}, reorg depth {}",
|
||||||
|
ctx.peer(), (announcement.head_hash, announcement.head_num), announcement.reorg_depth);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_block_headers(&self, ctx: &EventContext, req_id: ReqId, headers: &[Bytes]) {
|
||||||
|
let peer = ctx.peer();
|
||||||
|
match self.peers.read().get(&peer) {
|
||||||
|
None => {},
|
||||||
|
Some(peer_data) => {
|
||||||
|
let mut peer_data = peer_data.lock();
|
||||||
|
let new_peer = match *peer_data {
|
||||||
|
Peer::SearchCommon(ref mut unconfirmed) => {
|
||||||
|
if unconfirmed.req_id != req_id {
|
||||||
|
trace!(target: "sync", "Ignoring irrelevant response from peer {}", peer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match unconfirmed.check_batch(ctx, &self.client, headers) {
|
||||||
|
Ok(None) => {
|
||||||
|
trace!(target: "sync", "Continuing to search for common ancestor with peer {}", peer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(Some(common)) => {
|
||||||
|
trace!(target: "sync", "Found common ancestor {} with peer {}", peer, common);
|
||||||
|
let chain_info = unconfirmed.chain_info.clone();
|
||||||
|
Peer::SyncTo(chain_info)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
trace!(target: "sync", "Failed to find common ancestor with peer {}: {}", peer, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Peer::SyncTo(_) => {
|
||||||
|
trace!(target: "sync", "Incoming response from peer being synced to.");
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
*peer_data = new_peer;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,7 +281,7 @@ impl LightSync {
|
|||||||
/// Create a new instance of `LightSync`.
|
/// Create a new instance of `LightSync`.
|
||||||
///
|
///
|
||||||
/// This won't do anything until registered as a handler
|
/// This won't do anything until registered as a handler
|
||||||
/// so it can receive
|
/// so it can act on events.
|
||||||
pub fn new(client: Arc<Client>) -> Self {
|
pub fn new(client: Arc<Client>) -> Self {
|
||||||
LightSync {
|
LightSync {
|
||||||
best_seen: Mutex::new(None),
|
best_seen: Mutex::new(None),
|
||||||
|
Loading…
Reference in New Issue
Block a user