remove check_hangup hack now that is_canceled exists (#7620)
This commit is contained in:
parent
e5c131e0c1
commit
2c426defd9
@ -24,8 +24,8 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use ethcore::executed::{Executed, ExecutionError};
|
use ethcore::executed::{Executed, ExecutionError};
|
||||||
|
|
||||||
use futures::{Async, Poll, Future};
|
use futures::{Poll, Future};
|
||||||
use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
|
use futures::sync::oneshot::{self, Receiver, Canceled};
|
||||||
use network::PeerId;
|
use network::PeerId;
|
||||||
use parking_lot::{RwLock, Mutex};
|
use parking_lot::{RwLock, Mutex};
|
||||||
|
|
||||||
@ -352,29 +352,6 @@ impl OnDemand {
|
|||||||
// dispatch pending requests, and discard those for which the corresponding
|
// dispatch pending requests, and discard those for which the corresponding
|
||||||
// receiver has been dropped.
|
// receiver has been dropped.
|
||||||
fn dispatch_pending(&self, ctx: &BasicContext) {
|
fn dispatch_pending(&self, ctx: &BasicContext) {
|
||||||
|
|
||||||
// wrapper future for calling `poll_cancel` on our `Senders` to preserve
|
|
||||||
// the invariant that it's always within a task.
|
|
||||||
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
|
|
||||||
|
|
||||||
impl<'a, T: 'a> Future for CheckHangup<'a, T> {
|
|
||||||
type Item = bool;
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<bool, ()> {
|
|
||||||
Ok(Async::Ready(match self.0.poll_cancel() {
|
|
||||||
Ok(Async::NotReady) => false, // hasn't hung up.
|
|
||||||
_ => true, // has hung up.
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check whether a sender's hung up (using `wait` to preserve the task invariant)
|
|
||||||
// returns true if has hung up, false otherwise.
|
|
||||||
fn check_hangup<T>(send: &mut Sender<T>) -> bool {
|
|
||||||
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.pending.read().is_empty() { return }
|
if self.pending.read().is_empty() { return }
|
||||||
let mut pending = self.pending.write();
|
let mut pending = self.pending.write();
|
||||||
|
|
||||||
@ -384,10 +361,7 @@ impl OnDemand {
|
|||||||
// then, try and find a peer who can serve it.
|
// then, try and find a peer who can serve it.
|
||||||
let peers = self.peers.read();
|
let peers = self.peers.read();
|
||||||
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
|
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
|
||||||
.filter_map(|mut pending| match check_hangup(&mut pending.sender) {
|
.filter(|pending| !pending.sender.is_canceled())
|
||||||
false => Some(pending),
|
|
||||||
true => None,
|
|
||||||
})
|
|
||||||
.filter_map(|pending| {
|
.filter_map(|pending| {
|
||||||
for (peer_id, peer) in peers.iter() { // .shuffle?
|
for (peer_id, peer) in peers.iter() { // .shuffle?
|
||||||
// TODO: see which requests can be answered by the cache?
|
// TODO: see which requests can be answered by the cache?
|
||||||
|
Loading…
Reference in New Issue
Block a user