migrate oneshot::complete to send in on_demand

This commit is contained in:
Robert Habermeier 2017-03-16 20:29:06 +01:00
parent cbb9314531
commit 04f106aad8

View File

@ -31,7 +31,7 @@ use futures::{Async, Poll, Future};
use futures::sync::oneshot::{self, Sender, Receiver}; use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId; use network::PeerId;
use rlp::{RlpStream, Stream}; use rlp::{RlpStream, Stream};
use util::{Bytes, DBValue, RwLock, Mutex, U256, H256}; use util::{Bytes, RwLock, Mutex, U256, H256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
@ -52,7 +52,7 @@ impl Peer {
match *pending { match *pending {
Pending::HeaderProof(ref req, _) => Pending::HeaderProof(ref req, _) =>
self.capabilities.serve_headers && self.status.head_num > req.num(), self.capabilities.serve_headers && self.status.head_num > req.num(),
Pending::HeaderByHash(ref req, _) => self.capabilities.serve_headers, Pending::HeaderByHash(_, _) => self.capabilities.serve_headers,
Pending::Block(ref req, _) => Pending::Block(ref req, _) =>
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()), self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()),
Pending::BlockReceipts(ref req, _) => Pending::BlockReceipts(ref req, _) =>
@ -156,7 +156,7 @@ impl OnDemand {
}; };
match cached { match cached {
Some(hash) => sender.complete(hash), Some(hash) => sender.send(hash).expect("receiver alive here; qed"),
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))), None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))),
} }
receiver receiver
@ -172,7 +172,7 @@ impl OnDemand {
}; };
match cached { match cached {
Some(score) => sender.complete(score), Some(score) => sender.send(score).expect("receiver alive here; qed"),
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))), None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))),
} }
@ -193,7 +193,7 @@ impl OnDemand {
}; };
match cached { match cached {
(Some(hash), Some(score)) => sender.complete((hash, score)), (Some(hash), Some(score)) => sender.send((hash, score)).expect("receiver alive here; qed"),
_ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))), _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))),
} }
@ -206,7 +206,7 @@ impl OnDemand {
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> { pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
match self.cache.lock().block_header(&req.0) { match self.cache.lock().block_header(&req.0) {
Some(hdr) => sender.complete(hdr), Some(hdr) => sender.send(hdr).expect("receiver alive here; qed"),
None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
} }
receiver receiver
@ -225,7 +225,7 @@ impl OnDemand {
stream.begin_list(0); stream.begin_list(0);
stream.begin_list(0); stream.begin_list(0);
sender.complete(encoded::Block::new(stream.out())) sender.send(encoded::Block::new(stream.out())).expect("receiver alive here; qed");
} else { } else {
match self.cache.lock().block_body(&req.hash) { match self.cache.lock().block_body(&req.hash) {
Some(body) => { Some(body) => {
@ -233,7 +233,7 @@ impl OnDemand {
stream.append_raw(&req.header.into_inner(), 1); stream.append_raw(&req.header.into_inner(), 1);
stream.append_raw(&body.into_inner(), 2); stream.append_raw(&body.into_inner(), 2);
sender.complete(encoded::Block::new(stream.out())); sender.send(encoded::Block::new(stream.out())).expect("receiver alive here; qed");
} }
None => self.dispatch(ctx, Pending::Block(req, sender)), None => self.dispatch(ctx, Pending::Block(req, sender)),
} }
@ -248,10 +248,10 @@ impl OnDemand {
// fast path for empty receipts. // fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP { if req.0.receipts_root() == SHA3_NULL_RLP {
sender.complete(Vec::new()) sender.send(Vec::new()).expect("receiver alive here; qed");
} else { } else {
match self.cache.lock().block_receipts(&req.0.hash()) { match self.cache.lock().block_receipts(&req.0.hash()) {
Some(receipts) => sender.complete(receipts), Some(receipts) => sender.send(receipts).expect("receiver alive here; qed"),
None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
} }
} }
@ -273,7 +273,7 @@ impl OnDemand {
// fast path for no code. // fast path for no code.
if req.code_hash == ::util::sha3::SHA3_EMPTY { if req.code_hash == ::util::sha3::SHA3_EMPTY {
sender.complete(Vec::new()) sender.send(Vec::new()).expect("receiver alive here; qed")
} else { } else {
self.dispatch(ctx, Pending::Code(req, sender)); self.dispatch(ctx, Pending::Code(req, sender));
} }
@ -430,9 +430,9 @@ impl Handler for OnDemand {
cache.insert_chain_score(hash, score); cache.insert_chain_score(hash, score);
match sender { match sender {
ChtProofSender::Both(sender) => sender.complete((hash, score)), ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); }
ChtProofSender::Hash(sender) => sender.complete(hash), ChtProofSender::Hash(sender) => { let _ = sender.send(hash); }
ChtProofSender::ChainScore(sender) => sender.complete(score), ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); }
} }
return return
} }
@ -446,7 +446,7 @@ impl Handler for OnDemand {
match req.check_response(header) { match req.check_response(header) {
Ok(header) => { Ok(header) => {
self.cache.lock().insert_block_header(req.0, header.clone()); self.cache.lock().insert_block_header(req.0, header.clone());
sender.complete(header); let _ = sender.send(header);
return return
} }
Err(e) => warn!("Error handling response for header request: {:?}", e), Err(e) => warn!("Error handling response for header request: {:?}", e),
@ -459,7 +459,7 @@ impl Handler for OnDemand {
match req.check_response(&response.body) { match req.check_response(&response.body) {
Ok(block) => { Ok(block) => {
self.cache.lock().insert_block_body(req.hash, response.body.clone()); self.cache.lock().insert_block_body(req.hash, response.body.clone());
sender.complete(block); let _ = sender.send(block);
return return
} }
Err(e) => warn!("Error handling response for block request: {:?}", e), Err(e) => warn!("Error handling response for block request: {:?}", e),
@ -472,7 +472,7 @@ impl Handler for OnDemand {
Ok(receipts) => { Ok(receipts) => {
let hash = req.0.hash(); let hash = req.0.hash();
self.cache.lock().insert_block_receipts(hash, receipts.clone()); self.cache.lock().insert_block_receipts(hash, receipts.clone());
sender.complete(receipts); let _ = sender.send(receipts);
return return
} }
Err(e) => warn!("Error handling response for receipts request: {:?}", e), Err(e) => warn!("Error handling response for receipts request: {:?}", e),
@ -485,7 +485,7 @@ impl Handler for OnDemand {
Ok(maybe_account) => { Ok(maybe_account) => {
// TODO: validate against request outputs. // TODO: validate against request outputs.
// needs engine + env info as part of request. // needs engine + env info as part of request.
sender.complete(maybe_account); let _ = sender.send(maybe_account);
return return
} }
Err(e) => warn!("Error handling response for state request: {:?}", e), Err(e) => warn!("Error handling response for state request: {:?}", e),
@ -496,7 +496,7 @@ impl Handler for OnDemand {
if let NetworkResponse::Code(ref response) = *response { if let NetworkResponse::Code(ref response) = *response {
match req.check_response(response.code.as_slice()) { match req.check_response(response.code.as_slice()) {
Ok(()) => { Ok(()) => {
sender.complete(response.code.clone()); let _ = sender.send(response.code.clone());
return return
} }
Err(e) => warn!("Error handling response for code request: {:?}", e), Err(e) => warn!("Error handling response for code request: {:?}", e),
@ -507,11 +507,11 @@ impl Handler for OnDemand {
if let NetworkResponse::Execution(ref response) = *response { if let NetworkResponse::Execution(ref response) = *response {
match req.check_response(&response.items) { match req.check_response(&response.items) {
ProvedExecution::Complete(executed) => { ProvedExecution::Complete(executed) => {
sender.complete(Ok(executed)); let _ = sender.send(Ok(executed));
return return
} }
ProvedExecution::Failed(err) => { ProvedExecution::Failed(err) => {
sender.complete(Err(err)); let _ = sender.send(Err(err));
return return
} }
ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"), ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"),