From ddf2b944b5dd3f0e9c9d9a69b5d4f7c7a2b92c1c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 28 Dec 2016 21:46:55 +0100 Subject: [PATCH] on demand request dispatch --- ethcore/light/src/on_demand/mod.rs | 144 ++++++++++++++++++++++++++--- 1 file changed, 133 insertions(+), 11 deletions(-) diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 09b477c20..f6efb95a2 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -27,10 +27,9 @@ use futures::{Async, Poll, Future}; use futures::sync::oneshot; use network::PeerId; -use client::Client; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use util::{Address, H256, U256, RwLock}; -use request as les_request; +use request::{self as les_request, Request as LesRequest}; /// Basic account data. // TODO: [rob] unify with similar struct in `snapshot`. @@ -90,7 +89,7 @@ struct Peer { enum Request { HeaderByNumber(u64, H256, Sender), // num + CHT root HeaderByHash(H256, Sender), - Block(encoded::Header, Sender), + Block(encoded::Header, H256, Sender), BlockReceipts(encoded::Header, Sender>), Account(encoded::Header, Address, Sender), Storage(encoded::Header, Address, H256, Sender), @@ -151,7 +150,8 @@ impl OnDemand { /// -- this just doesn't obscure the network query. pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Response { let (sender, receiver) = oneshot::channel(); - self.dispatch_request(ctx, Request::Block(header, sender)); + let hash = header.hash(); + self.dispatch_request(ctx, Request::Block(header, hash, sender)); Response(receiver) } @@ -183,7 +183,7 @@ impl OnDemand { match request { Request::HeaderByNumber(num, cht_hash, sender) => { let cht_num = ::client::cht::block_to_cht_number(num); - let req = les_request::Request::HeaderProofs(les_request::HeaderProofs { + let req = LesRequest::HeaderProofs(les_request::HeaderProofs { requests: vec![les_request::HeaderProof { cht_number: cht_num, block_number: num, @@ -202,7 +202,7 @@ impl OnDemand { req_id, Request::HeaderByNumber(num, cht_hash, sender) ); - return; + return }, Err(e) => trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), @@ -210,12 +210,12 @@ impl OnDemand { } } - // TODO: retrying. + // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } Request::HeaderByHash(hash, sender) => { - let req = les_request::Request::Headers(les_request::Headers { + let req = LesRequest::Headers(les_request::Headers { start: hash.into(), max: 1, skip: 0, @@ -241,16 +241,138 @@ impl OnDemand { req_id, Request::HeaderByHash(hash, sender), ); - return; + return } Err(e) => trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), } } - + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + } + Request::Block(header, hash, sender) => { + let num = header.number(); + let req = LesRequest::Bodies(les_request::Bodies { + block_hashes: vec![hash], + }); + + // we're looking for a peer with serveChainSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::Block(header, hash, sender) + ); + return + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + } + Request::BlockReceipts(header, sender) => { + let num = header.number(); + let req = LesRequest::Receipts(les_request::Receipts { + block_hashes: vec![header.hash()], + }); + + // we're looking for a peer with serveChainSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::BlockReceipts(header, sender) + ); + return + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + } + Request::Account(header, address, sender) => { + let num = header.number(); + let req = LesRequest::StateProofs(les_request::StateProofs { + requests: vec![les_request::StateProof { + block: header.hash(), + key1: ::util::Hashable::sha3(&address), + key2: None, + from_level: 0, + }], + }); + + // we're looking for a peer with serveStateSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::Account(header, address, sender) + ); + return + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + } + Request::Storage(header, address, key, sender) => { + let num = header.number(); + let req = LesRequest::StateProofs(les_request::StateProofs { + requests: vec![les_request::StateProof { + block: header.hash(), + key1: ::util::Hashable::sha3(&address), + key2: Some(::util::Hashable::sha3(&key)), + from_level: 0, + }], + }); + + // we're looking for a peer with serveStateSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::Storage(header, address, key, sender) + ); + return + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying + trace!(target: "on_demand", "No suitable peer for request"); sender.complete(Err(Error::NoPeersAvailable)); } - _ => unimplemented!() } } }