diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 7e258282b..14ea09d4c 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -128,6 +128,8 @@ impl OnDemand { }], }); + let pending = Pending::HeaderByNumber(req, sender); + // we're looking for a peer with serveHeaders who's far enough along in the // chain. for (id, peer) in self.peers.read().iter() { @@ -137,7 +139,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::HeaderByNumber(req, sender) + pending, ); return }, @@ -147,9 +149,8 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request a header by hash. This is less accurate than by-number because we don't know @@ -177,16 +178,17 @@ impl OnDemand { .collect::>(); let mut rng = ::rand::thread_rng(); - ::rand::Rng::shuffle(&mut rng, &mut potential_peers); + let pending = Pending::HeaderByHash(req, sender); + for id in potential_peers { match ctx.request_from(id, les_req.clone()) { Ok(req_id) => { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::HeaderByHash(req, sender), + pending, ); return } @@ -195,9 +197,8 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request a block, given its header. Block bodies are requestable by hash only, @@ -225,6 +226,7 @@ impl OnDemand { let les_req = LesRequest::Bodies(les_request::Bodies { block_hashes: vec![req.hash], }); + let pending = Pending::Block(req, sender); // we're looking for a peer with serveChainSince(num) for (id, peer) in self.peers.read().iter() { @@ -234,7 +236,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::Block(req, sender) + pending, ); return } @@ -244,9 +246,8 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request the receipts for a block. The header serves two purposes: @@ -269,6 +270,7 @@ impl OnDemand { let les_req = LesRequest::Receipts(les_request::Receipts { block_hashes: vec![req.0.hash()], }); + let pending = Pending::BlockReceipts(req, sender); // we're looking for a peer with serveChainSince(num) for (id, peer) in self.peers.read().iter() { @@ -278,7 +280,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::BlockReceipts(req, sender) + pending, ); return } @@ -288,9 +290,8 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request an account by address and block header -- which gives a hash to query and a state root @@ -311,6 +312,7 @@ impl OnDemand { from_level: 0, }], }); + let pending = Pending::Account(req, sender); // we're looking for a peer with serveStateSince(num) for (id, peer) in self.peers.read().iter() { @@ -320,7 +322,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::Account(req, sender) + pending, ); return } @@ -330,9 +332,8 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request code by address, known code hash, and block header. @@ -357,6 +358,7 @@ impl OnDemand { account_key: ::util::Hashable::sha3(&req.address), }] }); + let pending = Pending::Code(req, sender); // we're looking for a peer with serveStateSince(num) for (id, peer) in self.peers.read().iter() { @@ -366,7 +368,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::Code(req, sender) + pending ); return } @@ -376,40 +378,79 @@ impl OnDemand { } } - // TODO: retrying. trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) + } + + // dispatch orphaned requests, and discard those for which the corresponding + // receiver has been dropped. + fn dispatch_orphaned(&self, ctx: &BasicContext) { + // wrapper future for calling `poll_cancel` on our `Senders` to preserve + // the invariant that it's always within a task. + struct CheckHangup<'a, T: 'a>(&'a mut Sender); + + impl<'a, T: 'a> Future for CheckHangup<'a, T> { + type Item = bool; + type Error = (); + + fn poll(&mut self) -> Poll { + Ok(Async::Ready(match self.0.poll_cancel() { + Ok(Async::NotReady) => false, // hasn't hung up. + _ => true, // has hung up. + })) + } + } + + // check whether a sender's hung up (using `wait` to preserve the task invariant) + // returns true if has hung up, false otherwise. + fn check_hangup(send: &mut Sender) -> bool { + CheckHangup(send).wait().expect("CheckHangup always returns ok; qed") + } + + if self.orphaned_requests.read().is_empty() { return } + + let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); + + for orphaned in to_dispatch { + match orphaned { + Pending::HeaderByNumber(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) }, + Pending::HeaderByHash(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, + Pending::Block(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) }, + Pending::BlockReceipts(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) }, + Pending::Account(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) }, + Pending::Code(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) }, + } + } } } impl Handler for OnDemand { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); + self.dispatch_orphaned(ctx.as_basic()); } fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { self.peers.write().remove(&ctx.peer()); let ctx = ctx.as_basic(); - for unfulfilled in unfulfilled { - if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { - trace!(target: "on_demand", "Attempting to reassign dropped request"); - match pending { - Pending::HeaderByNumber(req, sender) - => self.dispatch_header_by_number(ctx, req, sender), - Pending::HeaderByHash(req, sender) - => self.dispatch_header_by_hash(ctx, req, sender), - Pending::Block(req, sender) - => self.dispatch_block(ctx, req, sender), - Pending::BlockReceipts(req, sender) - => self.dispatch_block_receipts(ctx, req, sender), - Pending::Account(req, sender) - => self.dispatch_account(ctx, req, sender), - Pending::Code(req, sender) - => self.dispatch_code(ctx, req, sender), + { + let mut orphaned = self.orphaned_requests.write(); + for unfulfilled in unfulfilled { + if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + trace!(target: "on_demand", "Attempting to reassign dropped request"); + orphaned.push(pending); } } } + + self.dispatch_orphaned(ctx); } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { @@ -418,6 +459,8 @@ impl Handler for OnDemand { peer.status.update_from(&announcement); peer.capabilities.update_from(&announcement); } + + self.dispatch_orphaned(ctx.as_basic()); } fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec)]) { @@ -587,6 +630,10 @@ impl Handler for OnDemand { _ => panic!("Only code request fetches code; qed"), } } + + fn tick(&self, ctx: &BasicContext) { + self.dispatch_orphaned(ctx) + } } #[cfg(test)]