Back-references for the on-demand service (#5573)

* header back-references for on demand

* initial back-reference implementation for on demand requests

* answer requests from cache

* answer requests from cache, add tests

* strongly typed responses for vectors of homogeneous requests

* fix fallout in RPC without optimizing
This commit is contained in:
Robert Habermeier
2017-05-23 06:39:25 -04:00
committed by Arkadiy Paronyan
parent aa41b48ba0
commit 386cdb830d
9 changed files with 778 additions and 426 deletions

View File

@@ -22,24 +22,19 @@ use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::receipt::Receipt;
use ethcore::executed::{Executed, ExecutionError};
use futures::{future, Async, Poll, Future, BoxFuture};
use futures::{Async, Poll, Future};
use futures::sync::oneshot::{self, Sender, Receiver, Canceled};
use network::PeerId;
use rlp::RlpStream;
use util::{Bytes, RwLock, Mutex, U256, H256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP};
use util::{RwLock, Mutex};
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache;
use request::{self as basic_request, Request as NetworkRequest};
use self::request::CheckedRequest;
pub use self::request::{Request, Response};
pub use self::request::{Request, Response, HeaderRef};
#[cfg(test)]
mod tests;
@@ -75,6 +70,98 @@ struct Pending {
sender: oneshot::Sender<Vec<Response>>,
}
impl Pending {
// answer as many of the given requests from the supplied cache as possible.
// TODO: support re-shuffling.
fn answer_from_cache(&mut self, cache: &Mutex<Cache>) {
while !self.requests.is_complete() {
let idx = self.requests.num_answered();
match self.requests[idx].respond_local(cache) {
Some(response) => {
self.requests.supply_response_unchecked(&response);
self.update_header_refs(idx, &response);
self.responses.push(response);
}
None => break,
}
}
}
// update header refs if the given response contains a header future requests require for
// verification.
// `idx` is the index of the request the response corresponds to.
fn update_header_refs(&mut self, idx: usize, response: &Response) {
match *response {
Response::HeaderByHash(ref hdr) => {
// fill the header for all requests waiting on this one.
// TODO: could be faster if we stored a map usize => Vec<usize>
// but typical use just has one header request that others
// depend on.
for r in self.requests.iter_mut().skip(idx + 1) {
if r.needs_header().map_or(false, |(i, _)| i == idx) {
r.provide_header(hdr.clone())
}
}
}
_ => {}, // no other responses produce headers.
}
}
// supply a response.
fn supply_response(&mut self, cache: &Mutex<Cache>, response: &basic_request::Response)
-> Result<(), basic_request::ResponseError<self::request::Error>>
{
match self.requests.supply_response(&cache, response) {
Ok(response) => {
let idx = self.responses.len();
self.update_header_refs(idx, &response);
self.responses.push(response);
Ok(())
}
Err(e) => Err(e),
}
}
// if the requests are complete, send the result and consume self.
fn try_complete(self) -> Option<Self> {
if self.requests.is_complete() {
let _ = self.sender.send(self.responses);
None
} else {
Some(self)
}
}
fn fill_unanswered(&mut self) {
self.requests.fill_unanswered();
}
// update the cached network requests.
fn update_net_requests(&mut self) {
use request::IncompleteRequest;
let mut builder = basic_request::RequestBuilder::default();
let num_answered = self.requests.num_answered();
let mut mapping = move |idx| idx - num_answered;
for request in self.requests.iter().skip(num_answered) {
let mut net_req = request.clone().into_net_request();
// all back-references with request index less than `num_answered` have
// been filled by now. all remaining requests point to nothing earlier
// than the next unanswered request.
net_req.adjust_refs(&mut mapping);
builder.push(net_req)
.expect("all back-references to answered requests have been filled; qed");
}
// update pending fields.
let capabilities = guess_capabilities(&self.requests[num_answered..]);
self.net_requests = builder.build();
self.required_capabilities = capabilities;
}
}
// helper to guess capabilities required for a given batch of network requests.
fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
let mut caps = Capabilities {
@@ -97,16 +184,21 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
caps.serve_headers = true,
CheckedRequest::HeaderByHash(_, _) =>
caps.serve_headers = true,
CheckedRequest::Body(ref req, _) =>
update_since(&mut caps.serve_chain_since, req.header.number()),
CheckedRequest::Receipts(ref req, _) =>
update_since(&mut caps.serve_chain_since, req.0.number()),
CheckedRequest::Account(ref req, _) =>
update_since(&mut caps.serve_state_since, req.header.number()),
CheckedRequest::Code(ref req, _) =>
update_since(&mut caps.serve_state_since, req.block_id.1),
CheckedRequest::Execution(ref req, _) =>
update_since(&mut caps.serve_state_since, req.header.number()),
CheckedRequest::Body(ref req, _) => if let Ok(ref hdr) = req.0.as_ref() {
update_since(&mut caps.serve_chain_since, hdr.number());
},
CheckedRequest::Receipts(ref req, _) => if let Ok(ref hdr) = req.0.as_ref() {
update_since(&mut caps.serve_chain_since, hdr.number());
},
CheckedRequest::Account(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() {
update_since(&mut caps.serve_state_since, hdr.number());
},
CheckedRequest::Code(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() {
update_since(&mut caps.serve_state_since, hdr.number());
},
CheckedRequest::Execution(ref req, _) => if let Ok(ref hdr) = req.header.as_ref() {
update_since(&mut caps.serve_state_since, hdr.number());
},
}
}
@@ -163,158 +255,6 @@ impl OnDemand {
me
}
/// Request a header's hash by block number and CHT root hash.
/// Returns the hash.
pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<H256, Canceled> {
let cached = {
let mut cache = self.cache.lock();
cache.block_hash(&req.num())
};
match cached {
Some(hash) => future::ok(hash).boxed(),
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.map(|(h, _)| h)
.boxed()
},
}
}
/// Request a canonical block's chain score.
/// Returns the chain score.
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<U256, Canceled> {
let cached = {
let mut cache = self.cache.lock();
cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
};
match cached {
Some(score) => future::ok(score).boxed(),
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.map(|(_, s)| s)
.boxed()
},
}
}
/// Request a canonical block's hash and chain score by number.
/// Returns the hash and chain score.
pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> BoxFuture<(H256, U256), Canceled> {
let cached = {
let mut cache = self.cache.lock();
let hash = cache.block_hash(&req.num());
(
hash.clone(),
hash.and_then(|hash| cache.chain_score(&hash)),
)
};
match cached {
(Some(hash), Some(score)) => future::ok((hash, score)).boxed(),
_ => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
}
}
/// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> BoxFuture<encoded::Header, Canceled> {
match { self.cache.lock().block_header(&req.0) } {
Some(hdr) => future::ok(hdr).boxed(),
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
}
}
/// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> BoxFuture<encoded::Block, Canceled> {
// fast path for empty body.
if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
let mut stream = RlpStream::new_list(3);
stream.append_raw(&req.header.into_inner(), 1);
stream.begin_list(0);
stream.begin_list(0);
future::ok(encoded::Block::new(stream.out())).boxed()
} else {
match { self.cache.lock().block_body(&req.hash) } {
Some(body) => {
let mut stream = RlpStream::new_list(3);
let body = body.rlp();
stream.append_raw(&req.header.into_inner(), 1);
stream.append_raw(&body.at(0).as_raw(), 1);
stream.append_raw(&body.at(1).as_raw(), 1);
future::ok(encoded::Block::new(stream.out())).boxed()
}
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
}
}
}
}
/// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> BoxFuture<Vec<Receipt>, Canceled> {
// fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP {
return future::ok(Vec::new()).boxed()
}
match { self.cache.lock().block_receipts(&req.0.hash()) } {
Some(receipts) => future::ok(receipts).boxed(),
None => {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
},
}
}
/// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against.
/// `None` here means that no account by the queried key exists in the queried state.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> BoxFuture<Option<BasicAccount>, Canceled> {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
}
/// Request code by address, known code hash, and block header.
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> BoxFuture<Bytes, Canceled> {
// fast path for no code.
if req.code_hash == SHA3_EMPTY {
future::ok(Vec::new()).boxed()
} else {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
}
}
/// Request proof-of-execution for a transaction.
pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> BoxFuture<ExecutionResult, Canceled> {
self.request(ctx, req)
.expect("request given fully fleshed out; qed")
.boxed()
}
/// Submit a vector of requests to be processed together.
///
/// Fails if back-references are not coherent.
@@ -332,15 +272,33 @@ impl OnDemand {
let mut builder = basic_request::RequestBuilder::default();
let responses = Vec::with_capacity(requests.len());
for request in requests {
builder.push(CheckedRequest::from(request))?;
let mut header_producers = HashMap::new();
for (i, request) in requests.into_iter().enumerate() {
let request = CheckedRequest::from(request);
// ensure that all requests needing headers will get them.
if let Some((idx, field)) = request.needs_header() {
// a request chain with a header back-reference is valid only if it both
// points to a request that returns a header and has the same back-reference
// for the block hash.
match header_producers.get(&idx) {
Some(ref f) if &field == *f => {}
_ => return Err(basic_request::NoSuchOutput),
}
}
if let CheckedRequest::HeaderByHash(ref req, _) = request {
header_producers.insert(i, req.0.clone());
}
builder.push(request)?;
}
let requests = builder.build();
let net_requests = requests.clone().map_requests(|req| req.into_net_request());
let capabilities = guess_capabilities(requests.requests());
self.pending.write().push(Pending {
self.submit_pending(ctx, Pending {
requests: requests,
net_requests: net_requests,
required_capabilities: capabilities,
@@ -348,8 +306,6 @@ impl OnDemand {
sender: sender,
});
self.attempt_dispatch(ctx);
Ok(receiver)
}
@@ -430,6 +386,19 @@ impl OnDemand {
})
.collect(); // `pending` now contains all requests we couldn't dispatch.
}
// submit a pending request set. attempts to answer from cache before
// going to the network. if complete, sends response and consumes the struct.
fn submit_pending(&self, ctx: &BasicContext, mut pending: Pending) {
// answer as many requests from cache as we can, and schedule for dispatch
// if incomplete.
pending.answer_from_cache(&*self.cache);
if let Some(mut pending) = pending.try_complete() {
pending.update_net_requests();
self.pending.write().push(pending);
self.attempt_dispatch(ctx);
}
}
}
impl Handler for OnDemand {
@@ -468,63 +437,27 @@ impl Handler for OnDemand {
}
fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
use request::IncompleteRequest;
let mut pending = match self.in_transit.write().remove(&req_id) {
Some(req) => req,
None => return,
};
// for each incoming response
// 1. ensure verification data filled. (still TODO since on_demand doesn't use back-references yet)
// 1. ensure verification data filled.
// 2. pending.requests.supply_response
// 3. if extracted on-demand response, keep it for later.
for response in responses {
match pending.requests.supply_response(&*self.cache, response) {
Ok(response) => {
pending.responses.push(response)
}
Err(e) => {
let peer = ctx.peer();
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
ctx.disable_peer(peer);
if let Err(e) = pending.supply_response(&*self.cache, response) {
let peer = ctx.peer();
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
ctx.disable_peer(peer);
break;
}
break;
}
}
pending.requests.fill_unanswered();
if pending.requests.is_complete() {
let _ = pending.sender.send(pending.responses);
return;
}
// update network requests (unless we're done, in which case fulfill the future.)
let mut builder = basic_request::RequestBuilder::default();
let num_answered = pending.requests.num_answered();
let mut mapping = move |idx| idx - num_answered;
for request in pending.requests.requests().iter().skip(num_answered) {
let mut net_req = request.clone().into_net_request();
// all back-references with request index less than `num_answered` have
// been filled by now. all remaining requests point to nothing earlier
// than the next unanswered request.
net_req.adjust_refs(&mut mapping);
builder.push(net_req)
.expect("all back-references to answered requests have been filled; qed");
}
// update pending fields and re-queue.
let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]);
pending.net_requests = builder.build();
pending.required_capabilities = capabilities;
self.pending.write().push(pending);
self.attempt_dispatch(ctx.as_basic());
pending.fill_unanswered();
self.submit_pending(ctx.as_basic(), pending);
}
fn tick(&self, ctx: &BasicContext) {