separate request dispatch from creation

This commit is contained in:
Robert Habermeier 2017-01-04 13:58:26 +01:00
parent 1d51b6f7e5
commit ca35b345ca

View File

@ -109,6 +109,12 @@ impl OnDemand {
/// Request a header by block number and CHT root hash. /// Request a header by block number and CHT root hash.
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<encoded::Header> { pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_number(ctx, req, sender);
Response(receiver)
}
// dispatch the request, completing the request if no peers available.
fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<encoded::Header>) {
let num = req.num; let num = req.num;
let cht_num = ::client::cht::block_to_cht_number(req.num); let cht_num = ::client::cht::block_to_cht_number(req.num);
let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs { let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs {
@ -130,7 +136,7 @@ impl OnDemand {
req_id, req_id,
Pending::HeaderByNumber(req, sender) Pending::HeaderByNumber(req, sender)
); );
return Response(receiver); return
}, },
Err(e) => Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
@ -141,7 +147,6 @@ impl OnDemand {
// TODO: retrying // TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
} }
/// Request a header by hash. This is less accurate than by-number because we don't know /// Request a header by hash. This is less accurate than by-number because we don't know
@ -149,6 +154,11 @@ impl OnDemand {
/// it as easily. /// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response<encoded::Header> { pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_hash(ctx, req, sender);
Response(receiver)
}
fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender<encoded::Header>) {
let les_req = LesRequest::Headers(les_request::Headers { let les_req = LesRequest::Headers(les_request::Headers {
start: req.0.into(), start: req.0.into(),
max: 1, max: 1,
@ -175,7 +185,7 @@ impl OnDemand {
req_id, req_id,
Pending::HeaderByHash(req, sender), Pending::HeaderByHash(req, sender),
); );
return Response(receiver); return
} }
Err(e) => Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
@ -185,7 +195,6 @@ impl OnDemand {
// TODO: retrying // TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
} }
/// Request a block, given its header. Block bodies are requestable by hash only, /// Request a block, given its header. Block bodies are requestable by hash only,
@ -193,6 +202,11 @@ impl OnDemand {
/// -- this just doesn't obscure the network query. /// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response<encoded::Block> { pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response<encoded::Block> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_block(ctx, req, sender);
Response(receiver)
}
fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender<encoded::Block>) {
let num = req.header.number(); let num = req.header.number();
let les_req = LesRequest::Bodies(les_request::Bodies { let les_req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![req.hash], block_hashes: vec![req.hash],
@ -208,7 +222,7 @@ impl OnDemand {
req_id, req_id,
Pending::Block(req, sender) Pending::Block(req, sender)
); );
return Response(receiver) return
} }
Err(e) => Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
@ -219,13 +233,17 @@ impl OnDemand {
// TODO: retrying // TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
} }
/// Request the receipts for a block. The header serves two purposes: /// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root. /// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response<Vec<Receipt>> { pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response<Vec<Receipt>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_block_receipts(ctx, req, sender);
Response(receiver)
}
fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender<Vec<Receipt>>) {
let num = req.0.number(); let num = req.0.number();
let les_req = LesRequest::Receipts(les_request::Receipts { let les_req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![req.0.hash()], block_hashes: vec![req.0.hash()],
@ -241,7 +259,7 @@ impl OnDemand {
req_id, req_id,
Pending::BlockReceipts(req, sender) Pending::BlockReceipts(req, sender)
); );
return Response(receiver) return
} }
Err(e) => Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
@ -252,13 +270,17 @@ impl OnDemand {
// TODO: retrying // TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
} }
/// Request an account by address and block header -- which gives a hash to query and a state root /// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against. /// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response<Account> { pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response<Account> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_account(ctx, req, sender);
Response(receiver)
}
fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender<Account>) {
let num = req.header.number(); let num = req.header.number();
let les_req = LesRequest::StateProofs(les_request::StateProofs { let les_req = LesRequest::StateProofs(les_request::StateProofs {
requests: vec![les_request::StateProof { requests: vec![les_request::StateProof {
@ -279,7 +301,7 @@ impl OnDemand {
req_id, req_id,
Pending::Account(req, sender) Pending::Account(req, sender)
); );
return Response(receiver) return
} }
Err(e) => Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
@ -290,7 +312,6 @@ impl OnDemand {
// TODO: retrying // TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); sender.complete(Err(Error::NoPeersAvailable));
Response(receiver)
} }
} }
@ -301,11 +322,23 @@ impl Handler for OnDemand {
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
self.peers.write().remove(&ctx.peer()); self.peers.write().remove(&ctx.peer());
let ctx = ctx.as_basic();
for unfulfilled in unfulfilled { for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request"); trace!(target: "on_demand", "Attempting to reassign dropped request");
unimplemented!() match pending {
Pending::HeaderByNumber(req, sender)
=> self.dispatch_header_by_number(ctx, req, sender),
Pending::HeaderByHash(req, sender)
=> self.dispatch_header_by_hash(ctx, req, sender),
Pending::Block(req, sender)
=> self.dispatch_block(ctx, req, sender),
Pending::BlockReceipts(req, sender)
=> self.dispatch_block_receipts(ctx, req, sender),
Pending::Account(req, sender)
=> self.dispatch_account(ctx, req, sender),
}
} }
} }
} }