Better support for eth_getLogs in light mode (#9186)

* Light client on-demand request for headers range.

* Cache headers in HeaderWithAncestors response.

Also fulfills request locally if all headers are in cache.

* LightFetch::logs fetches missing headers on demand.

* LightFetch::logs limit the number of headers requested at a time.

* LightFetch::logs refactor header fetching logic.

* Enforce limit on header range length in light client logs request.

* Fix light request tests after struct change.

* Respond to review comments.
This commit is contained in:
Jim Posen 2018-08-25 14:06:01 -07:00 committed by Afri Schoedon
parent 7abe9ec4cc
commit 9ed43230ca
9 changed files with 418 additions and 70 deletions

1
Cargo.lock generated
View File

@ -2146,6 +2146,7 @@ dependencies = [
"ethstore 0.2.0", "ethstore 0.2.0",
"fake-fetch 0.0.1", "fake-fetch 0.0.1",
"fake-hardware-wallet 0.0.1", "fake-hardware-wallet 0.0.1",
"fastmap 0.1.0",
"fetch 0.1.0", "fetch 0.1.0",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -43,7 +43,7 @@ pub mod provider;
mod types; mod types;
pub use self::cache::Cache; pub use self::cache::Cache;
pub use self::provider::Provider; pub use self::provider::{Provider, MAX_HEADERS_PER_REQUEST};
pub use self::transaction_queue::TransactionQueue; pub use self::transaction_queue::TransactionQueue;
pub use types::request as request; pub use types::request as request;

View File

@ -204,6 +204,8 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
caps.serve_headers = true, caps.serve_headers = true,
CheckedRequest::HeaderByHash(_, _) => CheckedRequest::HeaderByHash(_, _) =>
caps.serve_headers = true, caps.serve_headers = true,
CheckedRequest::HeaderWithAncestors(_, _) =>
caps.serve_headers = true,
CheckedRequest::TransactionIndex(_, _) => {} // hashes yield no info. CheckedRequest::TransactionIndex(_, _) => {} // hashes yield no info.
CheckedRequest::Signal(_, _) => CheckedRequest::Signal(_, _) =>
caps.serve_headers = true, caps.serve_headers = true,

View File

@ -16,6 +16,7 @@
//! Request types, verification, and verification errors. //! Request types, verification, and verification errors.
use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
@ -47,6 +48,8 @@ pub enum Request {
HeaderProof(HeaderProof), HeaderProof(HeaderProof),
/// A request for a header by hash. /// A request for a header by hash.
HeaderByHash(HeaderByHash), HeaderByHash(HeaderByHash),
/// A request for a header by hash with a range of its ancestors.
HeaderWithAncestors(HeaderWithAncestors),
/// A request for the index of a transaction. /// A request for the index of a transaction.
TransactionIndex(TransactionIndex), TransactionIndex(TransactionIndex),
/// A request for block receipts. /// A request for block receipts.
@ -136,6 +139,7 @@ macro_rules! impl_single {
// implement traits for each kind of request. // implement traits for each kind of request.
impl_single!(HeaderProof, HeaderProof, (H256, U256)); impl_single!(HeaderProof, HeaderProof, (H256, U256));
impl_single!(HeaderByHash, HeaderByHash, encoded::Header); impl_single!(HeaderByHash, HeaderByHash, encoded::Header);
impl_single!(HeaderWithAncestors, HeaderWithAncestors, Vec<encoded::Header>);
impl_single!(TransactionIndex, TransactionIndex, net_request::TransactionIndexResponse); impl_single!(TransactionIndex, TransactionIndex, net_request::TransactionIndexResponse);
impl_single!(Receipts, BlockReceipts, Vec<Receipt>); impl_single!(Receipts, BlockReceipts, Vec<Receipt>);
impl_single!(Body, Body, encoded::Block); impl_single!(Body, Body, encoded::Block);
@ -246,6 +250,7 @@ impl From<encoded::Header> for HeaderRef {
pub enum CheckedRequest { pub enum CheckedRequest {
HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest), HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest),
HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest), HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest),
HeaderWithAncestors(HeaderWithAncestors, net_request::IncompleteHeadersRequest),
TransactionIndex(TransactionIndex, net_request::IncompleteTransactionIndexRequest), TransactionIndex(TransactionIndex, net_request::IncompleteTransactionIndexRequest),
Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest), Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest),
Body(Body, net_request::IncompleteBodyRequest), Body(Body, net_request::IncompleteBodyRequest),
@ -268,6 +273,16 @@ impl From<Request> for CheckedRequest {
trace!(target: "on_demand", "HeaderByHash Request, {:?}", net_req); trace!(target: "on_demand", "HeaderByHash Request, {:?}", net_req);
CheckedRequest::HeaderByHash(req, net_req) CheckedRequest::HeaderByHash(req, net_req)
} }
Request::HeaderWithAncestors(req) => {
let net_req = net_request::IncompleteHeadersRequest {
start: req.block_hash.map(Into::into),
skip: 0,
max: req.ancestor_count + 1,
reverse: true,
};
trace!(target: "on_demand", "HeaderWithAncestors Request, {:?}", net_req);
CheckedRequest::HeaderWithAncestors(req, net_req)
}
Request::HeaderProof(req) => { Request::HeaderProof(req) => {
let net_req = net_request::IncompleteHeaderProofRequest { let net_req = net_request::IncompleteHeaderProofRequest {
num: req.num().into(), num: req.num().into(),
@ -344,6 +359,7 @@ impl CheckedRequest {
match self { match self {
CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req), CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req),
CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req), CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req),
CheckedRequest::HeaderWithAncestors(_, req) => NetRequest::Headers(req),
CheckedRequest::TransactionIndex(_, req) => NetRequest::TransactionIndex(req), CheckedRequest::TransactionIndex(_, req) => NetRequest::TransactionIndex(req),
CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req), CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req),
CheckedRequest::Body(_, req) => NetRequest::Body(req), CheckedRequest::Body(_, req) => NetRequest::Body(req),
@ -399,6 +415,27 @@ impl CheckedRequest {
None None
} }
CheckedRequest::HeaderWithAncestors(_, ref req) => {
if req.skip != 1 || !req.reverse {
return None;
}
if let Some(&net_request::HashOrNumber::Hash(start)) = req.start.as_ref() {
let mut result = Vec::with_capacity(req.max as usize);
let mut hash = start;
let mut cache = cache.lock();
for _ in 0..req.max {
match cache.block_header(&hash) {
Some(header) => {
hash = header.parent_hash();
result.push(header);
}
None => return None,
}
}
Some(Response::HeaderWithAncestors(result))
} else { None }
}
CheckedRequest::Receipts(ref check, ref req) => { CheckedRequest::Receipts(ref check, ref req) => {
// empty transactions -> no receipts // empty transactions -> no receipts
if check.0.as_ref().ok().map_or(false, |hdr| hdr.receipts_root() == KECCAK_NULL_RLP) { if check.0.as_ref().ok().map_or(false, |hdr| hdr.receipts_root() == KECCAK_NULL_RLP) {
@ -467,6 +504,7 @@ macro_rules! match_me {
match $me { match $me {
CheckedRequest::HeaderProof($check, $req) => $e, CheckedRequest::HeaderProof($check, $req) => $e,
CheckedRequest::HeaderByHash($check, $req) => $e, CheckedRequest::HeaderByHash($check, $req) => $e,
CheckedRequest::HeaderWithAncestors($check, $req) => $e,
CheckedRequest::TransactionIndex($check, $req) => $e, CheckedRequest::TransactionIndex($check, $req) => $e,
CheckedRequest::Receipts($check, $req) => $e, CheckedRequest::Receipts($check, $req) => $e,
CheckedRequest::Body($check, $req) => $e, CheckedRequest::Body($check, $req) => $e,
@ -496,6 +534,15 @@ impl IncompleteRequest for CheckedRequest {
_ => Ok(()), _ => Ok(()),
} }
} }
CheckedRequest::HeaderWithAncestors(ref check, ref req) => {
req.check_outputs(&mut f)?;
// make sure the output given is definitively a hash.
match check.block_hash {
Field::BackReference(r, idx) => f(r, idx, OutputKind::Hash),
_ => Ok(()),
}
}
CheckedRequest::TransactionIndex(_, ref req) => req.check_outputs(f), CheckedRequest::TransactionIndex(_, ref req) => req.check_outputs(f),
CheckedRequest::Receipts(_, ref req) => req.check_outputs(f), CheckedRequest::Receipts(_, ref req) => req.check_outputs(f),
CheckedRequest::Body(_, ref req) => req.check_outputs(f), CheckedRequest::Body(_, ref req) => req.check_outputs(f),
@ -524,6 +571,10 @@ impl IncompleteRequest for CheckedRequest {
trace!(target: "on_demand", "HeaderByHash request completed {:?}", req); trace!(target: "on_demand", "HeaderByHash request completed {:?}", req);
req.complete().map(CompleteRequest::Headers) req.complete().map(CompleteRequest::Headers)
} }
CheckedRequest::HeaderWithAncestors(_, req) => {
trace!(target: "on_demand", "HeaderWithAncestors request completed {:?}", req);
req.complete().map(CompleteRequest::Headers)
}
CheckedRequest::TransactionIndex(_, req) => { CheckedRequest::TransactionIndex(_, req) => {
trace!(target: "on_demand", "TransactionIndex request completed {:?}", req); trace!(target: "on_demand", "TransactionIndex request completed {:?}", req);
req.complete().map(CompleteRequest::TransactionIndex) req.complete().map(CompleteRequest::TransactionIndex)
@ -587,6 +638,9 @@ impl net_request::CheckedRequest for CheckedRequest {
CheckedRequest::HeaderByHash(ref prover, _) => CheckedRequest::HeaderByHash(ref prover, _) =>
expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) =>
prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderByHash)), prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderByHash)),
CheckedRequest::HeaderWithAncestors(ref prover, _) =>
expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) =>
prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderWithAncestors)),
CheckedRequest::TransactionIndex(ref prover, _) => CheckedRequest::TransactionIndex(ref prover, _) =>
expect!((&NetResponse::TransactionIndex(ref res), _) => expect!((&NetResponse::TransactionIndex(ref res), _) =>
prover.check_response(cache, res).map(Response::TransactionIndex)), prover.check_response(cache, res).map(Response::TransactionIndex)),
@ -620,6 +674,8 @@ pub enum Response {
HeaderProof((H256, U256)), HeaderProof((H256, U256)),
/// Response to a header-by-hash request. /// Response to a header-by-hash request.
HeaderByHash(encoded::Header), HeaderByHash(encoded::Header),
/// Response to a header-by-hash with ancestors request.
HeaderWithAncestors(Vec<encoded::Header>),
/// Response to a transaction-index request. /// Response to a transaction-index request.
TransactionIndex(net_request::TransactionIndexResponse), TransactionIndex(net_request::TransactionIndexResponse),
/// Response to a receipts request. /// Response to a receipts request.
@ -661,6 +717,10 @@ pub enum Error {
Decoder(::rlp::DecoderError), Decoder(::rlp::DecoderError),
/// Empty response. /// Empty response.
Empty, Empty,
/// Response data length exceeds request max.
TooManyResults(u64, u64),
/// Response data is incomplete.
TooFewResults(u64, u64),
/// Trie lookup error (result of bad proof) /// Trie lookup error (result of bad proof)
Trie(TrieError), Trie(TrieError),
/// Bad inclusion proof /// Bad inclusion proof
@ -677,6 +737,8 @@ pub enum Error {
WrongTrieRoot(H256, H256), WrongTrieRoot(H256, H256),
/// Wrong response kind. /// Wrong response kind.
WrongKind, WrongKind,
/// Wrong sequence of headers.
WrongHeaderSequence,
} }
impl From<::rlp::DecoderError> for Error { impl From<::rlp::DecoderError> for Error {
@ -737,6 +799,65 @@ impl HeaderProof {
} }
} }
/// Request for a header by hash with a range of ancestors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderWithAncestors {
/// Hash of the last block in the range to fetch.
pub block_hash: Field<H256>,
/// Number of headers before the last block to fetch in addition.
pub ancestor_count: u64,
}
impl HeaderWithAncestors {
/// Check a response for the headers.
pub fn check_response(
&self,
cache: &Mutex<::cache::Cache>,
start: &net_request::HashOrNumber,
headers: &[encoded::Header]
) -> Result<Vec<encoded::Header>, Error> {
let expected_hash = match (self.block_hash, start) {
(Field::Scalar(ref h), &net_request::HashOrNumber::Hash(ref h2)) => {
if h != h2 { return Err(Error::WrongHash(*h, *h2)) }
*h
}
(_, &net_request::HashOrNumber::Hash(h2)) => h2,
_ => return Err(Error::HeaderByNumber),
};
let start_header = headers.first().ok_or(Error::Empty)?;
let start_hash = start_header.hash();
if start_hash != expected_hash {
return Err(Error::WrongHash(expected_hash, start_hash));
}
let expected_len = 1 + cmp::min(self.ancestor_count, start_header.number());
let actual_len = headers.len() as u64;
match actual_len.cmp(&expected_len) {
cmp::Ordering::Less =>
return Err(Error::TooFewResults(expected_len, actual_len)),
cmp::Ordering::Greater =>
return Err(Error::TooManyResults(expected_len, actual_len)),
cmp::Ordering::Equal => (),
};
for (header, prev_header) in headers.iter().zip(headers[1..].iter()) {
if header.number() != prev_header.number() + 1 ||
header.parent_hash() != prev_header.hash()
{
return Err(Error::WrongHeaderSequence)
}
}
let mut cache = cache.lock();
for header in headers {
cache.insert_block_header(header.hash(), header.clone());
}
Ok(headers.to_vec())
}
}
/// Request for a header by hash. /// Request for a header by hash.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderByHash(pub Field<H256>); pub struct HeaderByHash(pub Field<H256>);
@ -1045,6 +1166,83 @@ mod tests {
assert!(HeaderByHash(hash.into()).check_response(&cache, &hash.into(), &[raw_header]).is_ok()) assert!(HeaderByHash(hash.into()).check_response(&cache, &hash.into(), &[raw_header]).is_ok())
} }
#[test]
fn check_header_with_ancestors() {
let mut last_header_hash = H256::default();
let mut headers = (0..11).map(|num| {
let mut header = Header::new();
header.set_number(num);
header.set_parent_hash(last_header_hash);
last_header_hash = header.hash();
header
}).collect::<Vec<_>>();
headers.reverse(); // because responses are in reverse order
let raw_headers = headers.iter()
.map(|hdr| encoded::Header::new(::rlp::encode(hdr).into_vec()))
.collect::<Vec<_>>();
let mut invalid_successor = Header::new();
invalid_successor.set_number(11);
invalid_successor.set_parent_hash(headers[1].hash());
let raw_invalid_successor = encoded::Header::new(::rlp::encode(&invalid_successor).into_vec());
let cache = Mutex::new(make_cache());
let header_with_ancestors = |hash, count| {
HeaderWithAncestors {
block_hash: hash,
ancestor_count: count
}
};
// Correct responses
assert!(header_with_ancestors(headers[0].hash().into(), 0)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]).is_ok());
assert!(header_with_ancestors(headers[0].hash().into(), 2)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..3]).is_ok());
assert!(header_with_ancestors(headers[0].hash().into(), 10)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]).is_ok());
assert!(header_with_ancestors(headers[2].hash().into(), 2)
.check_response(&cache, &headers[2].hash().into(), &raw_headers[2..5]).is_ok());
assert!(header_with_ancestors(headers[2].hash().into(), 10)
.check_response(&cache, &headers[2].hash().into(), &raw_headers[2..11]).is_ok());
assert!(header_with_ancestors(invalid_successor.hash().into(), 0)
.check_response(&cache, &invalid_successor.hash().into(), &[raw_invalid_successor.clone()]).is_ok());
// Incorrect responses
assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 0)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]),
Err(Error::WrongHash(invalid_successor.hash(), headers[0].hash())));
assert_eq!(header_with_ancestors(headers[0].hash().into(), 0)
.check_response(&cache, &headers[0].hash().into(), &[]),
Err(Error::Empty));
assert_eq!(header_with_ancestors(headers[0].hash().into(), 10)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..10]),
Err(Error::TooFewResults(11, 10)));
assert_eq!(header_with_ancestors(headers[0].hash().into(), 9)
.check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]),
Err(Error::TooManyResults(10, 11)));
let response = &[raw_headers[0].clone(), raw_headers[2].clone()];
assert_eq!(header_with_ancestors(headers[0].hash().into(), 1)
.check_response(&cache, &headers[0].hash().into(), response),
Err(Error::WrongHeaderSequence));
let response = &[raw_invalid_successor.clone(), raw_headers[0].clone()];
assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1)
.check_response(&cache, &invalid_successor.hash().into(), response),
Err(Error::WrongHeaderSequence));
let response = &[raw_invalid_successor.clone(), raw_headers[1].clone()];
assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1)
.check_response(&cache, &invalid_successor.hash().into(), response),
Err(Error::WrongHeaderSequence));
}
#[test] #[test]
fn check_body() { fn check_body() {
use rlp::RlpStream; use rlp::RlpStream;

View File

@ -33,6 +33,9 @@ use transaction_queue::TransactionQueue;
use request; use request;
/// Maximum allowed size of a headers request.
pub const MAX_HEADERS_PER_REQUEST: u64 = 512;
/// Defines the operations that a provider for the light subprotocol must fulfill. /// Defines the operations that a provider for the light subprotocol must fulfill.
pub trait Provider: Send + Sync { pub trait Provider: Send + Sync {
/// Provide current blockchain info. /// Provide current blockchain info.
@ -54,7 +57,6 @@ pub trait Provider: Send + Sync {
/// results within must adhere to the `skip` and `reverse` parameters. /// results within must adhere to the `skip` and `reverse` parameters.
fn block_headers(&self, req: request::CompleteHeadersRequest) -> Option<request::HeadersResponse> { fn block_headers(&self, req: request::CompleteHeadersRequest) -> Option<request::HeadersResponse> {
use request::HashOrNumber; use request::HashOrNumber;
const MAX_HEADERS_TO_SEND: u64 = 512;
if req.max == 0 { return None } if req.max == 0 { return None }
@ -83,7 +85,7 @@ pub trait Provider: Send + Sync {
} }
}; };
let max = ::std::cmp::min(MAX_HEADERS_TO_SEND, req.max); let max = ::std::cmp::min(MAX_HEADERS_PER_REQUEST, req.max);
let headers: Vec<_> = (0u64..max) let headers: Vec<_> = (0u64..max)
.map(|x: u64| x.saturating_mul(req.skip.saturating_add(1))) .map(|x: u64| x.saturating_mul(req.skip.saturating_add(1)))

View File

@ -37,6 +37,7 @@ jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "
ethash = { path = "../ethash" } ethash = { path = "../ethash" }
ethcore = { path = "../ethcore", features = ["test-helpers"] } ethcore = { path = "../ethcore", features = ["test-helpers"] }
fastmap = { path = "../util/fastmap" }
parity-bytes = { git = "https://github.com/paritytech/parity-common" } parity-bytes = { git = "https://github.com/paritytech/parity-common" }
parity-crypto = { git = "https://github.com/paritytech/parity-common" } parity-crypto = { git = "https://github.com/paritytech/parity-common" }
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }

View File

@ -44,6 +44,7 @@ extern crate jsonrpc_pubsub;
extern crate ethash; extern crate ethash;
extern crate ethcore; extern crate ethcore;
extern crate fastmap;
extern crate parity_bytes as bytes; extern crate parity_bytes as bytes;
extern crate parity_crypto as crypto; extern crate parity_crypto as crypto;
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;

View File

@ -101,6 +101,14 @@ pub fn request_rejected_limit() -> Error {
} }
} }
pub fn request_rejected_param_limit(limit: u64, items_desc: &str) -> Error {
Error {
code: ErrorCode::ServerError(codes::REQUEST_REJECTED_LIMIT),
message: format!("Requested data size exceeds limit of {} {}.", limit, items_desc),
data: None,
}
}
pub fn account<T: fmt::Debug>(error: &str, details: T) -> Error { pub fn account<T: fmt::Debug>(error: &str, details: T) -> Error {
Error { Error {
code: ErrorCode::ServerError(codes::ACCOUNT_ERROR), code: ErrorCode::ServerError(codes::ACCOUNT_ERROR),

View File

@ -16,6 +16,7 @@
//! Helpers for fetching blockchain data either from the light client or the network. //! Helpers for fetching blockchain data either from the light client or the network.
use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use ethcore::basic_account::BasicAccount; use ethcore::basic_account::BasicAccount;
@ -31,7 +32,7 @@ use jsonrpc_macros::Trailing;
use light::cache::Cache; use light::cache::Cache;
use light::client::LightChainClient; use light::client::LightChainClient;
use light::cht; use light::{cht, MAX_HEADERS_PER_REQUEST};
use light::on_demand::{ use light::on_demand::{
request, OnDemand, HeaderRef, Request as OnDemandRequest, request, OnDemand, HeaderRef, Request as OnDemandRequest,
Response as OnDemandResponse, ExecutionResult, Response as OnDemandResponse, ExecutionResult,
@ -42,6 +43,7 @@ use sync::LightSync;
use ethereum_types::{U256, Address}; use ethereum_types::{U256, Address};
use hash::H256; use hash::H256;
use parking_lot::Mutex; use parking_lot::Mutex;
use fastmap::H256FastMap;
use transaction::{Action, Transaction as EthTransaction, SignedTransaction, LocalizedTransaction}; use transaction::{Action, Transaction as EthTransaction, SignedTransaction, LocalizedTransaction};
use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
@ -299,78 +301,67 @@ impl LightFetch {
use std::collections::BTreeMap; use std::collections::BTreeMap;
use jsonrpc_core::futures::stream::{self, Stream}; use jsonrpc_core::futures::stream::{self, Stream};
// early exit for "to" block before "from" block. const MAX_BLOCK_RANGE: u64 = 1000;
let best_number = self.client.chain_info().best_block_number;
let block_number = |id| match id {
BlockId::Earliest => Some(0),
BlockId::Latest => Some(best_number),
BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()),
BlockId::Number(x) => Some(x),
};
let (from_block_number, from_block_header) = match self.client.block_header(filter.from_block) { let fetcher = self.clone();
Some(from) => (from.number(), from), self.headers_range_by_block_id(filter.from_block, filter.to_block, MAX_BLOCK_RANGE)
None => return Either::A(future::err(errors::unknown_block())), .and_then(move |mut headers| {
}; if headers.is_empty() {
return Either::A(future::ok(Vec::new()));
}
match block_number(filter.to_block) { let on_demand = &fetcher.on_demand;
Some(to) if to < from_block_number || from_block_number > best_number
=> return Either::A(future::ok(Vec::new())),
Some(_) => (),
_ => return Either::A(future::err(errors::unknown_block())),
}
let maybe_future = self.sync.with_context(move |ctx| { let maybe_future = fetcher.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one. // find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later. // match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities(); let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) let receipts_futures: Vec<_> = headers.drain(..)
.take_while(|ref hdr| hdr.number() != from_block_number) .filter(|ref hdr| {
.chain(Some(from_block_header)) let hdr_bloom = hdr.log_bloom();
.filter(|ref hdr| { bit_combos.iter().any(|bloom| hdr_bloom.contains_bloom(bloom))
let hdr_bloom = hdr.log_bloom(); })
bit_combos.iter().any(|bloom| hdr_bloom.contains_bloom(bloom)) .map(|hdr| (hdr.number(), hdr.hash(), request::BlockReceipts(hdr.into())))
}) .map(|(num, hash, req)| on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, hash, x)))
.map(|hdr| (hdr.number(), hdr.hash(), request::BlockReceipts(hdr.into()))) .collect();
.map(|(num, hash, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, hash, x)))
.collect();
// as the receipts come in, find logs within them which match the filter. // as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index. // insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures) stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, hash, receipts)| { .fold(BTreeMap::new(), move |mut matches, (num, hash, receipts)| {
let mut block_index = 0; let mut block_index = 0;
for (transaction_index, receipt) in receipts.into_iter().enumerate() { for (transaction_index, receipt) in receipts.into_iter().enumerate() {
for (transaction_log_index, log) in receipt.logs.into_iter().enumerate() { for (transaction_log_index, log) in receipt.logs.into_iter().enumerate() {
if filter.matches(&log) { if filter.matches(&log) {
matches.insert((num, block_index), Log { matches.insert((num, block_index), Log {
address: log.address.into(), address: log.address.into(),
topics: log.topics.into_iter().map(Into::into).collect(), topics: log.topics.into_iter().map(Into::into).collect(),
data: log.data.into(), data: log.data.into(),
block_hash: Some(hash.into()), block_hash: Some(hash.into()),
block_number: Some(num.into()), block_number: Some(num.into()),
// No way to easily retrieve transaction hash, so let's just skip it. // No way to easily retrieve transaction hash, so let's just skip it.
transaction_hash: None, transaction_hash: None,
transaction_index: Some(transaction_index.into()), transaction_index: Some(transaction_index.into()),
log_index: Some(block_index.into()), log_index: Some(block_index.into()),
transaction_log_index: Some(transaction_log_index.into()), transaction_log_index: Some(transaction_log_index.into()),
log_type: "mined".into(), log_type: "mined".into(),
removed: false, removed: false,
}); });
}
block_index += 1;
}
} }
block_index += 1; future::ok(matches)
} }) // and then collect them into a vector.
} .map(|matches| matches.into_iter().map(|(_, v)| v).collect())
future::ok(matches) .map_err(errors::on_demand_cancel)
}) // and then collect them into a vector. });
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(errors::on_demand_cancel)
});
match maybe_future { match maybe_future {
Some(fut) => Either::B(Either::A(fut)), Some(fut) => Either::B(Either::A(fut)),
None => Either::B(Either::B(future::err(errors::network_disabled()))), None => Either::B(Either::B(future::err(errors::network_disabled()))),
} }
})
} }
// Get a transaction by hash. also returns the index in the block. // Get a transaction by hash. also returns the index in the block.
@ -448,6 +439,150 @@ impl LightFetch {
None => Box::new(future::err(errors::network_disabled())) as Box<Future<Item = _, Error = _> + Send> None => Box::new(future::err(errors::network_disabled())) as Box<Future<Item = _, Error = _> + Send>
} }
} }
fn headers_range_by_block_id(
&self,
from_block: BlockId,
to_block: BlockId,
max: u64
) -> impl Future<Item = Vec<encoded::Header>, Error = Error> {
let fetch_hashes = [from_block, to_block].iter()
.filter_map(|block_id| match block_id {
BlockId::Hash(hash) => Some(hash.clone()),
_ => None,
})
.collect::<Vec<_>>();
let best_number = self.client.chain_info().best_block_number;
let fetcher = self.clone();
self.headers_by_hash(&fetch_hashes[..]).and_then(move |mut header_map| {
let (from_block_num, to_block_num) = {
let block_number = |id| match id {
&BlockId::Earliest => 0,
&BlockId::Latest => best_number,
&BlockId::Hash(ref h) =>
header_map.get(h).map(|hdr| hdr.number())
.expect("from_block and to_block headers are fetched by hash; this closure is only called on from_block and to_block; qed"),
&BlockId::Number(x) => x,
};
(block_number(&from_block), block_number(&to_block))
};
if to_block_num < from_block_num {
// early exit for "to" block before "from" block.
return Either::A(future::err(errors::filter_block_not_found(to_block)));
} else if to_block_num - from_block_num >= max {
return Either::A(future::err(errors::request_rejected_param_limit(max, "blocks")));
}
let to_header_hint = match to_block {
BlockId::Hash(ref h) => header_map.remove(h),
_ => None,
};
let headers_fut = fetcher.headers_range(from_block_num, to_block_num, to_header_hint);
Either::B(headers_fut.map(move |headers| {
// Validate from_block if it's a hash
let last_hash = headers.last().map(|hdr| hdr.hash());
match (last_hash, from_block) {
(Some(h1), BlockId::Hash(h2)) if h1 != h2 => Vec::new(),
_ => headers,
}
}))
})
}
fn headers_by_hash(&self, hashes: &[H256]) -> impl Future<Item = H256FastMap<encoded::Header>, Error = Error> {
let mut refs = H256FastMap::with_capacity_and_hasher(hashes.len(), Default::default());
let mut reqs = Vec::with_capacity(hashes.len());
for hash in hashes {
refs.entry(*hash).or_insert_with(|| {
self.make_header_requests(BlockId::Hash(*hash), &mut reqs)
.expect("make_header_requests never fails for BlockId::Hash; qed")
});
}
self.send_requests(reqs, move |res| {
let headers = refs.drain()
.map(|(hash, header_ref)| {
let hdr = extract_header(&res, header_ref)
.expect("these responses correspond to requests that header_ref belongs to; \
qed");
(hash, hdr)
})
.collect();
headers
})
}
fn headers_range(
&self,
from_number: u64,
to_number: u64,
to_header_hint: Option<encoded::Header>
) -> impl Future<Item = Vec<encoded::Header>, Error = Error> {
let range_length = (to_number - from_number + 1) as usize;
let mut headers: Vec<encoded::Header> = Vec::with_capacity(range_length);
let iter_start = match to_header_hint {
Some(hdr) => {
let block_id = BlockId::Hash(hdr.parent_hash());
headers.push(hdr);
block_id
}
None => BlockId::Number(to_number),
};
headers.extend(self.client.ancestry_iter(iter_start)
.take_while(|hdr| hdr.number() >= from_number));
let fetcher = self.clone();
future::loop_fn(headers, move |mut headers| {
let remaining = range_length - headers.len();
if remaining == 0 {
return Either::A(future::ok(future::Loop::Break(headers)));
}
let mut reqs: Vec<request::Request> = Vec::with_capacity(2);
let start_hash = if let Some(hdr) = headers.last() {
hdr.parent_hash().into()
} else {
let cht_root = cht::block_to_cht_number(to_number)
.and_then(|cht_num| fetcher.client.cht_root(cht_num as usize));
let cht_root = match cht_root {
Some(cht_root) => cht_root,
None => return Either::A(future::err(errors::unknown_block())),
};
let header_proof = request::HeaderProof::new(to_number, cht_root)
.expect("HeaderProof::new is Some(_) if cht::block_to_cht_number() is Some(_); \
this would return above if block_to_cht_number returned None; qed");
let idx = reqs.len();
let hash_ref = Field::back_ref(idx, 0);
reqs.push(header_proof.into());
hash_ref
};
let max = cmp::min(remaining as u64, MAX_HEADERS_PER_REQUEST);
reqs.push(request::HeaderWithAncestors {
block_hash: start_hash,
ancestor_count: max - 1,
}.into());
Either::B(fetcher.send_requests(reqs, |mut res| {
match res.last_mut() {
Some(&mut OnDemandResponse::HeaderWithAncestors(ref mut res_headers)) =>
headers.extend(res_headers.drain(..)),
_ => panic!("reqs has at least one entry; each request maps to a response; qed"),
};
future::Loop::Continue(headers)
}))
})
}
} }
#[derive(Clone)] #[derive(Clone)]