From a6b6c312b8da9ee3332a41408b18034d32c8c650 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 28 Mar 2017 14:19:21 +0200 Subject: [PATCH] abstraction and futures-based eth_filter --- parity/rpc_apis.rs | 2 +- rpc/src/v1/impls/eth_filter.rs | 170 ++++++++++++++++++++------------- rpc/src/v1/tests/mocked/eth.rs | 2 +- rpc/src/v1/traits/eth.rs | 8 +- 4 files changed, 112 insertions(+), 70 deletions(-) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 5cfb28474..01a649469 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -216,7 +216,7 @@ impl Dependencies for FullDependencies { ); handler.extend_with(client.to_delegate()); - let filter_client = EthFilterClient::new(&self.client, &self.miner); + let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); handler.extend_with(filter_client.to_delegate()); add_signing_methods!(EthSigning, handler, self); diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index cf3398498..9a7281243 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -16,89 +16,131 @@ //! Eth Filter RPC implementation -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::collections::HashSet; + use jsonrpc_core::*; use ethcore::miner::MinerService; use ethcore::filter::Filter as EthcoreFilter; use ethcore::client::{BlockChainClient, BlockId}; -use util::Mutex; +use util::{H256, Mutex}; + +use futures::{future, Future, BoxFuture}; + use v1::traits::EthFilter; use v1::types::{BlockNumber, Index, Filter, FilterChanges, Log, H256 as RpcH256, U256 as RpcU256}; use v1::helpers::{PollFilter, PollManager, limit_logs}; use v1::impls::eth::pending_logs; +/// Something which provides data that can be filtered over. +pub trait Filterable { + /// Current best block number. + fn best_block_number(&self) -> u64; + + /// Get a block hash by block id. + fn block_hash(&self, id: BlockId) -> Option; + + /// pending transaction hashes at the given block. + fn pending_transactions_hashes(&self, block_number: u64) -> Vec; + + /// Get logs that match the given filter. + fn logs(&self, filter: EthcoreFilter) -> BoxFuture, Error>; + + /// Get logs from the pending block. + fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec; + + /// Get a reference to the poll manager. + fn polls(&self) -> &Mutex>; +} + /// Eth filter rpc implementation. pub struct EthFilterClient where C: BlockChainClient, M: MinerService { - client: Weak, - miner: Weak, + client: Arc, + miner: Arc, polls: Mutex>, } -impl EthFilterClient where - C: BlockChainClient, - M: MinerService { - +impl EthFilterClient where C: BlockChainClient, M: MinerService { /// Creates new Eth filter client. - pub fn new(client: &Arc, miner: &Arc) -> Self { + pub fn new(client: Arc, miner: Arc) -> Self { EthFilterClient { - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), + client: client, + miner: miner, polls: Mutex::new(PollManager::new()), } } } -impl EthFilter for EthFilterClient - where C: BlockChainClient + 'static, M: MinerService + 'static -{ +impl Filterable for EthFilterClient where C: BlockChainClient, M: MinerService { + fn best_block_number(&self) -> u64 { + self.client.chain_info().best_block_number + } + + fn block_hash(&self, id: BlockId) -> Option { + self.client.block_hash(id).map(Into::into) + } + + fn pending_transactions_hashes(&self, best: u64) -> Vec { + self.miner.pending_transactions_hashes(best) + } + + fn logs(&self, filter: EthcoreFilter) -> BoxFuture, Error> { + future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()).boxed() + } + + fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec { + pending_logs(&*self.miner, block_number, filter) + } + + fn polls(&self) -> &Mutex> { &self.polls } +} + +impl EthFilter for T { fn new_filter(&self, filter: Filter) -> Result { - let mut polls = self.polls.lock(); - let block_number = take_weak!(self.client).chain_info().best_block_number; + let mut polls = self.polls().lock(); + let block_number = self.best_block_number(); let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); Ok(id.into()) } fn new_block_filter(&self) -> Result { - let mut polls = self.polls.lock(); - let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); + let mut polls = self.polls().lock(); + let id = polls.create_poll(PollFilter::Block(self.best_block_number())); Ok(id.into()) } fn new_pending_transaction_filter(&self) -> Result { - let mut polls = self.polls.lock(); - let best_block = take_weak!(self.client).chain_info().best_block_number; - let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(best_block); + let mut polls = self.polls().lock(); + let best_block = self.best_block_number(); + let pending_transactions = self.pending_transactions_hashes(best_block); let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); Ok(id.into()) } - fn filter_changes(&self, index: Index) -> Result { - let client = take_weak!(self.client); - let mut polls = self.polls.lock(); + fn filter_changes(&self, index: Index) -> BoxFuture { + let mut polls = self.polls().lock(); match polls.poll_mut(&index.value()) { - None => Ok(FilterChanges::Empty), + None => future::ok(FilterChanges::Empty).boxed(), Some(filter) => match *filter { PollFilter::Block(ref mut block_number) => { // + 1, cause we want to return hashes including current block hash. - let current_number = client.chain_info().best_block_number + 1; + let current_number = self.best_block_number() + 1; let hashes = (*block_number..current_number).into_iter() .map(BlockId::Number) - .filter_map(|id| client.block_hash(id)) - .map(Into::into) + .filter_map(|id| self.block_hash(id)) .collect::>(); *block_number = current_number; - Ok(FilterChanges::Hashes(hashes)) + future::ok(FilterChanges::Hashes(hashes)).boxed() }, PollFilter::PendingTransaction(ref mut previous_hashes) => { // get hashes of pending transactions - let best_block = take_weak!(self.client).chain_info().best_block_number; - let current_hashes = take_weak!(self.miner).pending_transactions_hashes(best_block); + let best_block = self.best_block_number(); + let current_hashes = self.pending_transactions_hashes(best_block); let new_hashes = { @@ -117,11 +159,11 @@ impl EthFilter for EthFilterClient *previous_hashes = current_hashes; // return new hashes - Ok(FilterChanges::Hashes(new_hashes)) + future::ok(FilterChanges::Hashes(new_hashes)).boxed() }, PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => { // retrive the current block number - let current_number = client.chain_info().best_block_number; + let current_number = self.best_block_number(); // check if we need to check pending hashes let include_pending = filter.to_block == Some(BlockNumber::Pending); @@ -131,16 +173,9 @@ impl EthFilter for EthFilterClient filter.from_block = BlockId::Number(*block_number); filter.to_block = BlockId::Latest; - // retrieve logs in range from_block..min(BlockId::Latest..to_block) - let mut logs = client.logs(filter.clone()) - .into_iter() - .map(From::from) - .collect::>(); - - // additionally retrieve pending logs - if include_pending { - let best_block = take_weak!(self.client).chain_info().best_block_number; - let pending_logs = pending_logs(&*take_weak!(self.miner), best_block, &filter); + // retrieve pending logs + let pending = if include_pending { + let pending_logs = self.pending_logs(current_number, &filter); // remove logs about which client was already notified about let new_pending_logs: Vec<_> = pending_logs.iter() @@ -151,49 +186,56 @@ impl EthFilter for EthFilterClient // save all logs retrieved by client *previous_logs = pending_logs.into_iter().collect(); - // append logs array with new pending logs - logs.extend(new_pending_logs); - } - - let logs = limit_logs(logs, filter.limit); + new_pending_logs + } else { + Vec::new() + }; // save the number of the next block as a first block from which // we want to get logs *block_number = current_number + 1; - Ok(FilterChanges::Logs(logs)) + // retrieve logs in range from_block..min(BlockId::Latest..to_block) + let limit = filter.limit; + self.logs(filter) + .map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs + .map(move |logs| limit_logs(logs, limit)) // limit the logs + .map(FilterChanges::Logs) + .boxed() } } } } - fn filter_logs(&self, index: Index) -> Result, Error> { - let mut polls = self.polls.lock(); + fn filter_logs(&self, index: Index) -> BoxFuture, Error> { + let mut polls = self.polls().lock(); match polls.poll(&index.value()) { Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { let include_pending = filter.to_block == Some(BlockNumber::Pending); let filter: EthcoreFilter = filter.clone().into(); - let mut logs = take_weak!(self.client).logs(filter.clone()) - .into_iter() - .map(From::from) - .collect::>(); - if include_pending { - let best_block = take_weak!(self.client).chain_info().best_block_number; - logs.extend(pending_logs(&*take_weak!(self.miner), best_block, &filter)); - } + // fetch pending logs. + let pending = if include_pending { + let best_block = self.best_block_number(); + self.pending_logs(best_block, &filter) + } else { + Vec::new() + }; - let logs = limit_logs(logs, filter.limit); - - Ok(logs) + // retrieve logs asynchronously, appending pending logs. + let limit = filter.limit; + self.logs(filter) + .map(move |mut logs| { logs.extend(pending); logs }) + .map(move |logs| limit_logs(logs, limit)) + .boxed() }, // just empty array - _ => Ok(Vec::new()), + _ => future::ok(Vec::new()).boxed() } } fn uninstall_filter(&self, index: Index) -> Result { - self.polls.lock().remove_poll(&index.value()); + self.polls().lock().remove_poll(&index.value()); Ok(true) } } diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index dfd64d38d..8f05ed6d4 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -89,7 +89,7 @@ impl EthTester { let hashrates = Arc::new(Mutex::new(HashMap::new())); let external_miner = Arc::new(ExternalMiner::new(hashrates.clone())); let eth = EthClient::new(&client, &snapshot, &sync, &ap, &miner, &external_miner, options).to_delegate(); - let filter = EthFilterClient::new(&client, &miner).to_delegate(); + let filter = EthFilterClient::new(client.clone(), miner.clone()).to_delegate(); let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); let sign = SigningUnsafeClient::new(&ap, dispatcher).to_delegate(); diff --git a/rpc/src/v1/traits/eth.rs b/rpc/src/v1/traits/eth.rs index 365ad9320..7f21829c7 100644 --- a/rpc/src/v1/traits/eth.rs +++ b/rpc/src/v1/traits/eth.rs @@ -196,12 +196,12 @@ build_rpc_trait! { fn new_pending_transaction_filter(&self) -> Result; /// Returns filter changes since last poll. - #[rpc(name = "eth_getFilterChanges")] - fn filter_changes(&self, Index) -> Result; + #[rpc(async, name = "eth_getFilterChanges")] + fn filter_changes(&self, Index) -> BoxFuture; /// Returns all logs matching given filter (in a range 'from' - 'to'). - #[rpc(name = "eth_getFilterLogs")] - fn filter_logs(&self, Index) -> Result, Error>; + #[rpc(async, name = "eth_getFilterLogs")] + fn filter_logs(&self, Index) -> BoxFuture, Error>; /// Uninstalls filter. #[rpc(name = "eth_uninstallFilter")]