reassign requests indefinitely

This commit is contained in:
Robert Habermeier 2017-02-07 16:49:14 +01:00
parent b37124991c
commit 04fe72face

View File

@ -128,6 +128,8 @@ impl OnDemand {
}],
});
let pending = Pending::HeaderByNumber(req, sender);
// we're looking for a peer with serveHeaders who's far enough along in the
// chain.
for (id, peer) in self.peers.read().iter() {
@ -137,7 +139,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::HeaderByNumber(req, sender)
pending,
);
return
},
@ -147,9 +149,8 @@ impl OnDemand {
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
/// Request a header by hash. This is less accurate than by-number because we don't know
@ -177,16 +178,17 @@ impl OnDemand {
.collect::<Vec<_>>();
let mut rng = ::rand::thread_rng();
::rand::Rng::shuffle(&mut rng, &mut potential_peers);
let pending = Pending::HeaderByHash(req, sender);
for id in potential_peers {
match ctx.request_from(id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::HeaderByHash(req, sender),
pending,
);
return
}
@ -195,9 +197,8 @@ impl OnDemand {
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
/// Request a block, given its header. Block bodies are requestable by hash only,
@ -225,6 +226,7 @@ impl OnDemand {
let les_req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![req.hash],
});
let pending = Pending::Block(req, sender);
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
@ -234,7 +236,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::Block(req, sender)
pending,
);
return
}
@ -244,9 +246,8 @@ impl OnDemand {
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
/// Request the receipts for a block. The header serves two purposes:
@ -269,6 +270,7 @@ impl OnDemand {
let les_req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![req.0.hash()],
});
let pending = Pending::BlockReceipts(req, sender);
// we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() {
@ -278,7 +280,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::BlockReceipts(req, sender)
pending,
);
return
}
@ -288,9 +290,8 @@ impl OnDemand {
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
/// Request an account by address and block header -- which gives a hash to query and a state root
@ -311,6 +312,7 @@ impl OnDemand {
from_level: 0,
}],
});
let pending = Pending::Account(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
@ -320,7 +322,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::Account(req, sender)
pending,
);
return
}
@ -330,9 +332,8 @@ impl OnDemand {
}
}
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
/// Request code by address, known code hash, and block header.
@ -357,6 +358,7 @@ impl OnDemand {
account_key: ::util::Hashable::sha3(&req.address),
}]
});
let pending = Pending::Code(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
@ -366,7 +368,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
Pending::Code(req, sender)
pending
);
return
}
@ -376,40 +378,79 @@ impl OnDemand {
}
}
// TODO: retrying.
trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable));
self.orphaned_requests.write().push(pending)
}
// dispatch orphaned requests, and discard those for which the corresponding
// receiver has been dropped.
fn dispatch_orphaned(&self, ctx: &BasicContext) {
// wrapper future for calling `poll_cancel` on our `Senders` to preserve
// the invariant that it's always within a task.
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
impl<'a, T: 'a> Future for CheckHangup<'a, T> {
type Item = bool;
type Error = ();
fn poll(&mut self) -> Poll<bool, ()> {
Ok(Async::Ready(match self.0.poll_cancel() {
Ok(Async::NotReady) => false, // hasn't hung up.
_ => true, // has hung up.
}))
}
}
// check whether a sender's hung up (using `wait` to preserve the task invariant)
// returns true if has hung up, false otherwise.
fn check_hangup<T>(send: &mut Sender<T>) -> bool {
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
}
if self.orphaned_requests.read().is_empty() { return }
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new());
for orphaned in to_dispatch {
match orphaned {
Pending::HeaderByNumber(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) },
Pending::HeaderByHash(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) },
Pending::Block(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) },
Pending::BlockReceipts(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) },
Pending::Account(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) },
Pending::Code(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) },
}
}
}
}
impl Handler for OnDemand {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
self.dispatch_orphaned(ctx.as_basic());
}
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
self.peers.write().remove(&ctx.peer());
let ctx = ctx.as_basic();
for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request");
match pending {
Pending::HeaderByNumber(req, sender)
=> self.dispatch_header_by_number(ctx, req, sender),
Pending::HeaderByHash(req, sender)
=> self.dispatch_header_by_hash(ctx, req, sender),
Pending::Block(req, sender)
=> self.dispatch_block(ctx, req, sender),
Pending::BlockReceipts(req, sender)
=> self.dispatch_block_receipts(ctx, req, sender),
Pending::Account(req, sender)
=> self.dispatch_account(ctx, req, sender),
Pending::Code(req, sender)
=> self.dispatch_code(ctx, req, sender),
{
let mut orphaned = self.orphaned_requests.write();
for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request");
orphaned.push(pending);
}
}
}
self.dispatch_orphaned(ctx);
}
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
@ -418,6 +459,8 @@ impl Handler for OnDemand {
peer.status.update_from(&announcement);
peer.capabilities.update_from(&announcement);
}
self.dispatch_orphaned(ctx.as_basic());
}
fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec<Bytes>)]) {
@ -587,6 +630,10 @@ impl Handler for OnDemand {
_ => panic!("Only code request fetches code; qed"),
}
}
fn tick(&self, ctx: &BasicContext) {
self.dispatch_orphaned(ctx)
}
}
#[cfg(test)]