log fetching for light client

This commit is contained in:
Robert Habermeier 2017-03-28 15:42:23 +02:00
parent a6b6c312b8
commit f409168126
4 changed files with 115 additions and 14 deletions

View File

@ -537,23 +537,23 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
Err(errors::deprecated("Compilation functionality is deprecated.".to_string()))
}
fn logs(&self, filter: Filter) -> Result<Vec<Log>, Error> {
fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>, Error> {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
let mut logs = take_weakf!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
let best_block = take_weak!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weak!(self.miner), best_block, &filter);
let best_block = take_weakf!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weakf!(self.miner), best_block, &filter);
logs.extend(pending);
}
let logs = limit_logs(logs, filter.limit);
Ok(logs)
future::ok(logs).boxed()
}
fn work(&self, no_new_work_timeout: Trailing<u64>) -> Result<Work, Error> {

View File

@ -53,7 +53,7 @@ pub trait Filterable {
fn polls(&self) -> &Mutex<PollManager<PollFilter>>;
}
/// Eth filter rpc implementation.
/// Eth filter rpc implementation for a full node.
pub struct EthFilterClient<C, M> where
C: BlockChainClient,
M: MinerService {
@ -98,6 +98,8 @@ impl<C, M> Filterable for EthFilterClient<C, M> where C: BlockChainClient, M: Mi
fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls }
}
impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256, Error> {
let mut polls = self.polls().lock();

View File

@ -34,6 +34,7 @@ use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::executed::{Executed, ExecutionError};
use ethcore::ids::BlockId;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::transaction::{Action, SignedTransaction, Transaction as EthTransaction};
use ethsync::LightSync;
use rlp::UntrustedRlp;
@ -43,7 +44,9 @@ use util::{RwLock, Mutex, Uint, U256};
use futures::{future, Future, BoxFuture, IntoFuture};
use futures::sync::oneshot;
use v1::impls::eth_filter::Filterable;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs, dispatch};
use v1::helpers::{PollFilter, PollManager};
use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth;
use v1::types::{
@ -55,7 +58,7 @@ use v1::metadata::Metadata;
use util::Address;
/// Light client `ETH` RPC.
/// Light client `ETH` (and filter) RPC.
pub struct EthClient {
sync: Arc<LightSync>,
client: Arc<LightClient>,
@ -63,6 +66,22 @@ pub struct EthClient {
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
cache: Arc<Mutex<LightDataCache>>,
polls: Mutex<PollManager<PollFilter>>,
}
impl Clone for EthClient {
fn clone(&self) -> Self {
// each instance should have its own poll manager.
EthClient {
sync: self.sync.clone(),
client: self.client.clone(),
on_demand: self.on_demand.clone(),
transaction_queue: self.transaction_queue.clone(),
accounts: self.accounts.clone(),
cache: self.cache.clone(),
polls: Mutex::new(PollManager::new()),
}
}
}
// helper for internal error: on demand sender cancelled.
@ -90,6 +109,7 @@ impl EthClient {
transaction_queue: transaction_queue,
accounts: accounts,
cache: cache,
polls: Mutex::new(PollManager::new()),
}
}
@ -484,19 +504,98 @@ impl Eth for EthClient {
Err(errors::deprecated("Compilation of Solidity via RPC is deprecated".to_string()))
}
fn logs(&self, _filter: Filter) -> Result<Vec<Log>, Error> {
Err(errors::unimplemented(None))
fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>, Error> {
let limit = filter.limit;
Filterable::logs(self, filter.into())
.map(move|logs| limit_logs(logs, limit))
.boxed()
}
fn work(&self, _timeout: Trailing<u64>) -> Result<Work, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
fn submit_work(&self, _nonce: RpcH64, _pow_hash: RpcH256, _mix_hash: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
fn submit_hashrate(&self, _rate: RpcU256, _id: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
}
// This trait implementation triggers a blanked impl of `EthFilter`.
impl Filterable for EthClient {
fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number }
fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
}
fn pending_transactions_hashes(&self, _block_number: u64) -> Vec<::util::H256> {
Vec::new()
}
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
use std::collections::BTreeMap;
use futures::stream::{self, Stream};
use util::H2048;
// early exit for "to" block before "from" block.
match filter.from_block {
BlockId::Latest | BlockId::Pending => {
let best = self.client.best_block_header();
let chain_info = self.client.chain_info();
if best.number() != chain_info.best_block_number || best.hash() != chain_info.best_block_hash {
return future::ok(Vec::new()).boxed()
}
}
_ => {}
}
let maybe_future = self.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block)
.take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block)
.take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block)
.filter(|ref hdr| {
let hdr_bloom = hdr.log_bloom();
bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some()
})
.map(|hdr| (hdr.number(), request::BlockReceipts(hdr)))
.map(|(num, req)| self.on_demand.block_receipts(ctx, req).map(move |x| (num, x)))
.collect();
// as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, receipts)| {
for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() {
if filter.matches(&log) {
matches.insert((num, block_index), log.into());
}
}
future::ok(matches)
}) // and then collect them into a vector.
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(err_premature_cancel)
});
match maybe_future {
Some(fut) => fut.boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}
fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec<Log> {
Vec::new() // light clients don't mine.
}
fn polls(&self) -> &Mutex<PollManager<PollFilter>> {
&self.polls
}
}

View File

@ -162,8 +162,8 @@ build_rpc_trait! {
fn compile_serpent(&self, String) -> Result<Bytes, Error>;
/// Returns logs matching given filter object.
#[rpc(name = "eth_getLogs")]
fn logs(&self, Filter) -> Result<Vec<Log>, Error>;
#[rpc(async, name = "eth_getLogs")]
fn logs(&self, Filter) -> BoxFuture<Vec<Log>, Error>;
/// Returns the hash of the current block, the seedHash, and the boundary condition to be met.
#[rpc(name = "eth_getWork")]