expunge error types from OnDemand

This commit is contained in:
Robert Habermeier 2017-02-07 17:06:22 +01:00
parent 04fe72face
commit 6c06a1a5ec
1 changed files with 28 additions and 60 deletions

View File

@ -25,7 +25,7 @@ use ethcore::encoded;
use ethcore::receipt::Receipt;
use futures::{Async, Poll, Future};
use futures::sync::oneshot;
use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId;
use rlp::{RlpStream, Stream};
use util::{Bytes, RwLock, U256};
@ -36,41 +36,6 @@ use types::les_request::{self as les_request, Request as LesRequest};
pub mod request;
/// Errors which can occur while trying to fulfill a request.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Error {
/// Request was canceled.
Canceled,
/// No suitable peers available to serve the request.
NoPeersAvailable,
/// Request timed out.
TimedOut,
}
impl From<oneshot::Canceled> for Error {
fn from(_: oneshot::Canceled) -> Self {
Error::Canceled
}
}
/// Future which awaits a response to an on-demand request.
pub struct Response<T>(oneshot::Receiver<Result<T, Error>>);
impl<T> Future for Response<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<T, Error> {
match self.0.poll().map_err(Into::into) {
Ok(Async::Ready(val)) => val.map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
type Sender<T> = oneshot::Sender<Result<T, Error>>;
// relevant peer info.
struct Peer {
status: Status,
@ -109,10 +74,10 @@ impl Default for OnDemand {
impl OnDemand {
/// Request a header by block number and CHT root hash.
/// Returns the header and the total difficulty.
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<(encoded::Header, U256)> {
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> {
let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_number(ctx, req, sender);
Response(receiver)
receiver
}
// dispatch the request, completing the request if no peers available.
@ -156,10 +121,10 @@ impl OnDemand {
/// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response<encoded::Header> {
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_hash(ctx, req, sender);
Response(receiver)
receiver
}
fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender<encoded::Header>) {
@ -204,7 +169,7 @@ impl OnDemand {
/// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response<encoded::Block> {
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> {
let (sender, receiver) = oneshot::channel();
// fast path for empty body.
@ -214,11 +179,11 @@ impl OnDemand {
stream.begin_list(0);
stream.begin_list(0);
sender.complete(Ok(encoded::Block::new(stream.out())))
sender.complete(encoded::Block::new(stream.out()))
} else {
self.dispatch_block(ctx, req, sender);
}
Response(receiver)
receiver
}
fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender<encoded::Block>) {
@ -252,17 +217,17 @@ impl OnDemand {
/// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response<Vec<Receipt>> {
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
let (sender, receiver) = oneshot::channel();
// fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP {
sender.complete(Ok(Vec::new()))
sender.complete(Vec::new())
} else {
self.dispatch_block_receipts(ctx, req, sender);
}
Response(receiver)
receiver
}
fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender<Vec<Receipt>>) {
@ -296,10 +261,10 @@ impl OnDemand {
/// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response<BasicAccount> {
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
let (sender, receiver) = oneshot::channel();
self.dispatch_account(ctx, req, sender);
Response(receiver)
receiver
}
fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender<BasicAccount>) {
@ -337,17 +302,17 @@ impl OnDemand {
}
/// Request code by address, known code hash, and block header.
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Response<Bytes> {
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
let (sender, receiver) = oneshot::channel();
// fast path for no code.
if req.code_hash == ::util::sha3::SHA3_EMPTY {
sender.complete(Ok(Vec::new()))
sender.complete(Vec::new())
} else {
self.dispatch_code(ctx, req, sender);
}
Response(receiver)
receiver
}
fn dispatch_code(&self, ctx: &BasicContext, req: request::Code, sender: Sender<Bytes>) {
@ -475,7 +440,7 @@ impl Handler for OnDemand {
if let Some(&(ref header, ref proof)) = proofs.get(0) {
match req.check_response(header, proof) {
Ok(header) => {
sender.complete(Ok(header));
sender.complete(header);
return
}
Err(e) => {
@ -503,7 +468,7 @@ impl Handler for OnDemand {
if let Some(ref header) = headers.get(0) {
match req.check_response(header) {
Ok(header) => {
sender.complete(Ok(header));
sender.complete(header);
return
}
Err(e) => {
@ -531,7 +496,7 @@ impl Handler for OnDemand {
if let Some(ref block) = bodies.get(0) {
match req.check_response(block) {
Ok(block) => {
sender.complete(Ok(block));
sender.complete(block);
return
}
Err(e) => {
@ -559,7 +524,7 @@ impl Handler for OnDemand {
if let Some(ref receipts) = receipts.get(0) {
match req.check_response(receipts) {
Ok(receipts) => {
sender.complete(Ok(receipts));
sender.complete(receipts);
return
}
Err(e) => {
@ -587,7 +552,7 @@ impl Handler for OnDemand {
if let Some(ref proof) = proofs.get(0) {
match req.check_response(proof) {
Ok(proof) => {
sender.complete(Ok(proof));
sender.complete(proof);
return
}
Err(e) => {
@ -615,7 +580,7 @@ impl Handler for OnDemand {
if let Some(code) = codes.get(0) {
match req.check_response(code.as_slice()) {
Ok(()) => {
sender.complete(Ok(code.clone()));
sender.complete(code.clone());
return
}
Err(e) => {
@ -642,7 +607,6 @@ mod tests {
use net::{Announcement, BasicContext, ReqId, Error as LesError};
use request::{Request as LesRequest, Kind as LesRequestKind};
use network::{PeerId, NodeId};
use futures::Future;
use util::H256;
struct FakeContext;
@ -659,10 +623,14 @@ mod tests {
}
#[test]
fn no_peers() {
fn detects_hangup() {
let on_demand = OnDemand::default();
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
assert_eq!(result.wait().unwrap_err(), Error::NoPeersAvailable);
assert!(on_demand.orphaned_requests.read().len() == 1);
drop(result);
on_demand.dispatch_orphaned(&FakeContext);
assert!(on_demand.orphaned_requests.read().is_empty());
}
}