Fixing deprecated methods of tokio_core

This commit is contained in:
Tomasz Drwięga
2017-03-16 15:43:31 +01:00
parent 491eeb9878
commit 579cff478d
12 changed files with 158 additions and 67 deletions

View File

@@ -20,6 +20,7 @@ pub mod errors;
pub mod block_import;
pub mod dispatch;
pub mod informant;
pub mod oneshot;
mod network_settings;
mod poll_manager;

View File

@@ -0,0 +1,67 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use jsonrpc_core::Error;
use futures::{self, Future};
use futures::sync::oneshot;
use v1::helpers::errors;
pub type Res<T> = Result<T, Error>;
pub struct Sender<T> {
sender: oneshot::Sender<Res<T>>,
}
impl<T> Sender<T> {
pub fn send(self, data: Res<T>) {
let res = self.sender.send(data);
if let Err(_) = res {
debug!(target: "rpc", "Responding to a no longer active request.");
}
}
}
pub struct Receiver<T> {
receiver: oneshot::Receiver<Res<T>>,
}
impl<T> Future for Receiver<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let res = self.receiver.poll();
match res {
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Ok(futures::Async::Ready(Ok(res))) => Ok(futures::Async::Ready(res)),
Ok(futures::Async::Ready(Err(err))) => Err(err),
Err(e) => {
debug!(target: "rpc", "Responding to a canceled request: {:?}", e);
Err(errors::internal("Request was canceled by client.", e))
},
}
}
}
pub fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = futures::oneshot();
(Sender {
sender: tx,
}, Receiver {
receiver: rx,
})
}

View File

@@ -22,10 +22,10 @@ use util::{U256, Mutex};
use ethcore::account_provider::AccountProvider;
use futures::{self, future, BoxFuture, Future};
use futures::{future, BoxFuture, Future};
use jsonrpc_core::Error;
use v1::helpers::{
errors,
errors, oneshot,
DefaultAccount,
SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService
};
@@ -167,21 +167,20 @@ impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
meta.origin,
);
let (ready, p) = futures::oneshot();
let (ready, p) = oneshot::oneshot();
// when dispatch is complete
res.then(move |res| {
// register callback via the oneshot sender.
handle_dispatch(res, move |response| {
match response {
Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)),
Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))),
Ok(RpcConfirmationResponse::Decrypt(data)) => ready.send(Ok(data)),
Err(e) => ready.send(Err(e)),
e => ready.send(Err(errors::internal("Unexpected result.", e))),
}
});
// and wait for that to resolve.
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
p
}).boxed()
}
}
@@ -196,18 +195,18 @@ impl<D: Dispatcher + 'static> EthSigning for SigningQueueClient<D> {
meta.origin,
);
let (ready, p) = futures::oneshot();
let (ready, p) = oneshot::oneshot();
res.then(move |res| {
handle_dispatch(res, move |response| {
ignore_error(match response {
Ok(RpcConfirmationResponse::Signature(sig)) => ready.complete(Ok(sig)),
Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))),
})
match response {
Ok(RpcConfirmationResponse::Signature(sig)) => ready.send(Ok(sig)),
Err(e) => ready.send(Err(e)),
e => ready.send(Err(errors::internal("Unexpected result.", e))),
}
});
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
p
}).boxed()
}
@@ -218,18 +217,18 @@ impl<D: Dispatcher + 'static> EthSigning for SigningQueueClient<D> {
meta.origin,
);
let (ready, p) = futures::oneshot();
let (ready, p) = oneshot::oneshot();
res.then(move |res| {
handle_dispatch(res, move |response| {
ignore_error(match response {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)),
Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))),
})
match response {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.send(Ok(hash)),
Err(e) => ready.send(Err(e)),
e => ready.send(Err(errors::internal("Unexpected result.", e))),
}
});
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
p
}).boxed()
}
@@ -240,18 +239,18 @@ impl<D: Dispatcher + 'static> EthSigning for SigningQueueClient<D> {
meta.origin,
);
let (ready, p) = futures::oneshot();
let (ready, p) = oneshot::oneshot();
res.then(move |res| {
handle_dispatch(res, move |response| {
ignore_error(match response {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)),
Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))),
})
match response {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.send(Ok(tx)),
Err(e) => ready.send(Err(e)),
e => ready.send(Err(errors::internal("Unexpected result.", e))),
}
});
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
p
}).boxed()
}
}

View File

@@ -35,7 +35,7 @@ impl Fetch for TestFetch {
let (tx, rx) = futures::oneshot();
thread::spawn(move || {
let cursor = io::Cursor::new(b"Some content");
tx.complete(fetch::Response::from_reader(cursor));
tx.send(fetch::Response::from_reader(cursor)).unwrap();
});
rx.map_err(|_| fetch::Error::Aborted).boxed()