From 386cdb830dbe0d98e0a1c0db8304a0c18ad8d182 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 23 May 2017 06:39:25 -0400 Subject: [PATCH] Back-references for the on-demand service (#5573) * header back-references for on demand * initial back-reference implementation for on demand requests * answer requests from cache * answer requests from cache, add tests * strongly typed responses for vectors of homogeneous requests * fix fallout in RPC without optimizing --- ethcore/light/src/on_demand/mod.rs | 371 ++++++++----------- ethcore/light/src/on_demand/request.rs | 391 ++++++++++++++++----- ethcore/light/src/on_demand/tests.rs | 120 ++++++- ethcore/light/src/types/request/builder.rs | 53 ++- ethcore/light/src/types/request/mod.rs | 29 +- parity/light_helpers/queue_cull.rs | 36 +- rpc/src/v1/helpers/dispatch.rs | 35 +- rpc/src/v1/helpers/light_fetch.rs | 144 +++++--- rpc/src/v1/impls/light/eth.rs | 25 +- 9 files changed, 778 insertions(+), 426 deletions(-) diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index e61c126d6..435c72cf7 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -22,24 +22,19 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; -use ethcore::basic_account::BasicAccount; -use ethcore::encoded; -use ethcore::receipt::Receipt; use ethcore::executed::{Executed, ExecutionError}; -use futures::{future, Async, Poll, Future, BoxFuture}; +use futures::{Async, Poll, Future}; use futures::sync::oneshot::{self, Sender, Receiver, Canceled}; use network::PeerId; -use rlp::RlpStream; -use util::{Bytes, RwLock, Mutex, U256, H256}; -use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP}; +use util::{RwLock, Mutex}; use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use cache::Cache; use request::{self as basic_request, Request as NetworkRequest}; use self::request::CheckedRequest; -pub use self::request::{Request, Response}; +pub use self::request::{Request, Response, HeaderRef}; #[cfg(test)] mod tests; @@ -75,6 +70,98 @@ struct Pending { sender: oneshot::Sender>, } +impl Pending { + // answer as many of the given requests from the supplied cache as possible. + // TODO: support re-shuffling. + fn answer_from_cache(&mut self, cache: &Mutex) { + while !self.requests.is_complete() { + let idx = self.requests.num_answered(); + match self.requests[idx].respond_local(cache) { + Some(response) => { + self.requests.supply_response_unchecked(&response); + self.update_header_refs(idx, &response); + self.responses.push(response); + } + None => break, + } + } + } + + // update header refs if the given response contains a header future requests require for + // verification. + // `idx` is the index of the request the response corresponds to. + fn update_header_refs(&mut self, idx: usize, response: &Response) { + match *response { + Response::HeaderByHash(ref hdr) => { + // fill the header for all requests waiting on this one. + // TODO: could be faster if we stored a map usize => Vec + // but typical use just has one header request that others + // depend on. + for r in self.requests.iter_mut().skip(idx + 1) { + if r.needs_header().map_or(false, |(i, _)| i == idx) { + r.provide_header(hdr.clone()) + } + } + } + _ => {}, // no other responses produce headers. + } + } + + // supply a response. + fn supply_response(&mut self, cache: &Mutex, response: &basic_request::Response) + -> Result<(), basic_request::ResponseError> + { + match self.requests.supply_response(&cache, response) { + Ok(response) => { + let idx = self.responses.len(); + self.update_header_refs(idx, &response); + self.responses.push(response); + Ok(()) + } + Err(e) => Err(e), + } + } + + // if the requests are complete, send the result and consume self. + fn try_complete(self) -> Option { + if self.requests.is_complete() { + let _ = self.sender.send(self.responses); + None + } else { + Some(self) + } + } + + fn fill_unanswered(&mut self) { + self.requests.fill_unanswered(); + } + + // update the cached network requests. + fn update_net_requests(&mut self) { + use request::IncompleteRequest; + + let mut builder = basic_request::RequestBuilder::default(); + let num_answered = self.requests.num_answered(); + let mut mapping = move |idx| idx - num_answered; + + for request in self.requests.iter().skip(num_answered) { + let mut net_req = request.clone().into_net_request(); + + // all back-references with request index less than `num_answered` have + // been filled by now. all remaining requests point to nothing earlier + // than the next unanswered request. + net_req.adjust_refs(&mut mapping); + builder.push(net_req) + .expect("all back-references to answered requests have been filled; qed"); + } + + // update pending fields. + let capabilities = guess_capabilities(&self.requests[num_answered..]); + self.net_requests = builder.build(); + self.required_capabilities = capabilities; + } +} + // helper to guess capabilities required for a given batch of network requests. fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { let mut caps = Capabilities { @@ -97,16 +184,21 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { caps.serve_headers = true, CheckedRequest::HeaderByHash(_, _) => caps.serve_headers = true, - CheckedRequest::Body(ref req, _) => - update_since(&mut caps.serve_chain_since, req.header.number()), - CheckedRequest::Receipts(ref req, _) => - update_since(&mut caps.serve_chain_since, req.0.number()), - CheckedRequest::Account(ref req, _) => - update_since(&mut caps.serve_state_since, req.header.number()), - CheckedRequest::Code(ref req, _) => - update_since(&mut caps.serve_state_since, req.block_id.1), - CheckedRequest::Execution(ref req, _) => - update_since(&mut caps.serve_state_since, req.header.number()), + CheckedRequest::Body(ref req, _) => if let Ok(ref hdr) = req.0.as_ref() { + update_since(&mut caps.serve_chain_since, hdr.number()); + }, + CheckedRequest::Receipts(ref req, _) => if let Ok(ref hdr) = req.0.as_ref() { + update_since(&mut caps.serve_chain_since, hdr.number()); + }, + CheckedRequest::Account(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() { + update_since(&mut caps.serve_state_since, hdr.number()); + }, + CheckedRequest::Code(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() { + update_since(&mut caps.serve_state_since, hdr.number()); + }, + CheckedRequest::Execution(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() { + update_since(&mut caps.serve_state_since, hdr.number()); + }, } } @@ -163,158 +255,6 @@ impl OnDemand { me } - /// Request a header's hash by block number and CHT root hash. - /// Returns the hash. - pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture { - let cached = { - let mut cache = self.cache.lock(); - cache.block_hash(&req.num()) - }; - - match cached { - Some(hash) => future::ok(hash).boxed(), - None => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .map(|(h, _)| h) - .boxed() - }, - } - } - - /// Request a canonical block's chain score. - /// Returns the chain score. - pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture { - let cached = { - let mut cache = self.cache.lock(); - cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash)) - }; - - match cached { - Some(score) => future::ok(score).boxed(), - None => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .map(|(_, s)| s) - .boxed() - }, - } - } - - /// Request a canonical block's hash and chain score by number. - /// Returns the hash and chain score. - pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<(H256, U256), Canceled> { - let cached = { - let mut cache = self.cache.lock(); - let hash = cache.block_hash(&req.num()); - ( - hash.clone(), - hash.and_then(|hash| cache.chain_score(&hash)), - ) - }; - - match cached { - (Some(hash), Some(score)) => future::ok((hash, score)).boxed(), - _ => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - }, - } - } - - /// Request a header by hash. This is less accurate than by-number because we don't know - /// where in the chain this header lies, and therefore can't find a peer who is supposed to have - /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> BoxFuture { - match { self.cache.lock().block_header(&req.0) } { - Some(hdr) => future::ok(hdr).boxed(), - None => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - }, - } - } - - /// Request a block, given its header. Block bodies are requestable by hash only, - /// and the header is required anyway to verify and complete the block body - /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, req: request::Body) -> BoxFuture { - // fast path for empty body. - if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { - let mut stream = RlpStream::new_list(3); - stream.append_raw(&req.header.into_inner(), 1); - stream.begin_list(0); - stream.begin_list(0); - - future::ok(encoded::Block::new(stream.out())).boxed() - } else { - match { self.cache.lock().block_body(&req.hash) } { - Some(body) => { - let mut stream = RlpStream::new_list(3); - let body = body.rlp(); - stream.append_raw(&req.header.into_inner(), 1); - stream.append_raw(&body.at(0).as_raw(), 1); - stream.append_raw(&body.at(1).as_raw(), 1); - - future::ok(encoded::Block::new(stream.out())).boxed() - } - None => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - } - } - } - } - - /// Request the receipts for a block. The header serves two purposes: - /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> BoxFuture, Canceled> { - // fast path for empty receipts. - if req.0.receipts_root() == SHA3_NULL_RLP { - return future::ok(Vec::new()).boxed() - } - - match { self.cache.lock().block_receipts(&req.0.hash()) } { - Some(receipts) => future::ok(receipts).boxed(), - None => { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - }, - } - } - - /// Request an account by address and block header -- which gives a hash to query and a state root - /// to verify against. - /// `None` here means that no account by the queried key exists in the queried state. - pub fn account(&self, ctx: &BasicContext, req: request::Account) -> BoxFuture, Canceled> { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - } - - /// Request code by address, known code hash, and block header. - pub fn code(&self, ctx: &BasicContext, req: request::Code) -> BoxFuture { - // fast path for no code. - if req.code_hash == SHA3_EMPTY { - future::ok(Vec::new()).boxed() - } else { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - } - } - - /// Request proof-of-execution for a transaction. - pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> BoxFuture { - self.request(ctx, req) - .expect("request given fully fleshed out; qed") - .boxed() - } - /// Submit a vector of requests to be processed together. /// /// Fails if back-references are not coherent. @@ -332,15 +272,33 @@ impl OnDemand { let mut builder = basic_request::RequestBuilder::default(); let responses = Vec::with_capacity(requests.len()); - for request in requests { - builder.push(CheckedRequest::from(request))?; + + let mut header_producers = HashMap::new(); + for (i, request) in requests.into_iter().enumerate() { + let request = CheckedRequest::from(request); + + // ensure that all requests needing headers will get them. + if let Some((idx, field)) = request.needs_header() { + // a request chain with a header back-reference is valid only if it both + // points to a request that returns a header and has the same back-reference + // for the block hash. + match header_producers.get(&idx) { + Some(ref f) if &field == *f => {} + _ => return Err(basic_request::NoSuchOutput), + } + } + if let CheckedRequest::HeaderByHash(ref req, _) = request { + header_producers.insert(i, req.0.clone()); + } + + builder.push(request)?; } let requests = builder.build(); let net_requests = requests.clone().map_requests(|req| req.into_net_request()); let capabilities = guess_capabilities(requests.requests()); - self.pending.write().push(Pending { + self.submit_pending(ctx, Pending { requests: requests, net_requests: net_requests, required_capabilities: capabilities, @@ -348,8 +306,6 @@ impl OnDemand { sender: sender, }); - self.attempt_dispatch(ctx); - Ok(receiver) } @@ -430,6 +386,19 @@ impl OnDemand { }) .collect(); // `pending` now contains all requests we couldn't dispatch. } + + // submit a pending request set. attempts to answer from cache before + // going to the network. if complete, sends response and consumes the struct. + fn submit_pending(&self, ctx: &BasicContext, mut pending: Pending) { + // answer as many requests from cache as we can, and schedule for dispatch + // if incomplete. + pending.answer_from_cache(&*self.cache); + if let Some(mut pending) = pending.try_complete() { + pending.update_net_requests(); + self.pending.write().push(pending); + self.attempt_dispatch(ctx); + } + } } impl Handler for OnDemand { @@ -468,63 +437,27 @@ impl Handler for OnDemand { } fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { - use request::IncompleteRequest; - let mut pending = match self.in_transit.write().remove(&req_id) { Some(req) => req, None => return, }; // for each incoming response - // 1. ensure verification data filled. (still TODO since on_demand doesn't use back-references yet) + // 1. ensure verification data filled. // 2. pending.requests.supply_response // 3. if extracted on-demand response, keep it for later. for response in responses { - match pending.requests.supply_response(&*self.cache, response) { - Ok(response) => { - pending.responses.push(response) - } - Err(e) => { - let peer = ctx.peer(); - debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); - ctx.disable_peer(peer); + if let Err(e) = pending.supply_response(&*self.cache, response) { + let peer = ctx.peer(); + debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); + ctx.disable_peer(peer); - break; - } + break; } } - pending.requests.fill_unanswered(); - if pending.requests.is_complete() { - let _ = pending.sender.send(pending.responses); - - return; - } - - - // update network requests (unless we're done, in which case fulfill the future.) - let mut builder = basic_request::RequestBuilder::default(); - let num_answered = pending.requests.num_answered(); - let mut mapping = move |idx| idx - num_answered; - - for request in pending.requests.requests().iter().skip(num_answered) { - let mut net_req = request.clone().into_net_request(); - - // all back-references with request index less than `num_answered` have - // been filled by now. all remaining requests point to nothing earlier - // than the next unanswered request. - net_req.adjust_refs(&mut mapping); - builder.push(net_req) - .expect("all back-references to answered requests have been filled; qed"); - } - - // update pending fields and re-queue. - let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]); - pending.net_requests = builder.build(); - pending.required_capabilities = capabilities; - - self.pending.write().push(pending); - self.attempt_dispatch(ctx.as_basic()); + pending.fill_unanswered(); + self.submit_pending(ctx.as_basic(), pending); } fn tick(&self, ctx: &BasicContext) { diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index c9a5c4d9b..3ad9a28d5 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -26,12 +26,12 @@ use ethcore::receipt::Receipt; use ethcore::state::{self, ProvedExecution}; use ethcore::transaction::SignedTransaction; -use request::{self as net_request, IncompleteRequest, Output, OutputKind}; +use request::{self as net_request, IncompleteRequest, CompleteRequest, Output, OutputKind, Field}; use rlp::{RlpStream, UntrustedRlp}; use util::{Address, Bytes, DBValue, HashDB, Mutex, H256, U256}; use util::memorydb::MemoryDB; -use util::sha3::Hashable; +use util::sha3::{Hashable, SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP}; use util::trie::{Trie, TrieDB, TrieError}; const SUPPLIED_MATCHES: &'static str = "supplied responses always match produced requests; enforced by `check_response`; qed"; @@ -87,6 +87,18 @@ pub trait RequestAdapter { fn extract_from(Vec) -> Self::Out; } +impl RequestAdapter for Vec { + type Out = Vec; + + fn make_requests(self) -> Vec { + self.into_iter().map(RequestArg::make).collect() + } + + fn extract_from(r: Vec) -> Self::Out { + r.into_iter().map(T::extract).collect() + } +} + // helper to implement `RequestArg` and `From` for a single request kind. macro_rules! impl_single { ($variant: ident, $me: ty, $out: ty) => { @@ -173,6 +185,50 @@ mod impls { impl_args!(A, B, C, D, E, F, G, H, I, J, K, L,); } +/// A block header to be used for verification. +/// May be stored or an unresolved output of a prior request. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum HeaderRef { + /// A stored header. + Stored(encoded::Header), + /// An unresolved header. The first item here is the index of the request which + /// will return the header. The second is a back-reference pointing to a block hash + /// which can be used to make requests until that header is resolved. + Unresolved(usize, Field), +} + +impl HeaderRef { + /// Attempt to inspect the header. + pub fn as_ref(&self) -> Result<&encoded::Header, Error> { + match *self { + HeaderRef::Stored(ref hdr) => Ok(hdr), + HeaderRef::Unresolved(idx, _) => Err(Error::UnresolvedHeader(idx)), + } + } + + // get the blockhash field to be used in requests. + fn field(&self) -> Field { + match *self { + HeaderRef::Stored(ref hdr) => Field::Scalar(hdr.hash()), + HeaderRef::Unresolved(_, ref field) => field.clone(), + } + } + + // yield the index of the request which will produce the header. + fn needs_header(&self) -> Option<(usize, Field)> { + match *self { + HeaderRef::Stored(_) => None, + HeaderRef::Unresolved(idx, ref field) => Some((idx, field.clone())), + } + } +} + +impl From for HeaderRef { + fn from(header: encoded::Header) -> Self { + HeaderRef::Stored(header) + } +} + /// Requests coupled with their required data for verification. /// This is used internally but not part of the public API. #[derive(Clone)] @@ -192,7 +248,7 @@ impl From for CheckedRequest { match req { Request::HeaderByHash(req) => { let net_req = net_request::IncompleteHeadersRequest { - start: net_request::HashOrNumber::Hash(req.0).into(), + start: req.0.map(Into::into), skip: 0, max: 1, reverse: false, @@ -207,33 +263,33 @@ impl From for CheckedRequest { } Request::Body(req) => { let net_req = net_request::IncompleteBodyRequest { - hash: req.hash.into(), + hash: req.0.field(), }; CheckedRequest::Body(req, net_req) } Request::Receipts(req) => { let net_req = net_request::IncompleteReceiptsRequest { - hash: req.0.hash().into(), + hash: req.0.field(), }; CheckedRequest::Receipts(req, net_req) } - Request::Account(req) => { + Request::Account(req) => { let net_req = net_request::IncompleteAccountRequest { - block_hash: req.header.hash().into(), + block_hash: req.header.field(), address_hash: ::util::Hashable::sha3(&req.address).into(), }; CheckedRequest::Account(req, net_req) } Request::Code(req) => { let net_req = net_request::IncompleteCodeRequest { - block_hash: req.block_id.0.into(), + block_hash: req.header.field(), code_hash: req.code_hash.into(), }; CheckedRequest::Code(req, net_req) } Request::Execution(req) => { let net_req = net_request::IncompleteExecutionRequest { - block_hash: req.header.hash().into(), + block_hash: req.header.field(), from: req.tx.sender(), gas: req.tx.gas, gas_price: req.tx.gas_price, @@ -262,6 +318,119 @@ impl CheckedRequest { CheckedRequest::Execution(_, req) => NetRequest::Execution(req), } } + + /// Whether this needs a header from a prior request. + /// Returns `Some` with the index of the request returning the header + /// and the field giving the hash + /// if so, `None` otherwise. + pub fn needs_header(&self) -> Option<(usize, Field)> { + match *self { + CheckedRequest::Receipts(ref x, _) => x.0.needs_header(), + CheckedRequest::Body(ref x, _) => x.0.needs_header(), + CheckedRequest::Account(ref x, _) => x.header.needs_header(), + CheckedRequest::Code(ref x, _) => x.header.needs_header(), + CheckedRequest::Execution(ref x, _) => x.header.needs_header(), + _ => None, + } + } + + /// Provide a header where one was needed. Should only be called if `needs_header` + /// returns `Some`, and for correctness, only use the header yielded by the correct + /// request. + pub fn provide_header(&mut self, header: encoded::Header) { + match *self { + CheckedRequest::Receipts(ref mut x, _) => x.0 = HeaderRef::Stored(header), + CheckedRequest::Body(ref mut x, _) => x.0 = HeaderRef::Stored(header), + CheckedRequest::Account(ref mut x, _) => x.header = HeaderRef::Stored(header), + CheckedRequest::Code(ref mut x, _) => x.header = HeaderRef::Stored(header), + CheckedRequest::Execution(ref mut x, _) => x.header = HeaderRef::Stored(header), + _ => {}, + } + } + + /// Attempt to complete the request based on data in the cache. + pub fn respond_local(&self, cache: &Mutex<::cache::Cache>) -> Option { + match *self { + CheckedRequest::HeaderProof(ref check, _) => { + let mut cache = cache.lock(); + cache.block_hash(&check.num) + .and_then(|h| cache.chain_score(&h).map(|s| (h, s))) + .map(|(h, s)| Response::HeaderProof((h, s))) + } + CheckedRequest::HeaderByHash(_, ref req) => { + if let Some(&net_request::HashOrNumber::Hash(ref h)) = req.start.as_ref() { + return cache.lock().block_header(h).map(Response::HeaderByHash); + } + + None + } + CheckedRequest::Receipts(ref check, ref req) => { + // empty transactions -> no receipts + if check.0.as_ref().ok().map_or(false, |hdr| hdr.receipts_root() == SHA3_NULL_RLP) { + return Some(Response::Receipts(Vec::new())); + } + + req.hash.as_ref() + .and_then(|hash| cache.lock().block_receipts(hash)) + .map(Response::Receipts) + } + CheckedRequest::Body(ref check, ref req) => { + // check for empty body. + if let Some(hdr) = check.0.as_ref().ok() { + if hdr.transactions_root() == SHA3_NULL_RLP && hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { + let mut stream = RlpStream::new_list(3); + stream.append_raw(hdr.rlp().as_raw(), 1); + stream.begin_list(0); + stream.begin_list(0); + + return Some(Response::Body(encoded::Block::new(stream.out()))); + } + } + + // otherwise, check for cached body and header. + let block_hash = req.hash.as_ref() + .cloned() + .or_else(|| check.0.as_ref().ok().map(|hdr| hdr.hash())); + let block_hash = match block_hash { + Some(hash) => hash, + None => return None, + }; + + let mut cache = cache.lock(); + let cached_header; + + // can't use as_ref here although it seems like you would be able to: + // it complains about uninitialized `cached_header`. + let block_header = match check.0.as_ref().ok() { + Some(hdr) => Some(hdr), + None => { + cached_header = cache.block_header(&block_hash); + cached_header.as_ref() + } + }; + + block_header + .and_then(|hdr| cache.block_body(&block_hash).map(|b| (hdr, b))) + .map(|(hdr, body)| { + let mut stream = RlpStream::new_list(3); + let body = body.rlp(); + stream.append_raw(&hdr.rlp().as_raw(), 1); + stream.append_raw(&body.at(0).as_raw(), 1); + stream.append_raw(&body.at(1).as_raw(), 1); + + Response::Body(encoded::Block::new(stream.out())) + }) + } + CheckedRequest::Code(_, ref req) => { + if req.code_hash.as_ref().map_or(false, |&h| h == SHA3_EMPTY) { + Some(Response::Code(Vec::new())) + } else { + None + } + } + _ => None, + } + } } macro_rules! match_me { @@ -279,37 +448,40 @@ macro_rules! match_me { } impl IncompleteRequest for CheckedRequest { - type Complete = net_request::CompleteRequest; + type Complete = CompleteRequest; type Response = net_request::Response; - /// Check prior outputs against the needed inputs. - /// - /// This is called to ensure consistency of this request with - /// others in the same packet. - fn check_outputs(&self, f: F) -> Result<(), net_request::NoSuchOutput> + fn check_outputs(&self, mut f: F) -> Result<(), net_request::NoSuchOutput> where F: FnMut(usize, usize, OutputKind) -> Result<(), net_request::NoSuchOutput> { - match_me!(*self, (_, ref req) => req.check_outputs(f)) + match *self { + CheckedRequest::HeaderProof(_, ref req) => req.check_outputs(f), + CheckedRequest::HeaderByHash(ref check, ref req) => { + req.check_outputs(&mut f)?; + + // make sure the output given is definitively a hash. + match check.0 { + Field::BackReference(r, idx) => f(r, idx, OutputKind::Hash), + _ => Ok(()), + } + } + CheckedRequest::Receipts(_, ref req) => req.check_outputs(f), + CheckedRequest::Body(_, ref req) => req.check_outputs(f), + CheckedRequest::Account(_, ref req) => req.check_outputs(f), + CheckedRequest::Code(_, ref req) => req.check_outputs(f), + CheckedRequest::Execution(_, ref req) => req.check_outputs(f), + } } - /// Note that this request will produce the following outputs. fn note_outputs(&self, f: F) where F: FnMut(usize, OutputKind) { match_me!(*self, (_, ref req) => req.note_outputs(f)) } - /// Fill fields of the request. - /// - /// This function is provided an "output oracle" which allows fetching of - /// prior request outputs. - /// Only outputs previously checked with `check_outputs` may be available. fn fill(&mut self, f: F) where F: Fn(usize, usize) -> Result { match_me!(*self, (_, ref mut req) => req.fill(f)) } - /// Will succeed if all fields have been filled, will fail otherwise. fn complete(self) -> Result { - use ::request::CompleteRequest; - match self { CheckedRequest::HeaderProof(_, req) => req.complete().map(CompleteRequest::HeaderProof), CheckedRequest::HeaderByHash(_, req) => req.complete().map(CompleteRequest::Headers), @@ -333,35 +505,42 @@ impl net_request::CheckedRequest for CheckedRequest { type Environment = Mutex<::cache::Cache>; /// Check whether the response matches (beyond the type). - fn check_response(&self, cache: &Mutex<::cache::Cache>, response: &Self::Response) -> Result { + fn check_response(&self, complete: &Self::Complete, cache: &Mutex<::cache::Cache>, response: &Self::Response) -> Result { use ::request::Response as NetResponse; // helper for expecting a specific response for a given request. macro_rules! expect { - ($res: pat => $e: expr) => { - match *response { + ($res: pat => $e: expr) => {{ + match (response, complete) { $res => $e, _ => Err(Error::WrongKind), } - } + }} } // check response against contained prover. match *self { - CheckedRequest::HeaderProof(ref prover, _) => expect!(NetResponse::HeaderProof(ref res) => - prover.check_response(cache, &res.proof).map(Response::HeaderProof)), - CheckedRequest::HeaderByHash(ref prover, _) => expect!(NetResponse::Headers(ref res) => - prover.check_response(cache, &res.headers).map(Response::HeaderByHash)), - CheckedRequest::Receipts(ref prover, _) => expect!(NetResponse::Receipts(ref res) => - prover.check_response(cache, &res.receipts).map(Response::Receipts)), - CheckedRequest::Body(ref prover, _) => expect!(NetResponse::Body(ref res) => - prover.check_response(cache, &res.body).map(Response::Body)), - CheckedRequest::Account(ref prover, _) => expect!(NetResponse::Account(ref res) => - prover.check_response(cache, &res.proof).map(Response::Account)), - CheckedRequest::Code(ref prover, _) => expect!(NetResponse::Code(ref res) => - prover.check_response(cache, &res.code).map(Response::Code)), - CheckedRequest::Execution(ref prover, _) => expect!(NetResponse::Execution(ref res) => - prover.check_response(cache, &res.items).map(Response::Execution)), + CheckedRequest::HeaderProof(ref prover, _) => + expect!((&NetResponse::HeaderProof(ref res), _) => + prover.check_response(cache, &res.proof).map(Response::HeaderProof)), + CheckedRequest::HeaderByHash(ref prover, _) => + expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => + prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderByHash)), + CheckedRequest::Receipts(ref prover, _) => + expect!((&NetResponse::Receipts(ref res), _) => + prover.check_response(cache, &res.receipts).map(Response::Receipts)), + CheckedRequest::Body(ref prover, _) => + expect!((&NetResponse::Body(ref res), _) => + prover.check_response(cache, &res.body).map(Response::Body)), + CheckedRequest::Account(ref prover, _) => + expect!((&NetResponse::Account(ref res), _) => + prover.check_response(cache, &res.proof).map(Response::Account)), + CheckedRequest::Code(ref prover, _) => + expect!((&NetResponse::Code(ref res), &CompleteRequest::Code(ref req)) => + prover.check_response(cache, &req.code_hash, &res.code).map(Response::Code)), + CheckedRequest::Execution(ref prover, _) => + expect!((&NetResponse::Execution(ref res), _) => + prover.check_response(cache, &res.items).map(Response::Execution)), } } } @@ -387,6 +566,23 @@ pub enum Response { Execution(super::ExecutionResult), } +impl net_request::ResponseLike for Response { + fn fill_outputs(&self, mut f: F) where F: FnMut(usize, Output) { + match *self { + Response::HeaderProof((ref hash, _)) => f(0, Output::Hash(*hash)), + Response::Account(None) => { + f(0, Output::Hash(SHA3_EMPTY)); // code hash + f(1, Output::Hash(SHA3_NULL_RLP)); // storage root. + } + Response::Account(Some(ref acc)) => { + f(0, Output::Hash(acc.code_hash)); + f(1, Output::Hash(acc.storage_root)); + } + _ => {} + } + } +} + /// Errors in verification. #[derive(Debug, PartialEq)] pub enum Error { @@ -398,6 +594,10 @@ pub enum Error { Trie(TrieError), /// Bad inclusion proof BadProof, + /// Header by number instead of hash. + HeaderByNumber, + /// Unresolved header reference. + UnresolvedHeader(usize), /// Wrong header number. WrongNumber(u64, u64), /// Wrong hash. @@ -468,62 +668,63 @@ impl HeaderProof { /// Request for a header by hash. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct HeaderByHash(pub H256); +pub struct HeaderByHash(pub Field); impl HeaderByHash { /// Check a response for the header. - pub fn check_response(&self, cache: &Mutex<::cache::Cache>, headers: &[encoded::Header]) -> Result { + pub fn check_response( + &self, + cache: &Mutex<::cache::Cache>, + start: &net_request::HashOrNumber, + headers: &[encoded::Header] + ) -> Result { + let expected_hash = match (self.0, start) { + (Field::Scalar(ref h), &net_request::HashOrNumber::Hash(ref h2)) => { + if h != h2 { return Err(Error::WrongHash(*h, *h2)) } + *h + } + (_, &net_request::HashOrNumber::Hash(h2)) => h2, + _ => return Err(Error::HeaderByNumber), + }; + let header = headers.get(0).ok_or(Error::Empty)?; let hash = header.sha3(); - match hash == self.0 { + match hash == expected_hash { true => { cache.lock().insert_block_header(hash, header.clone()); Ok(header.clone()) } - false => Err(Error::WrongHash(self.0, hash)), + false => Err(Error::WrongHash(expected_hash, hash)), } } } -/// Request for a block, with header and precomputed hash. +/// Request for a block, with header for verification. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Body { - /// The block's header. - pub header: encoded::Header, - /// The block's hash. - pub hash: H256, -} +pub struct Body(pub HeaderRef); impl Body { - /// Create a request for a block body from a given header. - pub fn new(header: encoded::Header) -> Self { - let hash = header.hash(); - Body { - header: header, - hash: hash, - } - } - /// Check a response for this block body. pub fn check_response(&self, cache: &Mutex<::cache::Cache>, body: &encoded::Body) -> Result { // check the integrity of the the body against the header + let header = self.0.as_ref()?; let tx_root = ::util::triehash::ordered_trie_root(body.rlp().at(0).iter().map(|r| r.as_raw().to_vec())); - if tx_root != self.header.transactions_root() { - return Err(Error::WrongTrieRoot(self.header.transactions_root(), tx_root)); + if tx_root != header.transactions_root() { + return Err(Error::WrongTrieRoot(header.transactions_root(), tx_root)); } let uncles_hash = body.rlp().at(1).as_raw().sha3(); - if uncles_hash != self.header.uncles_hash() { - return Err(Error::WrongHash(self.header.uncles_hash(), uncles_hash)); + if uncles_hash != header.uncles_hash() { + return Err(Error::WrongHash(header.uncles_hash(), uncles_hash)); } // concatenate the header and the body. let mut stream = RlpStream::new_list(3); - stream.append_raw(self.header.rlp().as_raw(), 1); + stream.append_raw(header.rlp().as_raw(), 1); stream.append_raw(body.rlp().at(0).as_raw(), 1); stream.append_raw(body.rlp().at(1).as_raw(), 1); - cache.lock().insert_block_body(self.hash, body.clone()); + cache.lock().insert_block_body(header.hash(), body.clone()); Ok(encoded::Block::new(stream.out())) } @@ -531,12 +732,12 @@ impl Body { /// Request for a block's receipts with header for verification. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct BlockReceipts(pub encoded::Header); +pub struct BlockReceipts(pub HeaderRef); impl BlockReceipts { /// Check a response with receipts against the stored header. pub fn check_response(&self, cache: &Mutex<::cache::Cache>, receipts: &[Receipt]) -> Result, Error> { - let receipts_root = self.0.receipts_root(); + let receipts_root = self.0.as_ref()?.receipts_root(); let found_root = ::util::triehash::ordered_trie_root(receipts.iter().map(|r| ::rlp::encode(r).to_vec())); match receipts_root == found_root { @@ -553,7 +754,7 @@ impl BlockReceipts { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Account { /// Header for verification. - pub header: encoded::Header, + pub header: HeaderRef, /// Address requested. pub address: Address, } @@ -561,7 +762,8 @@ pub struct Account { impl Account { /// Check a response with an account against the stored header. pub fn check_response(&self, _: &Mutex<::cache::Cache>, proof: &[Bytes]) -> Result, Error> { - let state_root = self.header.state_root(); + let header = self.header.as_ref()?; + let state_root = header.state_root(); let mut db = MemoryDB::new(); for node in proof { db.insert(&node[..]); } @@ -584,20 +786,25 @@ impl Account { /// Request for account code. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Code { - /// Block hash, number pair. - pub block_id: (H256, u64), + /// Header reference. + pub header: HeaderRef, /// Account's code hash. - pub code_hash: H256, + pub code_hash: Field, } impl Code { /// Check a response with code against the code hash. - pub fn check_response(&self, _: &Mutex<::cache::Cache>, code: &[u8]) -> Result, Error> { + pub fn check_response( + &self, + _: &Mutex<::cache::Cache>, + code_hash: &H256, + code: &[u8] + ) -> Result, Error> { let found_hash = code.sha3(); - if found_hash == self.code_hash { + if &found_hash == code_hash { Ok(code.to_vec()) } else { - Err(Error::WrongHash(self.code_hash, found_hash)) + Err(Error::WrongHash(*code_hash, found_hash)) } } } @@ -608,8 +815,9 @@ pub struct TransactionProof { /// The transaction to request proof of. pub tx: SignedTransaction, /// Block header. - pub header: encoded::Header, + pub header: HeaderRef, /// Transaction environment info. + // TODO: it's not really possible to provide this if the header is unknown. pub env_info: EnvInfo, /// Consensus engine. pub engine: Arc, @@ -618,7 +826,7 @@ pub struct TransactionProof { impl TransactionProof { /// Check the proof, returning the proved execution or indicate that the proof was bad. pub fn check_response(&self, _: &Mutex<::cache::Cache>, state_items: &[DBValue]) -> Result { - let root = self.header.state_root(); + let root = self.header.as_ref()?.state_root(); let mut env_info = self.env_info.clone(); env_info.gas_limit = self.tx.gas.clone(); @@ -697,7 +905,7 @@ mod tests { let raw_header = encoded::Header::new(::rlp::encode(&header).to_vec()); let cache = Mutex::new(make_cache()); - assert!(HeaderByHash(hash).check_response(&cache, &[raw_header]).is_ok()) + assert!(HeaderByHash(hash.into()).check_response(&cache, &hash.into(), &[raw_header]).is_ok()) } #[test] @@ -708,10 +916,7 @@ mod tests { let mut body_stream = RlpStream::new_list(2); body_stream.begin_list(0).begin_list(0); - let req = Body { - header: encoded::Header::new(::rlp::encode(&header).to_vec()), - hash: header.hash(), - }; + let req = Body(encoded::Header::new(::rlp::encode(&header).to_vec()).into()); let cache = Mutex::new(make_cache()); let response = encoded::Body::new(body_stream.drain().to_vec()); @@ -734,7 +939,7 @@ mod tests { header.set_receipts_root(receipts_root); - let req = BlockReceipts(encoded::Header::new(::rlp::encode(&header).to_vec())); + let req = BlockReceipts(encoded::Header::new(::rlp::encode(&header).to_vec()).into()); let cache = Mutex::new(make_cache()); assert!(req.check_response(&cache, &receipts).is_ok()) @@ -782,7 +987,7 @@ mod tests { header.set_state_root(root.clone()); let req = Account { - header: encoded::Header::new(::rlp::encode(&header).to_vec()), + header: encoded::Header::new(::rlp::encode(&header).to_vec()).into(), address: addr, }; @@ -793,13 +998,15 @@ mod tests { #[test] fn check_code() { let code = vec![1u8; 256]; + let code_hash = ::util::Hashable::sha3(&code); + let header = Header::new(); let req = Code { - block_id: (Default::default(), 2), - code_hash: ::util::Hashable::sha3(&code), + header: encoded::Header::new(::rlp::encode(&header).to_vec()).into(), + code_hash: code_hash.into(), }; let cache = Mutex::new(make_cache()); - assert!(req.check_response(&cache, &code).is_ok()); - assert!(req.check_response(&cache, &[]).is_err()); + assert!(req.check_response(&cache, &code_hash, &code).is_ok()); + assert!(req.check_response(&cache, &code_hash, &[]).is_err()); } } diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index d5789c5e1..10c4ceae5 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -28,7 +28,7 @@ use ::request::{self as basic_request, Response}; use std::sync::Arc; -use super::{request, OnDemand, Peer}; +use super::{request, OnDemand, Peer, HeaderRef}; // useful contexts to give the service. enum Context { @@ -122,7 +122,10 @@ fn dummy_capabilities() -> Capabilities { #[test] fn detects_hangup() { let on_demand = Harness::create().service; - let result = on_demand.header_by_hash(&Context::NoOp, request::HeaderByHash(H256::default())); + let result = on_demand.request_raw( + &Context::NoOp, + vec![request::HeaderByHash(H256::default().into()).into()], + ); assert_eq!(on_demand.pending.read().len(), 1); drop(result); @@ -148,7 +151,7 @@ fn single_request() { let recv = harness.service.request_raw( &Context::NoOp, - vec![request::HeaderByHash(header.hash()).into()] + vec![request::HeaderByHash(header.hash().into()).into()] ).unwrap(); assert_eq!(harness.service.pending.read().len(), 1); @@ -182,7 +185,7 @@ fn no_capabilities() { let _recv = harness.service.request_raw( &Context::NoOp, - vec![request::HeaderByHash(Default::default()).into()] + vec![request::HeaderByHash(H256::default().into()).into()] ).unwrap(); assert_eq!(harness.service.pending.read().len(), 1); @@ -209,7 +212,7 @@ fn reassign() { let recv = harness.service.request_raw( &Context::NoOp, - vec![request::HeaderByHash(header.hash()).into()] + vec![request::HeaderByHash(header.hash().into()).into()] ).unwrap(); assert_eq!(harness.service.pending.read().len(), 1); @@ -264,8 +267,8 @@ fn partial_response() { let recv = harness.service.request_raw( &Context::NoOp, vec![ - request::HeaderByHash(header1.hash()).into(), - request::HeaderByHash(header2.hash()).into(), + request::HeaderByHash(header1.hash().into()).into(), + request::HeaderByHash(header2.hash().into()).into(), ], ).unwrap(); @@ -323,8 +326,8 @@ fn part_bad_part_good() { let recv = harness.service.request_raw( &Context::NoOp, vec![ - request::HeaderByHash(header1.hash()).into(), - request::HeaderByHash(header2.hash()).into(), + request::HeaderByHash(header1.hash().into()).into(), + request::HeaderByHash(header2.hash().into()).into(), ], ).unwrap(); @@ -378,7 +381,7 @@ fn wrong_kind() { let _recv = harness.service.request_raw( &Context::NoOp, - vec![request::HeaderByHash(Default::default()).into()] + vec![request::HeaderByHash(H256::default().into()).into()] ).unwrap(); assert_eq!(harness.service.pending.read().len(), 1); @@ -395,3 +398,100 @@ fn wrong_kind() { assert_eq!(harness.service.pending.read().len(), 1); } + +#[test] +fn back_references() { + let harness = Harness::create(); + + let peer_id = 10101; + let req_id = ReqId(14426); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let header = Header::default(); + let encoded = encoded::Header::new(header.rlp(Seal::With)); + + let recv = harness.service.request_raw( + &Context::NoOp, + vec![ + request::HeaderByHash(header.hash().into()).into(), + request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(), + ] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id)); + + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_id, + &[ + Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }), + Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] }), + ] + ); + + assert!(recv.wait().is_ok()); +} + +#[test] +#[should_panic] +fn bad_back_reference() { + let harness = Harness::create(); + + let header = Header::default(); + + let _ = harness.service.request_raw( + &Context::NoOp, + vec![ + request::HeaderByHash(header.hash().into()).into(), + request::BlockReceipts(HeaderRef::Unresolved(1, header.hash().into())).into(), + ] + ).unwrap(); +} + +#[test] +fn fill_from_cache() { + let harness = Harness::create(); + + let peer_id = 10101; + let req_id = ReqId(14426); + + harness.inject_peer(peer_id, Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + }); + + let header = Header::default(); + let encoded = encoded::Header::new(header.rlp(Seal::With)); + + let recv = harness.service.request_raw( + &Context::NoOp, + vec![ + request::HeaderByHash(header.hash().into()).into(), + request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(), + ] + ).unwrap(); + + assert_eq!(harness.service.pending.read().len(), 1); + + harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id)); + + assert_eq!(harness.service.pending.read().len(), 0); + + harness.service.on_responses( + &Context::WithPeer(peer_id), + req_id, + &[ + Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }), + ] + ); + + assert!(recv.wait().is_ok()); +} diff --git a/ethcore/light/src/types/request/builder.rs b/ethcore/light/src/types/request/builder.rs index dff33513a..0b413677d 100644 --- a/ethcore/light/src/types/request/builder.rs +++ b/ethcore/light/src/types/request/builder.rs @@ -19,6 +19,7 @@ //! supplied as well. use std::collections::HashMap; +use std::ops::{Deref, DerefMut}; use request::{ IncompleteRequest, OutputKind, Output, NoSuchOutput, ResponseError, ResponseLike, }; @@ -124,23 +125,14 @@ impl Requests { req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) } } -} -impl Requests { - /// Supply a response for the next request. - /// Fails on: wrong request kind, all requests answered already. - pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response) - -> Result> - { - let idx = self.answered; - - // check validity. - if self.is_complete() { return Err(ResponseError::Unexpected) } - - let extracted = self.requests[idx] - .check_response(env, response).map_err(ResponseError::Validity)?; + /// Supply a response, asserting its correctness. + /// Fill outputs based upon it. + pub fn supply_response_unchecked(&mut self, response: &R) { + if self.is_complete() { return } let outputs = &mut self.outputs; + let idx = self.answered; response.fill_outputs(|out_idx, output| { // we don't need to check output kinds here because all back-references // are validated in the builder. @@ -154,7 +146,26 @@ impl Requests { if let Some(ref mut req) = self.requests.get_mut(self.answered) { req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput)) } + } +} +impl Requests { + /// Supply a response for the next request. + /// Fails on: wrong request kind, all requests answered already. + pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response) + -> Result> + { + let idx = self.answered; + + // check validity. + if idx == self.requests.len() { return Err(ResponseError::Unexpected) } + let completed = self.next_complete() + .expect("only fails when all requests have been answered; this just checked against; qed"); + + let extracted = self.requests[idx] + .check_response(&completed, env, response).map_err(ResponseError::Validity)?; + + self.supply_response_unchecked(response); Ok(extracted) } } @@ -182,6 +193,20 @@ impl Requests { } } +impl Deref for Requests { + type Target = [T]; + + fn deref(&self) -> &[T] { + &self.requests[..] + } +} + +impl DerefMut for Requests { + fn deref_mut(&mut self) -> &mut [T] { + &mut self.requests[..] + } +} + #[cfg(test)] mod tests { use request::*; diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index 3953aa88b..5b15a0180 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -83,7 +83,7 @@ pub enum ResponseError { } /// An input to a request. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Field { /// A pre-specified input. Scalar(T), @@ -93,6 +93,29 @@ pub enum Field { } impl Field { + /// Helper for creating a new back-reference field. + pub fn back_ref(idx: usize, req: usize) -> Self { + Field::BackReference(idx, req) + } + + /// map a scalar into some other item. + pub fn map(self, f: F) -> Field where F: FnOnce(T) -> U { + match self { + Field::Scalar(x) => Field::Scalar(f(x)), + Field::BackReference(req, idx) => Field::BackReference(req, idx), + } + } + + /// Attempt to get a reference to the inner scalar. + pub fn as_ref(&self) -> Option<&T> { + match *self { + Field::Scalar(ref x) => Some(x), + Field::BackReference(_, _) => None, + } + } + + + // attempt conversion into scalar value. fn into_scalar(self) -> Result { match self { @@ -400,7 +423,7 @@ impl CheckedRequest for Request { type Error = WrongKind; type Environment = (); - fn check_response(&self, _: &(), response: &Response) -> Result<(), WrongKind> { + fn check_response(&self, _: &Self::Complete, _: &(), response: &Response) -> Result<(), WrongKind> { if self.kind() == response.kind() { Ok(()) } else { @@ -587,7 +610,7 @@ pub trait CheckedRequest: IncompleteRequest { type Environment; /// Check whether the response matches (beyond the type). - fn check_response(&self, &Self::Environment, &Self::Response) -> Result; + fn check_response(&self, &Self::Complete, &Self::Environment, &Self::Response) -> Result; } /// A response-like object. diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index c643daa2a..090521ba5 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -27,7 +27,7 @@ use light::client::Client; use light::on_demand::{request, OnDemand}; use light::TransactionQueue; -use futures::{future, stream, Future, Stream}; +use futures::{future, Future}; use parity_reactor::Remote; @@ -73,28 +73,32 @@ impl IoHandler for QueueCull { self.remote.spawn_with_timeout(move || { let maybe_fetching = sync.with_context(move |ctx| { // fetch the nonce of each sender in the queue. - let nonce_futures = senders.iter() - .map(|&address| request::Account { header: best_header.clone(), address: address }) - .map(move |request| { - on_demand.account(ctx, request) - .map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) - }) - .zip(senders.iter()) - .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); + let nonce_reqs = senders.iter() + .map(|&address| request::Account { header: best_header.clone().into(), address: address }) + .collect::>(); - // as they come in, update each sender to the new nonce. - stream::futures_unordered(nonce_futures) - .fold(txq, |txq, (address, nonce)| { - txq.write().cull(address, nonce); - future::ok(txq) + // when they come in, update each sender to the new nonce. + on_demand.request(ctx, nonce_reqs) + .expect("No back-references; therefore all back-references are valid; qed") + .map(move |accs| { + let txq = txq.write(); + let _ = accs.into_iter() + .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) + .zip(senders) + .fold(txq, |mut txq, (nonce, addr)| { + txq.cull(addr, nonce); + txq + }); }) - .map(|_| ()) // finally, discard the txq handle and log errors. .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) }); match maybe_fetching { Some(fut) => fut.boxed(), - None => future::ok(()).boxed(), + None => { + debug!(target: "cull", "Unable to acquire network context; qed"); + future::ok(()).boxed() + } } }, Duration::from_millis(PURGE_TIMEOUT_MS), || {}) } diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 68fff78b7..efd118967 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -20,7 +20,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, Weak}; -use futures::{future, stream, Future, Stream, BoxFuture}; +use futures::{future, Future, BoxFuture}; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; use light::on_demand::{request, OnDemand}; @@ -185,25 +185,28 @@ pub fn fetch_gas_price_corpus( let eventual_corpus = sync.with_context(|ctx| { // get some recent headers with gas used, // and request each of the blocks from the network. - let block_futures = client.ancestry_iter(BlockId::Latest) + let block_requests = client.ancestry_iter(BlockId::Latest) .filter(|hdr| hdr.gas_used() != U256::default()) .take(GAS_PRICE_SAMPLE_SIZE) - .map(request::Body::new) - .map(|req| on_demand.block(ctx, req)); + .map(|hdr| request::Body(hdr.into())) + .collect::>(); - // as the blocks come in, collect gas prices into a vector - stream::futures_unordered(block_futures) - .fold(Vec::new(), |mut v, block| { - for t in block.transaction_views().iter() { - v.push(t.gas_price()) - } + // when the blocks come in, collect gas prices into a vector + on_demand.request(ctx, block_requests) + .expect("no back-references; therefore all back-references are valid; qed") + .map(|bodies| { + bodies.into_iter().fold(Vec::new(), |mut v, block| { + for t in block.transaction_views().iter() { + v.push(t.gas_price()) + } - future::ok(v) + v + }) }) - .map(move |v| { + .map(move |prices| { // produce a corpus from the vector, cache it, and return // the median as the intended gas price. - let corpus: ::stats::Corpus<_> = v.into(); + let corpus: ::stats::Corpus<_> = prices.into(); cache.lock().set_gas_price_corpus(corpus.clone()); corpus }) @@ -282,10 +285,10 @@ impl LightDispatcher { let best_header = self.client.best_block_header(); let account_start_nonce = self.client.engine().account_start_nonce(); - let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { - header: best_header, + let nonce_future = self.sync.with_context(|ctx| self.on_demand.request(ctx, request::Account { + header: best_header.into(), address: addr, - })); + }).expect("no back-references; therefore all back-references valid; qed")); match nonce_future { Some(x) => diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 7132106cb..8bcd5115a 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -31,7 +31,8 @@ use jsonrpc_macros::Trailing; use light::cache::Cache; use light::client::LightChainClient; use light::cht; -use light::on_demand::{OnDemand, request}; +use light::on_demand::{request, OnDemand, HeaderRef, Request as OnDemandRequest, Response as OnDemandResponse}; +use light::request::Field; use ethsync::LightSync; use util::{Address, Mutex, Uint, U256}; @@ -55,51 +56,72 @@ pub struct LightFetch { /// Type alias for convenience. pub type ExecutionResult = Result; +// extract the header indicated by the given `HeaderRef` from the given responses. +// fails only if they do not correspond. +fn extract_header(res: &[OnDemandResponse], header: HeaderRef) -> Option { + match header { + HeaderRef::Stored(hdr) => Some(hdr), + HeaderRef::Unresolved(idx, _) => match res.get(idx) { + Some(&OnDemandResponse::HeaderByHash(ref hdr)) => Some(hdr.clone()), + _ => None, + }, + } +} + impl LightFetch { - /// Get a block header from the on demand service or client, or error. - pub fn header(&self, id: BlockId) -> BoxFuture { + // push the necessary requests onto the request chain to get the header by the given ID. + // yield a header reference which other requests can use. + fn make_header_requests(&self, id: BlockId, reqs: &mut Vec) -> Result { if let Some(h) = self.client.block_header(id) { - return future::ok(h).boxed() + return Ok(h.into()); } - let maybe_future = match id { + match id { BlockId::Number(n) => { let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize)); match cht_root { - None => return future::err(errors::unknown_block()).boxed(), + None => Err(errors::unknown_block()), Some(root) => { let req = request::HeaderProof::new(n, root) .expect("only fails for 0; client always stores genesis; client already queried; qed"); - let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); - self.sync.with_context(|ctx| { - let fut = self.on_demand.hash_by_number(ctx, req) - .map(request::HeaderByHash) - .map_err(errors::on_demand_cancel); + let idx = reqs.len(); + let hash_ref = Field::back_ref(idx, 0); + reqs.push(req.into()); + reqs.push(request::HeaderByHash(hash_ref.clone()).into()); - fut.and_then(move |req| { - match sync.with_context(|ctx| on_demand.header_by_hash(ctx, req)) { - Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), - None => future::err(errors::network_disabled()).boxed(), - } - }).boxed() - }) + Ok(HeaderRef::Unresolved(idx + 1, hash_ref)) } } } BlockId::Hash(h) => { - self.sync.with_context(|ctx| - self.on_demand.header_by_hash(ctx, request::HeaderByHash(h)) - .then(|res| future::done(match res { - Ok(h) => Ok(h), - Err(e) => Err(errors::on_demand_cancel(e)), - })) - .boxed() - ) + reqs.push(request::HeaderByHash(h.into()).into()); + + let idx = reqs.len(); + Ok(HeaderRef::Unresolved(idx, h.into())) } - _ => None, // latest, earliest, and pending will have all already returned. + _ => Err(errors::unknown_block()) // latest, earliest, and pending will have all already returned. + } + } + + /// Get a block header from the on demand service or client, or error. + pub fn header(&self, id: BlockId) -> BoxFuture { + let mut reqs = Vec::new(); + let header_ref = match self.make_header_requests(id, &mut reqs) { + Ok(r) => r, + Err(e) => return future::err(e).boxed(), }; + let maybe_future = self.sync.with_context(move |ctx| { + self.on_demand.request_raw(ctx, reqs) + .expect("all back-references known to be valid; qed") + .map(|res| extract_header(&res, header_ref) + .expect("these responses correspond to requests that header_ref belongs to. \ + therefore it will not fail; qed")) + .map_err(errors::on_demand_cancel) + .boxed() + }); + match maybe_future { Some(recv) => recv, None => future::err(errors::network_disabled()).boxed() @@ -109,19 +131,29 @@ impl LightFetch { /// helper for getting account info at a given block. /// `None` indicates the account doesn't exist at the given block. pub fn account(&self, address: Address, id: BlockId) -> BoxFuture, Error> { - let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + let mut reqs = Vec::new(); + let header_ref = match self.make_header_requests(id, &mut reqs) { + Ok(r) => r, + Err(e) => return future::err(e).boxed(), + }; - self.header(id).and_then(move |header| { - let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account { - header: header, - address: address, - })); + reqs.push(request::Account { header: header_ref, address: address }.into()); - match maybe_fut { - Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), - None => future::err(errors::network_disabled()).boxed(), - } - }).boxed() + let maybe_future = self.sync.with_context(move |ctx| { + self.on_demand.request_raw(ctx, reqs) + .expect("all back-references known to be valid; qed") + .map(|mut res| match res.pop() { + Some(OnDemandResponse::Account(acc)) => acc, + _ => panic!("responses correspond directly with requests in amount and type; qed"), + }) + .map_err(errors::on_demand_cancel) + .boxed() + }); + + match maybe_future { + Some(recv) => recv, + None => future::err(errors::network_disabled()).boxed() + } } /// helper for getting proved execution. @@ -182,13 +214,16 @@ impl LightFetch { let request = request::TransactionProof { tx: tx, - header: hdr, + header: hdr.into(), env_info: env_info, engine: client.engine().clone(), }; let proved_future = sync.with_context(move |ctx| { - on_demand.transaction_proof(ctx, request).map_err(errors::on_demand_cancel).boxed() + on_demand + .request(ctx, request) + .expect("no back-references; therefore all back-refs valid; qed") + .map_err(errors::on_demand_cancel).boxed() }); match proved_future { @@ -200,13 +235,28 @@ impl LightFetch { /// get a block itself. fails on unknown block ID. pub fn block(&self, id: BlockId) -> BoxFuture { - let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); + let mut reqs = Vec::new(); + let header_ref = match self.make_header_requests(id, &mut reqs) { + Ok(r) => r, + Err(e) => return future::err(e).boxed(), + }; - self.header(id).map(request::Body::new).and_then(move |req| { - match sync.with_context(move |ctx| on_demand.block(ctx, req)) { - Some(fut) => fut.map_err(errors::on_demand_cancel).boxed(), - None => future::err(errors::network_disabled()).boxed(), - } - }).boxed() + reqs.push(request::Body(header_ref).into()); + + let maybe_future = self.sync.with_context(move |ctx| { + self.on_demand.request_raw(ctx, reqs) + .expect("all back-references known to be valid; qed") + .map(|mut res| match res.pop() { + Some(OnDemandResponse::Body(b)) => b, + _ => panic!("responses correspond directly with requests in amount and type; qed"), + }) + .map_err(errors::on_demand_cancel) + .boxed() + }); + + match maybe_future { + Some(recv) => recv, + None => future::err(errors::network_disabled()).boxed() + } } } diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index e02ccc987..fc61b0605 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -59,6 +59,8 @@ use v1::metadata::Metadata; use util::Address; +const NO_INVALID_BACK_REFS: &'static str = "Fails only on invalid back-references; back-references here known to be valid; qed"; + /// Light client `ETH` (and filter) RPC. pub struct EthClient { sync: Arc, @@ -186,16 +188,17 @@ impl EthClient { // - network is down. // - we get a score, but our hash is non-canonical. // - we get a score, and our hash is canonical. - let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); + let maybe_fut = sync.with_context(move |ctx| on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS)); match maybe_fut { - Some(fut) => fut.map(move |(hash, score)| { + Some(fut) => fut + .map(move |(hash, score)| { let score = if hash == block.hash() { Some(score) } else { None }; - fill_rich(block, score) + fill_rich(block, score) }).map_err(errors::on_demand_cancel).boxed(), None => return future::err(errors::network_disabled()).boxed(), } @@ -295,7 +298,8 @@ impl Eth for EthClient { if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { - sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + sync.with_context(|ctx| on_demand.request(ctx, request::Body(hdr.into()))) + .map(|x| x.expect(NO_INVALID_BACK_REFS)) .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) .map(|x| x.map_err(errors::on_demand_cancel).boxed()) .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) @@ -310,7 +314,8 @@ impl Eth for EthClient { if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { - sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + sync.with_context(|ctx| on_demand.request(ctx, request::Body(hdr.into()))) + .map(|x| x.expect(NO_INVALID_BACK_REFS)) .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) .map(|x| x.map_err(errors::on_demand_cancel).boxed()) .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) @@ -325,7 +330,8 @@ impl Eth for EthClient { if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { - sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + sync.with_context(|ctx| on_demand.request(ctx, request::Body(hdr.into()))) + .map(|x| x.expect(NO_INVALID_BACK_REFS)) .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) .map(|x| x.map_err(errors::on_demand_cancel).boxed()) .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) @@ -340,7 +346,8 @@ impl Eth for EthClient { if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { - sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + sync.with_context(|ctx| on_demand.request(ctx, request::Body(hdr.into()))) + .map(|x| x.expect(NO_INVALID_BACK_REFS)) .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) .map(|x| x.map_err(errors::on_demand_cancel).boxed()) .unwrap_or_else(|| future::err(errors::network_disabled()).boxed()) @@ -501,8 +508,8 @@ impl Filterable for EthClient { let hdr_bloom = hdr.log_bloom(); bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some() }) - .map(|hdr| (hdr.number(), request::BlockReceipts(hdr))) - .map(|(num, req)| self.on_demand.block_receipts(ctx, req).map(move |x| (num, x))) + .map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into()))) + .map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x))) .collect(); // as the receipts come in, find logs within them which match the filter.