better bookkeeping of requests in light sync

This commit is contained in:
Robert Habermeier 2017-03-23 15:44:16 +01:00
parent c75b49667e
commit 10a470a5fa
4 changed files with 36 additions and 10 deletions

View File

@ -61,10 +61,12 @@ impl<'a> IoContext for NetworkContext<'a> {
}
fn disconnect_peer(&self, peer: PeerId) {
trace!(target: "pip", "Initiating disconnect of peer {}", peer);
NetworkContext::disconnect_peer(self, peer);
}
fn disable_peer(&self, peer: PeerId) {
trace!(target: "pip", "Initiating disable of peer {}", peer);
NetworkContext::disable_peer(self, peer);
}

View File

@ -424,6 +424,8 @@ impl Handler for OnDemand {
}
};
trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind());
// handle the response appropriately for the request.
// all branches which do not return early lead to disabling of the peer
// due to misbehavior.
@ -443,7 +445,7 @@ impl Handler for OnDemand {
}
return
}
Err(e) => warn!("Error handling response for header request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
}
}
}
@ -456,7 +458,7 @@ impl Handler for OnDemand {
let _ = sender.send(header);
return
}
Err(e) => warn!("Error handling response for header request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
}
}
}
@ -469,7 +471,7 @@ impl Handler for OnDemand {
let _ = sender.send(block);
return
}
Err(e) => warn!("Error handling response for block request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e),
}
}
}
@ -482,7 +484,7 @@ impl Handler for OnDemand {
let _ = sender.send(receipts);
return
}
Err(e) => warn!("Error handling response for receipts request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e),
}
}
}
@ -495,7 +497,7 @@ impl Handler for OnDemand {
let _ = sender.send(maybe_account);
return
}
Err(e) => warn!("Error handling response for state request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e),
}
}
}
@ -506,7 +508,7 @@ impl Handler for OnDemand {
let _ = sender.send(response.code.clone());
return
}
Err(e) => warn!("Error handling response for code request: {:?}", e),
Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e),
}
}
}
@ -521,7 +523,7 @@ impl Handler for OnDemand {
let _ = sender.send(Err(err));
return
}
ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"),
ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"),
}
}
}

View File

@ -435,7 +435,8 @@ impl Response {
}
}
fn kind(&self) -> Kind {
/// Inspect the kind of this response.
pub fn kind(&self) -> Kind {
match *self {
Response::Headers(_) => Kind::Headers,
Response::HeaderProof(_) => Kind::HeaderProof,

View File

@ -32,7 +32,7 @@
//! announced blocks.
//! - On bad block/response, punish peer and reset.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::Arc;
@ -150,6 +150,19 @@ impl AncestorSearch {
}
}
fn requests_abandoned(self, req_ids: &[ReqId]) -> AncestorSearch {
match self {
AncestorSearch::Awaiting(id, start, req) => {
if req_ids.iter().find(|&x| x == &id).is_some() {
AncestorSearch::Queued(start)
} else {
AncestorSearch::Awaiting(id, start, req)
}
}
other => other,
}
}
fn dispatch_request<F>(self, mut dispatcher: F) -> AncestorSearch
where F: FnMut(HeadersRequest) -> Option<ReqId>
{
@ -209,6 +222,7 @@ pub struct LightSync<L: AsLightClient> {
start_block_number: u64,
best_seen: Mutex<Option<ChainInfo>>, // best seen block on the network.
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>, // peers which are relevant to synchronization.
pending_reqs: Mutex<HashSet<ReqId>>, // requests from this handler.
client: Arc<L>,
rng: Mutex<OsRng>,
state: Mutex<SyncState>,
@ -271,7 +285,8 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
*state = match mem::replace(&mut *state, SyncState::Idle) {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search),
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
};
}
@ -321,6 +336,10 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
return
}
if !self.pending_reqs.lock().remove(&req_id) {
return
}
let headers = match responses.get(0) {
Some(&request::Response::Headers(ref response)) => &response.headers[..],
Some(_) => {
@ -496,6 +515,7 @@ impl<L: AsLightClient> LightSync<L> {
for peer in &peer_ids {
match ctx.request_from(*peer, request.clone()) {
Ok(id) => {
self.pending_reqs.lock().insert(id.clone());
return Some(id)
}
Err(NetError::NoCredits) => {}
@ -529,6 +549,7 @@ impl<L: AsLightClient> LightSync<L> {
start_block_number: client.as_light_client().chain_info().best_block_number,
best_seen: Mutex::new(None),
peers: RwLock::new(HashMap::new()),
pending_reqs: Mutex::new(HashSet::new()),
client: client,
rng: Mutex::new(try!(OsRng::new())),
state: Mutex::new(SyncState::Idle),