// Copyright 2015-2018 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Eth Filter RPC implementation use std::sync::Arc; use std::collections::BTreeSet; use ethcore::miner::{self, MinerService}; use ethcore::filter::Filter as EthcoreFilter; use ethcore::client::{BlockChainClient, BlockId}; use ethereum_types::H256; use parking_lot::Mutex; use jsonrpc_core::{BoxFuture, Result}; use jsonrpc_core::futures::{future, Future}; use jsonrpc_core::futures::future::Either; use v1::traits::EthFilter; use v1::types::{BlockNumber, Index, Filter, FilterChanges, Log, H256 as RpcH256, U256 as RpcU256}; use v1::helpers::{errors, SyncPollFilter, PollFilter, PollManager, limit_logs}; use v1::impls::eth::pending_logs; /// Something which provides data that can be filtered over. pub trait Filterable { /// Current best block number. fn best_block_number(&self) -> u64; /// Get a block hash by block id. fn block_hash(&self, id: BlockId) -> Option; /// pending transaction hashes at the given block (unordered). fn pending_transaction_hashes(&self) -> BTreeSet; /// Get logs that match the given filter. fn logs(&self, filter: EthcoreFilter) -> BoxFuture>; /// Get logs from the pending block. fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec; /// Get a reference to the poll manager. fn polls(&self) -> &Mutex>; /// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed. fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec, u64); } /// Eth filter rpc implementation for a full node. pub struct EthFilterClient { client: Arc, miner: Arc, polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. pub fn new(client: Arc, miner: Arc, poll_lifetime: u32) -> Self { EthFilterClient { client: client, miner: miner, polls: Mutex::new(PollManager::new(poll_lifetime)), } } } impl Filterable for EthFilterClient where C: miner::BlockChainClient + BlockChainClient, M: MinerService, { fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number } fn block_hash(&self, id: BlockId) -> Option { self.client.block_hash(id) } fn pending_transaction_hashes(&self) -> BTreeSet { self.miner.pending_transaction_hashes(&*self.client) } fn logs(&self, filter: EthcoreFilter) -> BoxFuture> { Box::new(future::ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect())) } fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec { pending_logs(&*self.miner, block_number, filter) } fn polls(&self) -> &Mutex> { &self.polls } fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec, u64) { let inner = || -> Option> { let mut route = Vec::new(); let mut current_block_hash = block_hash; let mut current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?; while current_block_hash != self.client.block_hash(BlockId::Number(current_block_header.number()))? { route.push(current_block_hash); current_block_hash = current_block_header.parent_hash(); current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?; } Some(route) }; let route = inner().unwrap_or_default(); let route_len = route.len() as u64; (route.into_iter().flat_map(|block_hash| { let mut filter = filter.clone(); filter.from_block = BlockId::Hash(block_hash); filter.to_block = filter.from_block; self.client.logs(filter).unwrap_or_default().into_iter().map(|log| { let mut log: Log = log.into(); log.log_type = "removed".into(); log.removed = true; log }) }).collect(), route_len) } } impl EthFilter for T { fn new_filter(&self, filter: Filter) -> Result { let mut polls = self.polls().lock(); let block_number = self.best_block_number(); let include_pending = filter.to_block == Some(BlockNumber::Pending); let filter = filter.try_into()?; let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs { block_number, filter, include_pending, last_block_hash: None, previous_logs: Default::default() })); Ok(id.into()) } fn new_block_filter(&self) -> Result { let mut polls = self.polls().lock(); // +1, since we don't want to include the current block let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block(self.best_block_number() + 1))); Ok(id.into()) } fn new_pending_transaction_filter(&self) -> Result { let mut polls = self.polls().lock(); let pending_transactions = self.pending_transaction_hashes(); let id = polls.create_poll(SyncPollFilter::new(PollFilter::PendingTransaction(pending_transactions))); Ok(id.into()) } fn filter_changes(&self, index: Index) -> BoxFuture { let filter = match self.polls().lock().poll_mut(&index.value()) { Some(filter) => filter.clone(), None => return Box::new(future::err(errors::filter_not_found())), }; Box::new(filter.modify(|filter| match *filter { PollFilter::Block(ref mut block_number) => { // +1, cause we want to return hashes including current block hash. let current_number = self.best_block_number() + 1; let hashes = (*block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| self.block_hash(id).map(Into::into)) .collect::>(); *block_number = current_number; Either::A(future::ok(FilterChanges::Hashes(hashes))) }, PollFilter::PendingTransaction(ref mut previous_hashes) => { // get hashes of pending transactions let current_hashes = self.pending_transaction_hashes(); let new_hashes = { // find all new hashes current_hashes.difference(previous_hashes) .cloned() .map(Into::into) .collect() }; // save all hashes of pending transactions *previous_hashes = current_hashes; // return new hashes Either::A(future::ok(FilterChanges::Hashes(new_hashes))) }, PollFilter::Logs { ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter, include_pending, } => { // retrive the current block number let current_number = self.best_block_number(); let mut filter = filter.clone(); // retrieve reorg logs let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter)); *block_number -= reorg_len as u64; filter.from_block = BlockId::Number(*block_number); filter.to_block = BlockId::Latest; // retrieve pending logs let pending = if include_pending { let pending_logs = self.pending_logs(current_number, &filter); // remove logs about which client was already notified about let new_pending_logs: Vec<_> = pending_logs.iter() .filter(|p| !previous_logs.contains(p)) .cloned() .collect(); // save all logs retrieved by client *previous_logs = pending_logs.into_iter().collect(); new_pending_logs } else { Vec::new() }; // save the number of the next block as a first block from which // we want to get logs *block_number = current_number + 1; // save the current block hash, which we used to get back to the // canon chain in case of reorg. *last_block_hash = self.block_hash(BlockId::Number(current_number)); // retrieve logs in range from_block..min(BlockId::Latest..to_block) let limit = filter.limit; Either::B(self.logs(filter) .map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front .map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs .map(move |logs| limit_logs(logs, limit)) // limit the logs .map(FilterChanges::Logs)) } })) } fn filter_logs(&self, index: Index) -> BoxFuture> { let (filter, include_pending) = { let mut polls = self.polls().lock(); match polls.poll(&index.value()).and_then(|f| f.modify(|filter| match *filter { PollFilter::Logs { ref filter, include_pending, .. } => Some((filter.clone(), include_pending)), _ => None, })) { Some((filter, include_pending)) => (filter, include_pending), None => return Box::new(future::err(errors::filter_not_found())), } }; // fetch pending logs. let pending = if include_pending { let best_block = self.best_block_number(); self.pending_logs(best_block, &filter) } else { Vec::new() }; // retrieve logs asynchronously, appending pending logs. let limit = filter.limit; let logs = self.logs(filter); Box::new(logs .map(move |mut logs| { logs.extend(pending); logs }) .map(move |logs| limit_logs(logs, limit)) ) } fn uninstall_filter(&self, index: Index) -> Result { Ok(self.polls().lock().remove_poll(&index.value())) } }