From 4daa64578948db6cff9d79c71586174ddfe01617 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 28 Dec 2016 16:20:46 +0100 Subject: [PATCH] dispatch header requests --- Cargo.lock | 1 + ethcore/light/Cargo.toml | 1 + ethcore/light/src/lib.rs | 1 + ethcore/light/src/on_demand/mod.rs | 171 +++++++++++++++++++++++++---- 4 files changed, 153 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27d737568..e9f54c918 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -522,6 +522,7 @@ dependencies = [ "ethcore-util 1.5.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.1.0", "smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 20f5f93bd..e0a1b581f 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -21,6 +21,7 @@ rlp = { path = "../../util/rlp" } time = "0.1" smallvec = "0.3.1" futures = "0.1" +rand = "0.3" [features] default = [] diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 60984d9c2..0e650f992 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -66,6 +66,7 @@ extern crate rlp; extern crate smallvec; extern crate time; extern crate futures; +extern crate rand; #[cfg(feature = "ipc")] extern crate ethcore_ipc as ipc; diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index f932dc024..09b477c20 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -20,22 +20,65 @@ use std::collections::HashMap; -use ethcore::ids::BlockId; -use ethcore::block::Block; use ethcore::encoded; -use ethcore::header::Header; use ethcore::receipt::Receipt; -use futures::Future; -use futures::sync::oneshot::{self, Sender, Receiver}; +use futures::{Async, Poll, Future}; +use futures::sync::oneshot; use network::PeerId; use client::Client; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; -use util::{Address, H256, RwLock}; +use util::{Address, H256, U256, RwLock}; +use request as les_request; -/// Dummy type for "basic account" -pub struct Account; +/// Basic account data. +// TODO: [rob] unify with similar struct in `snapshot`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Account { + /// Balance in Wei + pub balance: U256, + /// Storage trie root. + pub storage_root: H256, + /// Code hash + pub code_hash: H256, + /// Nonce + pub nonce: U256, +} + +/// Errors which can occur while trying to fulfill a request. +pub enum Error { + /// Request was canceled. + Canceled, + /// No suitable peers available to serve the request. + NoPeersAvailable, + /// Request timed out. + TimedOut, +} + +impl From for Error { + fn from(_: oneshot::Canceled) -> Self { + Error::Canceled + } +} + +/// Future which awaits a response to an on-demand request. +pub struct Response(oneshot::Receiver>); + +impl Future for Response { + type Item = T; + type Error = Error; + + fn poll(&mut self) -> Poll { + match self.0.poll().map_err(Into::into) { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Err(e), + } + } +} + +type Sender = oneshot::Sender>; // relevant peer info. struct Peer { @@ -68,60 +111,146 @@ impl Handler for OnDemand { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { self.peers.write().remove(&ctx.peer()); + + for unfulfilled in unfulfilled { + if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + trace!(target: "on_demand", "Attempting to reassign dropped request"); + self.dispatch_request(ctx.as_basic(), pending); + } + } + } + + fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) { + peer.status.update_from(&announcement); + peer.capabilities.update_from(&announcement); + } } } impl OnDemand { /// Request a header by block number and CHT root hash. - pub fn header_by_number(&self, ctx: &BasicContext, num: u64, cht_root: H256) -> Receiver { + pub fn header_by_number(&self, ctx: &BasicContext, num: u64, cht_root: H256) -> Response { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::HeaderByNumber(num, cht_root, sender)); - receiver + Response(receiver) } /// Request a header by hash. This is less accurate than by-number because we don't know /// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, hash: H256) -> Receiver { + pub fn header_by_hash(&self, ctx: &BasicContext, hash: H256) -> Response { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::HeaderByHash(hash, sender)); - receiver + Response(receiver) } /// Request a block, given its header. Block bodies are requestable by hash only, /// and the header is required anyway to verify and complete the block body /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Receiver { + pub fn block(&self, ctx: &BasicContext, header: encoded::Header) -> Response { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::Block(header, sender)); - receiver + Response(receiver) } /// Request the receipts for a block. The header serves two purposes: /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, header: encoded::Header) -> Receiver> { + pub fn block_receipts(&self, ctx: &BasicContext, header: encoded::Header) -> Response> { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::BlockReceipts(header, sender)); - receiver + Response(receiver) } /// Request an account by address and block header -- which gives a hash to query and a state root /// to verify against. - pub fn account(&self, ctx: &BasicContext, header: encoded::Header, address: Address) -> Receiver { + pub fn account(&self, ctx: &BasicContext, header: encoded::Header, address: Address) -> Response { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::Account(header, address, sender)); - receiver + Response(receiver) } /// Request account storage value by block header, address, and key. - pub fn storage(&self, ctx: &BasicContext, header: encoded::Header, address: Address, key: H256) -> Receiver { + pub fn storage(&self, ctx: &BasicContext, header: encoded::Header, address: Address, key: H256) -> Response { let (sender, receiver) = oneshot::channel(); self.dispatch_request(ctx, Request::Storage(header, address, key, sender)); - receiver + Response(receiver) } // dispatch a request to a suitable peer. fn dispatch_request(&self, ctx: &BasicContext, request: Request) { - unimplemented!() + match request { + Request::HeaderByNumber(num, cht_hash, sender) => { + let cht_num = ::client::cht::block_to_cht_number(num); + let req = les_request::Request::HeaderProofs(les_request::HeaderProofs { + requests: vec![les_request::HeaderProof { + cht_number: cht_num, + block_number: num, + from_level: 0, + }], + }); + + // we're looking for a peer with serveHeaders who's far enough along in the + // chain. + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_headers && peer.status.head_num >= num { + match ctx.request_from(*id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::HeaderByNumber(num, cht_hash, sender) + ); + return; + }, + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + // TODO: retrying. + trace!(target: "on_demand", "No suitable peer for request"); + sender.complete(Err(Error::NoPeersAvailable)); + } + Request::HeaderByHash(hash, sender) => { + let req = les_request::Request::Headers(les_request::Headers { + start: hash.into(), + max: 1, + skip: 0, + reverse: false, + }); + + // all we've got is a hash, so we'll just guess at peers who might have + // it randomly. + let mut potential_peers = self.peers.read().iter() + .filter(|&(_, peer)| peer.capabilities.serve_headers) + .map(|(id, _)| *id) + .collect::>(); + + let mut rng = ::rand::thread_rng(); + + ::rand::Rng::shuffle(&mut rng, &mut potential_peers); + + for id in potential_peers { + match ctx.request_from(id, req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + Request::HeaderByHash(hash, sender), + ); + return; + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + + sender.complete(Err(Error::NoPeersAvailable)); + } + _ => unimplemented!() + } } }