From 9ed43230cac4788bc2a72efb0e3db792109cca82 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Sat, 25 Aug 2018 14:06:01 -0700 Subject: [PATCH] Better support for eth_getLogs in light mode (#9186) * Light client on-demand request for headers range. * Cache headers in HeaderWithAncestors response. Also fulfills request locally if all headers are in cache. * LightFetch::logs fetches missing headers on demand. * LightFetch::logs limit the number of headers requested at a time. * LightFetch::logs refactor header fetching logic. * Enforce limit on header range length in light client logs request. * Fix light request tests after struct change. * Respond to review comments. --- Cargo.lock | 1 + ethcore/light/src/lib.rs | 2 +- ethcore/light/src/on_demand/mod.rs | 2 + ethcore/light/src/on_demand/request.rs | 198 ++++++++++++++++++ ethcore/light/src/provider.rs | 6 +- rpc/Cargo.toml | 1 + rpc/src/lib.rs | 1 + rpc/src/v1/helpers/errors.rs | 8 + rpc/src/v1/helpers/light_fetch.rs | 269 +++++++++++++++++++------ 9 files changed, 418 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbedd2fed..d02cff5ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2146,6 +2146,7 @@ dependencies = [ "ethstore 0.2.0", "fake-fetch 0.0.1", "fake-hardware-wallet 0.0.1", + "fastmap 0.1.0", "fetch 0.1.0", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index e151267a9..5510ca4aa 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -43,7 +43,7 @@ pub mod provider; mod types; pub use self::cache::Cache; -pub use self::provider::Provider; +pub use self::provider::{Provider, MAX_HEADERS_PER_REQUEST}; pub use self::transaction_queue::TransactionQueue; pub use types::request as request; diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 594d0dee4..a78adb1ed 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -204,6 +204,8 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { caps.serve_headers = true, CheckedRequest::HeaderByHash(_, _) => caps.serve_headers = true, + CheckedRequest::HeaderWithAncestors(_, _) => + caps.serve_headers = true, CheckedRequest::TransactionIndex(_, _) => {} // hashes yield no info. CheckedRequest::Signal(_, _) => caps.serve_headers = true, diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index c305dea94..f3a451e6b 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -16,6 +16,7 @@ //! Request types, verification, and verification errors. +use std::cmp; use std::sync::Arc; use bytes::Bytes; @@ -47,6 +48,8 @@ pub enum Request { HeaderProof(HeaderProof), /// A request for a header by hash. HeaderByHash(HeaderByHash), + /// A request for a header by hash with a range of its ancestors. + HeaderWithAncestors(HeaderWithAncestors), /// A request for the index of a transaction. TransactionIndex(TransactionIndex), /// A request for block receipts. @@ -136,6 +139,7 @@ macro_rules! impl_single { // implement traits for each kind of request. impl_single!(HeaderProof, HeaderProof, (H256, U256)); impl_single!(HeaderByHash, HeaderByHash, encoded::Header); +impl_single!(HeaderWithAncestors, HeaderWithAncestors, Vec); impl_single!(TransactionIndex, TransactionIndex, net_request::TransactionIndexResponse); impl_single!(Receipts, BlockReceipts, Vec); impl_single!(Body, Body, encoded::Block); @@ -246,6 +250,7 @@ impl From for HeaderRef { pub enum CheckedRequest { HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest), HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest), + HeaderWithAncestors(HeaderWithAncestors, net_request::IncompleteHeadersRequest), TransactionIndex(TransactionIndex, net_request::IncompleteTransactionIndexRequest), Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest), Body(Body, net_request::IncompleteBodyRequest), @@ -268,6 +273,16 @@ impl From for CheckedRequest { trace!(target: "on_demand", "HeaderByHash Request, {:?}", net_req); CheckedRequest::HeaderByHash(req, net_req) } + Request::HeaderWithAncestors(req) => { + let net_req = net_request::IncompleteHeadersRequest { + start: req.block_hash.map(Into::into), + skip: 0, + max: req.ancestor_count + 1, + reverse: true, + }; + trace!(target: "on_demand", "HeaderWithAncestors Request, {:?}", net_req); + CheckedRequest::HeaderWithAncestors(req, net_req) + } Request::HeaderProof(req) => { let net_req = net_request::IncompleteHeaderProofRequest { num: req.num().into(), @@ -344,6 +359,7 @@ impl CheckedRequest { match self { CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req), CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req), + CheckedRequest::HeaderWithAncestors(_, req) => NetRequest::Headers(req), CheckedRequest::TransactionIndex(_, req) => NetRequest::TransactionIndex(req), CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req), CheckedRequest::Body(_, req) => NetRequest::Body(req), @@ -399,6 +415,27 @@ impl CheckedRequest { None } + CheckedRequest::HeaderWithAncestors(_, ref req) => { + if req.skip != 1 || !req.reverse { + return None; + } + + if let Some(&net_request::HashOrNumber::Hash(start)) = req.start.as_ref() { + let mut result = Vec::with_capacity(req.max as usize); + let mut hash = start; + let mut cache = cache.lock(); + for _ in 0..req.max { + match cache.block_header(&hash) { + Some(header) => { + hash = header.parent_hash(); + result.push(header); + } + None => return None, + } + } + Some(Response::HeaderWithAncestors(result)) + } else { None } + } CheckedRequest::Receipts(ref check, ref req) => { // empty transactions -> no receipts if check.0.as_ref().ok().map_or(false, |hdr| hdr.receipts_root() == KECCAK_NULL_RLP) { @@ -467,6 +504,7 @@ macro_rules! match_me { match $me { CheckedRequest::HeaderProof($check, $req) => $e, CheckedRequest::HeaderByHash($check, $req) => $e, + CheckedRequest::HeaderWithAncestors($check, $req) => $e, CheckedRequest::TransactionIndex($check, $req) => $e, CheckedRequest::Receipts($check, $req) => $e, CheckedRequest::Body($check, $req) => $e, @@ -496,6 +534,15 @@ impl IncompleteRequest for CheckedRequest { _ => Ok(()), } } + CheckedRequest::HeaderWithAncestors(ref check, ref req) => { + req.check_outputs(&mut f)?; + + // make sure the output given is definitively a hash. + match check.block_hash { + Field::BackReference(r, idx) => f(r, idx, OutputKind::Hash), + _ => Ok(()), + } + } CheckedRequest::TransactionIndex(_, ref req) => req.check_outputs(f), CheckedRequest::Receipts(_, ref req) => req.check_outputs(f), CheckedRequest::Body(_, ref req) => req.check_outputs(f), @@ -524,6 +571,10 @@ impl IncompleteRequest for CheckedRequest { trace!(target: "on_demand", "HeaderByHash request completed {:?}", req); req.complete().map(CompleteRequest::Headers) } + CheckedRequest::HeaderWithAncestors(_, req) => { + trace!(target: "on_demand", "HeaderWithAncestors request completed {:?}", req); + req.complete().map(CompleteRequest::Headers) + } CheckedRequest::TransactionIndex(_, req) => { trace!(target: "on_demand", "TransactionIndex request completed {:?}", req); req.complete().map(CompleteRequest::TransactionIndex) @@ -587,6 +638,9 @@ impl net_request::CheckedRequest for CheckedRequest { CheckedRequest::HeaderByHash(ref prover, _) => expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderByHash)), + CheckedRequest::HeaderWithAncestors(ref prover, _) => + expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => + prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderWithAncestors)), CheckedRequest::TransactionIndex(ref prover, _) => expect!((&NetResponse::TransactionIndex(ref res), _) => prover.check_response(cache, res).map(Response::TransactionIndex)), @@ -620,6 +674,8 @@ pub enum Response { HeaderProof((H256, U256)), /// Response to a header-by-hash request. HeaderByHash(encoded::Header), + /// Response to a header-by-hash with ancestors request. + HeaderWithAncestors(Vec), /// Response to a transaction-index request. TransactionIndex(net_request::TransactionIndexResponse), /// Response to a receipts request. @@ -661,6 +717,10 @@ pub enum Error { Decoder(::rlp::DecoderError), /// Empty response. Empty, + /// Response data length exceeds request max. + TooManyResults(u64, u64), + /// Response data is incomplete. + TooFewResults(u64, u64), /// Trie lookup error (result of bad proof) Trie(TrieError), /// Bad inclusion proof @@ -677,6 +737,8 @@ pub enum Error { WrongTrieRoot(H256, H256), /// Wrong response kind. WrongKind, + /// Wrong sequence of headers. + WrongHeaderSequence, } impl From<::rlp::DecoderError> for Error { @@ -737,6 +799,65 @@ impl HeaderProof { } } +/// Request for a header by hash with a range of ancestors. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HeaderWithAncestors { + /// Hash of the last block in the range to fetch. + pub block_hash: Field, + /// Number of headers before the last block to fetch in addition. + pub ancestor_count: u64, +} + +impl HeaderWithAncestors { + /// Check a response for the headers. + pub fn check_response( + &self, + cache: &Mutex<::cache::Cache>, + start: &net_request::HashOrNumber, + headers: &[encoded::Header] + ) -> Result, Error> { + let expected_hash = match (self.block_hash, start) { + (Field::Scalar(ref h), &net_request::HashOrNumber::Hash(ref h2)) => { + if h != h2 { return Err(Error::WrongHash(*h, *h2)) } + *h + } + (_, &net_request::HashOrNumber::Hash(h2)) => h2, + _ => return Err(Error::HeaderByNumber), + }; + + let start_header = headers.first().ok_or(Error::Empty)?; + let start_hash = start_header.hash(); + if start_hash != expected_hash { + return Err(Error::WrongHash(expected_hash, start_hash)); + } + + let expected_len = 1 + cmp::min(self.ancestor_count, start_header.number()); + let actual_len = headers.len() as u64; + match actual_len.cmp(&expected_len) { + cmp::Ordering::Less => + return Err(Error::TooFewResults(expected_len, actual_len)), + cmp::Ordering::Greater => + return Err(Error::TooManyResults(expected_len, actual_len)), + cmp::Ordering::Equal => (), + }; + + for (header, prev_header) in headers.iter().zip(headers[1..].iter()) { + if header.number() != prev_header.number() + 1 || + header.parent_hash() != prev_header.hash() + { + return Err(Error::WrongHeaderSequence) + } + } + + let mut cache = cache.lock(); + for header in headers { + cache.insert_block_header(header.hash(), header.clone()); + } + + Ok(headers.to_vec()) + } +} + /// Request for a header by hash. #[derive(Debug, Clone, PartialEq, Eq)] pub struct HeaderByHash(pub Field); @@ -1045,6 +1166,83 @@ mod tests { assert!(HeaderByHash(hash.into()).check_response(&cache, &hash.into(), &[raw_header]).is_ok()) } + #[test] + fn check_header_with_ancestors() { + let mut last_header_hash = H256::default(); + let mut headers = (0..11).map(|num| { + let mut header = Header::new(); + header.set_number(num); + header.set_parent_hash(last_header_hash); + + last_header_hash = header.hash(); + header + }).collect::>(); + + headers.reverse(); // because responses are in reverse order + + let raw_headers = headers.iter() + .map(|hdr| encoded::Header::new(::rlp::encode(hdr).into_vec())) + .collect::>(); + + let mut invalid_successor = Header::new(); + invalid_successor.set_number(11); + invalid_successor.set_parent_hash(headers[1].hash()); + + let raw_invalid_successor = encoded::Header::new(::rlp::encode(&invalid_successor).into_vec()); + + let cache = Mutex::new(make_cache()); + + let header_with_ancestors = |hash, count| { + HeaderWithAncestors { + block_hash: hash, + ancestor_count: count + } + }; + + // Correct responses + assert!(header_with_ancestors(headers[0].hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]).is_ok()); + assert!(header_with_ancestors(headers[0].hash().into(), 2) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..3]).is_ok()); + assert!(header_with_ancestors(headers[0].hash().into(), 10) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]).is_ok()); + assert!(header_with_ancestors(headers[2].hash().into(), 2) + .check_response(&cache, &headers[2].hash().into(), &raw_headers[2..5]).is_ok()); + assert!(header_with_ancestors(headers[2].hash().into(), 10) + .check_response(&cache, &headers[2].hash().into(), &raw_headers[2..11]).is_ok()); + assert!(header_with_ancestors(invalid_successor.hash().into(), 0) + .check_response(&cache, &invalid_successor.hash().into(), &[raw_invalid_successor.clone()]).is_ok()); + + // Incorrect responses + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]), + Err(Error::WrongHash(invalid_successor.hash(), headers[0].hash()))); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &[]), + Err(Error::Empty)); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 10) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..10]), + Err(Error::TooFewResults(11, 10))); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 9) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]), + Err(Error::TooManyResults(10, 11))); + + let response = &[raw_headers[0].clone(), raw_headers[2].clone()]; + assert_eq!(header_with_ancestors(headers[0].hash().into(), 1) + .check_response(&cache, &headers[0].hash().into(), response), + Err(Error::WrongHeaderSequence)); + + let response = &[raw_invalid_successor.clone(), raw_headers[0].clone()]; + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1) + .check_response(&cache, &invalid_successor.hash().into(), response), + Err(Error::WrongHeaderSequence)); + + let response = &[raw_invalid_successor.clone(), raw_headers[1].clone()]; + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1) + .check_response(&cache, &invalid_successor.hash().into(), response), + Err(Error::WrongHeaderSequence)); + } + #[test] fn check_body() { use rlp::RlpStream; diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index a066cefb5..90cbe95b6 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -33,6 +33,9 @@ use transaction_queue::TransactionQueue; use request; +/// Maximum allowed size of a headers request. +pub const MAX_HEADERS_PER_REQUEST: u64 = 512; + /// Defines the operations that a provider for the light subprotocol must fulfill. pub trait Provider: Send + Sync { /// Provide current blockchain info. @@ -54,7 +57,6 @@ pub trait Provider: Send + Sync { /// results within must adhere to the `skip` and `reverse` parameters. fn block_headers(&self, req: request::CompleteHeadersRequest) -> Option { use request::HashOrNumber; - const MAX_HEADERS_TO_SEND: u64 = 512; if req.max == 0 { return None } @@ -83,7 +85,7 @@ pub trait Provider: Send + Sync { } }; - let max = ::std::cmp::min(MAX_HEADERS_TO_SEND, req.max); + let max = ::std::cmp::min(MAX_HEADERS_PER_REQUEST, req.max); let headers: Vec<_> = (0u64..max) .map(|x: u64| x.saturating_mul(req.skip.saturating_add(1))) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index b896a44ae..3122c5e30 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -37,6 +37,7 @@ jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = " ethash = { path = "../ethash" } ethcore = { path = "../ethcore", features = ["test-helpers"] } +fastmap = { path = "../util/fastmap" } parity-bytes = { git = "https://github.com/paritytech/parity-common" } parity-crypto = { git = "https://github.com/paritytech/parity-common" } ethcore-devtools = { path = "../devtools" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 2e731cd34..2f3f4968e 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -44,6 +44,7 @@ extern crate jsonrpc_pubsub; extern crate ethash; extern crate ethcore; +extern crate fastmap; extern crate parity_bytes as bytes; extern crate parity_crypto as crypto; extern crate ethcore_devtools as devtools; diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 4afd40ff8..33575fcf1 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -101,6 +101,14 @@ pub fn request_rejected_limit() -> Error { } } +pub fn request_rejected_param_limit(limit: u64, items_desc: &str) -> Error { + Error { + code: ErrorCode::ServerError(codes::REQUEST_REJECTED_LIMIT), + message: format!("Requested data size exceeds limit of {} {}.", limit, items_desc), + data: None, + } +} + pub fn account(error: &str, details: T) -> Error { Error { code: ErrorCode::ServerError(codes::ACCOUNT_ERROR), diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 51fb0a5f8..6b1ecf493 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -16,6 +16,7 @@ //! Helpers for fetching blockchain data either from the light client or the network. +use std::cmp; use std::sync::Arc; use ethcore::basic_account::BasicAccount; @@ -31,7 +32,7 @@ use jsonrpc_macros::Trailing; use light::cache::Cache; use light::client::LightChainClient; -use light::cht; +use light::{cht, MAX_HEADERS_PER_REQUEST}; use light::on_demand::{ request, OnDemand, HeaderRef, Request as OnDemandRequest, Response as OnDemandResponse, ExecutionResult, @@ -42,6 +43,7 @@ use sync::LightSync; use ethereum_types::{U256, Address}; use hash::H256; use parking_lot::Mutex; +use fastmap::H256FastMap; use transaction::{Action, Transaction as EthTransaction, SignedTransaction, LocalizedTransaction}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; @@ -299,78 +301,67 @@ impl LightFetch { use std::collections::BTreeMap; use jsonrpc_core::futures::stream::{self, Stream}; - // early exit for "to" block before "from" block. - let best_number = self.client.chain_info().best_block_number; - let block_number = |id| match id { - BlockId::Earliest => Some(0), - BlockId::Latest => Some(best_number), - BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()), - BlockId::Number(x) => Some(x), - }; + const MAX_BLOCK_RANGE: u64 = 1000; - let (from_block_number, from_block_header) = match self.client.block_header(filter.from_block) { - Some(from) => (from.number(), from), - None => return Either::A(future::err(errors::unknown_block())), - }; + let fetcher = self.clone(); + self.headers_range_by_block_id(filter.from_block, filter.to_block, MAX_BLOCK_RANGE) + .and_then(move |mut headers| { + if headers.is_empty() { + return Either::A(future::ok(Vec::new())); + } - match block_number(filter.to_block) { - Some(to) if to < from_block_number || from_block_number > best_number - => return Either::A(future::ok(Vec::new())), - Some(_) => (), - _ => return Either::A(future::err(errors::unknown_block())), - } + let on_demand = &fetcher.on_demand; - let maybe_future = self.sync.with_context(move |ctx| { - // find all headers which match the filter, and fetch the receipts for each one. - // match them with their numbers for easy sorting later. - let bit_combos = filter.bloom_possibilities(); - let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) - .take_while(|ref hdr| hdr.number() != from_block_number) - .chain(Some(from_block_header)) - .filter(|ref hdr| { - let hdr_bloom = hdr.log_bloom(); - bit_combos.iter().any(|bloom| hdr_bloom.contains_bloom(bloom)) - }) - .map(|hdr| (hdr.number(), hdr.hash(), request::BlockReceipts(hdr.into()))) - .map(|(num, hash, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, hash, x))) - .collect(); + let maybe_future = fetcher.sync.with_context(move |ctx| { + // find all headers which match the filter, and fetch the receipts for each one. + // match them with their numbers for easy sorting later. + let bit_combos = filter.bloom_possibilities(); + let receipts_futures: Vec<_> = headers.drain(..) + .filter(|ref hdr| { + let hdr_bloom = hdr.log_bloom(); + bit_combos.iter().any(|bloom| hdr_bloom.contains_bloom(bloom)) + }) + .map(|hdr| (hdr.number(), hdr.hash(), request::BlockReceipts(hdr.into()))) + .map(|(num, hash, req)| on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, hash, x))) + .collect(); - // as the receipts come in, find logs within them which match the filter. - // insert them into a BTreeMap to maintain order by number and block index. - stream::futures_unordered(receipts_futures) - .fold(BTreeMap::new(), move |mut matches, (num, hash, receipts)| { - let mut block_index = 0; - for (transaction_index, receipt) in receipts.into_iter().enumerate() { - for (transaction_log_index, log) in receipt.logs.into_iter().enumerate() { - if filter.matches(&log) { - matches.insert((num, block_index), Log { - address: log.address.into(), - topics: log.topics.into_iter().map(Into::into).collect(), - data: log.data.into(), - block_hash: Some(hash.into()), - block_number: Some(num.into()), - // No way to easily retrieve transaction hash, so let's just skip it. - transaction_hash: None, - transaction_index: Some(transaction_index.into()), - log_index: Some(block_index.into()), - transaction_log_index: Some(transaction_log_index.into()), - log_type: "mined".into(), - removed: false, - }); + // as the receipts come in, find logs within them which match the filter. + // insert them into a BTreeMap to maintain order by number and block index. + stream::futures_unordered(receipts_futures) + .fold(BTreeMap::new(), move |mut matches, (num, hash, receipts)| { + let mut block_index = 0; + for (transaction_index, receipt) in receipts.into_iter().enumerate() { + for (transaction_log_index, log) in receipt.logs.into_iter().enumerate() { + if filter.matches(&log) { + matches.insert((num, block_index), Log { + address: log.address.into(), + topics: log.topics.into_iter().map(Into::into).collect(), + data: log.data.into(), + block_hash: Some(hash.into()), + block_number: Some(num.into()), + // No way to easily retrieve transaction hash, so let's just skip it. + transaction_hash: None, + transaction_index: Some(transaction_index.into()), + log_index: Some(block_index.into()), + transaction_log_index: Some(transaction_log_index.into()), + log_type: "mined".into(), + removed: false, + }); + } + block_index += 1; + } } - block_index += 1; - } - } - future::ok(matches) - }) // and then collect them into a vector. - .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) - .map_err(errors::on_demand_cancel) - }); + future::ok(matches) + }) // and then collect them into a vector. + .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) + .map_err(errors::on_demand_cancel) + }); - match maybe_future { - Some(fut) => Either::B(Either::A(fut)), - None => Either::B(Either::B(future::err(errors::network_disabled()))), - } + match maybe_future { + Some(fut) => Either::B(Either::A(fut)), + None => Either::B(Either::B(future::err(errors::network_disabled()))), + } + }) } // Get a transaction by hash. also returns the index in the block. @@ -448,6 +439,150 @@ impl LightFetch { None => Box::new(future::err(errors::network_disabled())) as Box + Send> } } + + fn headers_range_by_block_id( + &self, + from_block: BlockId, + to_block: BlockId, + max: u64 + ) -> impl Future, Error = Error> { + let fetch_hashes = [from_block, to_block].iter() + .filter_map(|block_id| match block_id { + BlockId::Hash(hash) => Some(hash.clone()), + _ => None, + }) + .collect::>(); + + let best_number = self.client.chain_info().best_block_number; + + let fetcher = self.clone(); + self.headers_by_hash(&fetch_hashes[..]).and_then(move |mut header_map| { + let (from_block_num, to_block_num) = { + let block_number = |id| match id { + &BlockId::Earliest => 0, + &BlockId::Latest => best_number, + &BlockId::Hash(ref h) => + header_map.get(h).map(|hdr| hdr.number()) + .expect("from_block and to_block headers are fetched by hash; this closure is only called on from_block and to_block; qed"), + &BlockId::Number(x) => x, + }; + (block_number(&from_block), block_number(&to_block)) + }; + + if to_block_num < from_block_num { + // early exit for "to" block before "from" block. + return Either::A(future::err(errors::filter_block_not_found(to_block))); + } else if to_block_num - from_block_num >= max { + return Either::A(future::err(errors::request_rejected_param_limit(max, "blocks"))); + } + + let to_header_hint = match to_block { + BlockId::Hash(ref h) => header_map.remove(h), + _ => None, + }; + let headers_fut = fetcher.headers_range(from_block_num, to_block_num, to_header_hint); + Either::B(headers_fut.map(move |headers| { + // Validate from_block if it's a hash + let last_hash = headers.last().map(|hdr| hdr.hash()); + match (last_hash, from_block) { + (Some(h1), BlockId::Hash(h2)) if h1 != h2 => Vec::new(), + _ => headers, + } + })) + }) + } + + fn headers_by_hash(&self, hashes: &[H256]) -> impl Future, Error = Error> { + let mut refs = H256FastMap::with_capacity_and_hasher(hashes.len(), Default::default()); + let mut reqs = Vec::with_capacity(hashes.len()); + + for hash in hashes { + refs.entry(*hash).or_insert_with(|| { + self.make_header_requests(BlockId::Hash(*hash), &mut reqs) + .expect("make_header_requests never fails for BlockId::Hash; qed") + }); + } + + self.send_requests(reqs, move |res| { + let headers = refs.drain() + .map(|(hash, header_ref)| { + let hdr = extract_header(&res, header_ref) + .expect("these responses correspond to requests that header_ref belongs to; \ + qed"); + (hash, hdr) + }) + .collect(); + headers + }) + } + + fn headers_range( + &self, + from_number: u64, + to_number: u64, + to_header_hint: Option + ) -> impl Future, Error = Error> { + let range_length = (to_number - from_number + 1) as usize; + let mut headers: Vec = Vec::with_capacity(range_length); + + let iter_start = match to_header_hint { + Some(hdr) => { + let block_id = BlockId::Hash(hdr.parent_hash()); + headers.push(hdr); + block_id + } + None => BlockId::Number(to_number), + }; + headers.extend(self.client.ancestry_iter(iter_start) + .take_while(|hdr| hdr.number() >= from_number)); + + let fetcher = self.clone(); + future::loop_fn(headers, move |mut headers| { + let remaining = range_length - headers.len(); + if remaining == 0 { + return Either::A(future::ok(future::Loop::Break(headers))); + } + + let mut reqs: Vec = Vec::with_capacity(2); + + let start_hash = if let Some(hdr) = headers.last() { + hdr.parent_hash().into() + } else { + let cht_root = cht::block_to_cht_number(to_number) + .and_then(|cht_num| fetcher.client.cht_root(cht_num as usize)); + + let cht_root = match cht_root { + Some(cht_root) => cht_root, + None => return Either::A(future::err(errors::unknown_block())), + }; + + let header_proof = request::HeaderProof::new(to_number, cht_root) + .expect("HeaderProof::new is Some(_) if cht::block_to_cht_number() is Some(_); \ + this would return above if block_to_cht_number returned None; qed"); + + let idx = reqs.len(); + let hash_ref = Field::back_ref(idx, 0); + reqs.push(header_proof.into()); + + hash_ref + }; + + let max = cmp::min(remaining as u64, MAX_HEADERS_PER_REQUEST); + reqs.push(request::HeaderWithAncestors { + block_hash: start_hash, + ancestor_count: max - 1, + }.into()); + + Either::B(fetcher.send_requests(reqs, |mut res| { + match res.last_mut() { + Some(&mut OnDemandResponse::HeaderWithAncestors(ref mut res_headers)) => + headers.extend(res_headers.drain(..)), + _ => panic!("reqs has at least one entry; each request maps to a response; qed"), + }; + future::Loop::Continue(headers) + })) + }) + } } #[derive(Clone)]