From f4091681267b707a9a59861436422edf992be2ce Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 28 Mar 2017 15:42:23 +0200 Subject: [PATCH] log fetching for light client --- rpc/src/v1/impls/eth.rs | 10 +-- rpc/src/v1/impls/eth_filter.rs | 4 +- rpc/src/v1/impls/light/eth.rs | 111 +++++++++++++++++++++++++++++++-- rpc/src/v1/traits/eth.rs | 4 +- 4 files changed, 115 insertions(+), 14 deletions(-) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 811d5aa90..677a50535 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -537,23 +537,23 @@ impl Eth for EthClient where Err(errors::deprecated("Compilation functionality is deprecated.".to_string())) } - fn logs(&self, filter: Filter) -> Result, Error> { + fn logs(&self, filter: Filter) -> BoxFuture, Error> { let include_pending = filter.to_block == Some(BlockNumber::Pending); let filter: EthcoreFilter = filter.into(); - let mut logs = take_weak!(self.client).logs(filter.clone()) + let mut logs = take_weakf!(self.client).logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); if include_pending { - let best_block = take_weak!(self.client).chain_info().best_block_number; - let pending = pending_logs(&*take_weak!(self.miner), best_block, &filter); + let best_block = take_weakf!(self.client).chain_info().best_block_number; + let pending = pending_logs(&*take_weakf!(self.miner), best_block, &filter); logs.extend(pending); } let logs = limit_logs(logs, filter.limit); - Ok(logs) + future::ok(logs).boxed() } fn work(&self, no_new_work_timeout: Trailing) -> Result { diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 9a7281243..8f448feb5 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -53,7 +53,7 @@ pub trait Filterable { fn polls(&self) -> &Mutex>; } -/// Eth filter rpc implementation. +/// Eth filter rpc implementation for a full node. pub struct EthFilterClient where C: BlockChainClient, M: MinerService { @@ -98,6 +98,8 @@ impl Filterable for EthFilterClient where C: BlockChainClient, M: Mi fn polls(&self) -> &Mutex> { &self.polls } } + + impl EthFilter for T { fn new_filter(&self, filter: Filter) -> Result { let mut polls = self.polls().lock(); diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 1851f479e..11747873e 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -34,6 +34,7 @@ use ethcore::basic_account::BasicAccount; use ethcore::encoded; use ethcore::executed::{Executed, ExecutionError}; use ethcore::ids::BlockId; +use ethcore::filter::Filter as EthcoreFilter; use ethcore::transaction::{Action, SignedTransaction, Transaction as EthTransaction}; use ethsync::LightSync; use rlp::UntrustedRlp; @@ -43,7 +44,9 @@ use util::{RwLock, Mutex, Uint, U256}; use futures::{future, Future, BoxFuture, IntoFuture}; use futures::sync::oneshot; +use v1::impls::eth_filter::Filterable; use v1::helpers::{CallRequest as CRequest, errors, limit_logs, dispatch}; +use v1::helpers::{PollFilter, PollManager}; use v1::helpers::block_import::is_major_importing; use v1::traits::Eth; use v1::types::{ @@ -55,7 +58,7 @@ use v1::metadata::Metadata; use util::Address; -/// Light client `ETH` RPC. +/// Light client `ETH` (and filter) RPC. pub struct EthClient { sync: Arc, client: Arc, @@ -63,6 +66,22 @@ pub struct EthClient { transaction_queue: Arc>, accounts: Arc, cache: Arc>, + polls: Mutex>, +} + +impl Clone for EthClient { + fn clone(&self) -> Self { + // each instance should have its own poll manager. + EthClient { + sync: self.sync.clone(), + client: self.client.clone(), + on_demand: self.on_demand.clone(), + transaction_queue: self.transaction_queue.clone(), + accounts: self.accounts.clone(), + cache: self.cache.clone(), + polls: Mutex::new(PollManager::new()), + } + } } // helper for internal error: on demand sender cancelled. @@ -90,6 +109,7 @@ impl EthClient { transaction_queue: transaction_queue, accounts: accounts, cache: cache, + polls: Mutex::new(PollManager::new()), } } @@ -484,19 +504,98 @@ impl Eth for EthClient { Err(errors::deprecated("Compilation of Solidity via RPC is deprecated".to_string())) } - fn logs(&self, _filter: Filter) -> Result, Error> { - Err(errors::unimplemented(None)) + fn logs(&self, filter: Filter) -> BoxFuture, Error> { + let limit = filter.limit; + + Filterable::logs(self, filter.into()) + .map(move|logs| limit_logs(logs, limit)) + .boxed() } fn work(&self, _timeout: Trailing) -> Result { - Err(errors::unimplemented(None)) + Err(errors::light_unimplemented(None)) } fn submit_work(&self, _nonce: RpcH64, _pow_hash: RpcH256, _mix_hash: RpcH256) -> Result { - Err(errors::unimplemented(None)) + Err(errors::light_unimplemented(None)) } fn submit_hashrate(&self, _rate: RpcU256, _id: RpcH256) -> Result { - Err(errors::unimplemented(None)) + Err(errors::light_unimplemented(None)) + } +} + +// This trait implementation triggers a blanked impl of `EthFilter`. +impl Filterable for EthClient { + fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number } + + fn block_hash(&self, id: BlockId) -> Option { + self.client.block_hash(id).map(Into::into) + } + + fn pending_transactions_hashes(&self, _block_number: u64) -> Vec<::util::H256> { + Vec::new() + } + + fn logs(&self, filter: EthcoreFilter) -> BoxFuture, Error> { + use std::collections::BTreeMap; + + use futures::stream::{self, Stream}; + use util::H2048; + + // early exit for "to" block before "from" block. + match filter.from_block { + BlockId::Latest | BlockId::Pending => { + let best = self.client.best_block_header(); + let chain_info = self.client.chain_info(); + if best.number() != chain_info.best_block_number || best.hash() != chain_info.best_block_hash { + return future::ok(Vec::new()).boxed() + } + } + _ => {} + } + + let maybe_future = self.sync.with_context(move |ctx| { + // find all headers which match the filter, and fetch the receipts for each one. + // match them with their numbers for easy sorting later. + let bit_combos = filter.bloom_possibilities(); + let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) + .take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block) + .take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block) + .filter(|ref hdr| { + let hdr_bloom = hdr.log_bloom(); + bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some() + }) + .map(|hdr| (hdr.number(), request::BlockReceipts(hdr))) + .map(|(num, req)| self.on_demand.block_receipts(ctx, req).map(move |x| (num, x))) + .collect(); + + // as the receipts come in, find logs within them which match the filter. + // insert them into a BTreeMap to maintain order by number and block index. + stream::futures_unordered(receipts_futures) + .fold(BTreeMap::new(), move |mut matches, (num, receipts)| { + for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() { + if filter.matches(&log) { + matches.insert((num, block_index), log.into()); + } + } + future::ok(matches) + }) // and then collect them into a vector. + .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) + .map_err(err_premature_cancel) + }); + + match maybe_future { + Some(fut) => fut.boxed(), + None => future::err(errors::network_disabled()).boxed(), + } + } + + fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec { + Vec::new() // light clients don't mine. + } + + fn polls(&self) -> &Mutex> { + &self.polls } } diff --git a/rpc/src/v1/traits/eth.rs b/rpc/src/v1/traits/eth.rs index 7f21829c7..941263335 100644 --- a/rpc/src/v1/traits/eth.rs +++ b/rpc/src/v1/traits/eth.rs @@ -162,8 +162,8 @@ build_rpc_trait! { fn compile_serpent(&self, String) -> Result; /// Returns logs matching given filter object. - #[rpc(name = "eth_getLogs")] - fn logs(&self, Filter) -> Result, Error>; + #[rpc(async, name = "eth_getLogs")] + fn logs(&self, Filter) -> BoxFuture, Error>; /// Returns the hash of the current block, the seedHash, and the boundary condition to be met. #[rpc(name = "eth_getWork")]