From ffa113511b9c6cfa5973a52f11db66364710c2ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 28 May 2016 14:21:43 +0200 Subject: [PATCH] Separating eth_filter --- rpc/src/v1/impls/eth.rs | 224 +++------------------------------ rpc/src/v1/impls/eth_filter.rs | 214 +++++++++++++++++++++++++++++++ rpc/src/v1/impls/mod.rs | 4 +- 3 files changed, 237 insertions(+), 205 deletions(-) create mode 100644 rpc/src/v1/impls/eth_filter.rs diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index f5af2543e..2c18640d7 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -18,7 +18,6 @@ extern crate ethash; -use std::collections::HashSet; use std::sync::{Arc, Weak, Mutex}; use std::ops::Deref; use ethsync::{SyncProvider, SyncState}; @@ -35,9 +34,8 @@ use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Act use ethcore::log_entry::LogEntry; use ethcore::filter::Filter as EthcoreFilter; use self::ethash::SeedHashCompute; -use v1::traits::{Eth, EthFilter, EthSigning}; +use v1::traits::{Eth, EthSigning}; use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, TransactionRequest, CallRequest, OptionalValue, Index, Filter, Log, Receipt}; -use v1::helpers::{PollFilter, PollManager}; use v1::impls::{dispatch_transaction, sign_and_dispatch}; use util::keys::store::AccountProvider; use serde; @@ -170,6 +168,25 @@ impl EthClient where } } +pub fn pending_logs(miner: &M, filter: &EthcoreFilter) -> Vec where M: MinerService { + let receipts = miner.pending_receipts(); + + let pending_logs = receipts.into_iter() + .flat_map(|(hash, r)| r.logs.into_iter().map(|l| (hash.clone(), l)).collect::>()) + .collect::>(); + + let result = pending_logs.into_iter() + .filter(|pair| filter.matches(&pair.1)) + .map(|pair| { + let mut log = Log::from(pair.1); + log.transaction_hash = Some(pair.0); + log + }) + .collect(); + + result +} + const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6. fn params_len(params: &Params) -> usize { @@ -193,25 +210,6 @@ fn from_params_default_third(params: Params) -> Result<(F1, F2, BlockNum } } -fn pending_logs(miner: &M, filter: &EthcoreFilter) -> Vec where M: MinerService { - let receipts = miner.pending_receipts(); - - let pending_logs = receipts.into_iter() - .flat_map(|(hash, r)| r.logs.into_iter().map(|l| (hash.clone(), l)).collect::>()) - .collect::>(); - - let result = pending_logs.into_iter() - .filter(|pair| filter.matches(&pair.1)) - .map(|pair| { - let mut log = Log::from(pair.1); - log.transaction_hash = Some(pair.0); - log - }) - .collect(); - - result -} - // must be in range [-32099, -32000] const UNSUPPORTED_REQUEST_CODE: i64 = -32000; @@ -533,188 +531,6 @@ impl Eth for EthClient where } } -/// Eth filter rpc implementation. -pub struct EthFilterClient where - C: BlockChainClient, - M: MinerService { - - client: Weak, - miner: Weak, - polls: Mutex>, -} - -impl EthFilterClient where - C: BlockChainClient, - M: MinerService { - - /// Creates new Eth filter client. - pub fn new(client: &Arc, miner: &Arc) -> Self { - EthFilterClient { - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), - polls: Mutex::new(PollManager::new()), - } - } -} - -impl EthFilter for EthFilterClient where - C: BlockChainClient + 'static, - M: MinerService + 'static { - - fn new_filter(&self, params: Params) -> Result { - from_params::<(Filter,)>(params) - .and_then(|(filter,)| { - let mut polls = self.polls.lock().unwrap(); - let block_number = take_weak!(self.client).chain_info().best_block_number; - let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); - to_value(&U256::from(id)) - }) - } - - fn new_block_filter(&self, params: Params) -> Result { - match params { - Params::None => { - let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); - to_value(&U256::from(id)) - }, - _ => Err(Error::invalid_params()) - } - } - - fn new_pending_transaction_filter(&self, params: Params) -> Result { - match params { - Params::None => { - let mut polls = self.polls.lock().unwrap(); - let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); - let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); - - to_value(&U256::from(id)) - }, - _ => Err(Error::invalid_params()) - } - } - - fn filter_changes(&self, params: Params) -> Result { - let client = take_weak!(self.client); - from_params::<(Index,)>(params) - .and_then(|(index,)| { - let mut polls = self.polls.lock().unwrap(); - match polls.poll_mut(&index.value()) { - None => Ok(Value::Array(vec![] as Vec)), - Some(filter) => match *filter { - PollFilter::Block(ref mut block_number) => { - // + 1, cause we want to return hashes including current block hash. - let current_number = client.chain_info().best_block_number + 1; - let hashes = (*block_number..current_number).into_iter() - .map(BlockID::Number) - .filter_map(|id| client.block_hash(id)) - .collect::>(); - - *block_number = current_number; - - to_value(&hashes) - }, - PollFilter::PendingTransaction(ref mut previous_hashes) => { - // get hashes of pending transactions - let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); - - let new_hashes = - { - let previous_hashes_set = previous_hashes.iter().collect::>(); - - // find all new hashes - current_hashes - .iter() - .filter(|hash| !previous_hashes_set.contains(hash)) - .cloned() - .collect::>() - }; - - // save all hashes of pending transactions - *previous_hashes = current_hashes; - - // return new hashes - to_value(&new_hashes) - }, - PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => { - // retrive the current block number - let current_number = client.chain_info().best_block_number; - - // check if we need to check pending hashes - let include_pending = filter.to_block == Some(BlockNumber::Pending); - - // build appropriate filter - let mut filter: EthcoreFilter = filter.clone().into(); - filter.from_block = BlockID::Number(*block_number); - filter.to_block = BlockID::Latest; - - // retrieve logs in range from_block..min(BlockID::Latest..to_block) - let mut logs = client.logs(filter.clone()) - .into_iter() - .map(From::from) - .collect::>(); - - // additionally retrieve pending logs - if include_pending { - let pending_logs = pending_logs(take_weak!(self.miner).deref(), &filter); - - // remove logs about which client was already notified about - let new_pending_logs: Vec<_> = pending_logs.iter() - .filter(|p| !previous_logs.contains(p)) - .cloned() - .collect(); - - // save all logs retrieved by client - *previous_logs = pending_logs.into_iter().collect(); - - // append logs array with new pending logs - logs.extend(new_pending_logs); - } - - // save current block number as next from block number - *block_number = current_number; - - to_value(&logs) - } - } - } - }) - } - - fn filter_logs(&self, params: Params) -> Result { - from_params::<(Index,)>(params) - .and_then(|(index,)| { - let mut polls = self.polls.lock().unwrap(); - match polls.poll(&index.value()) { - Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { - let include_pending = filter.to_block == Some(BlockNumber::Pending); - let filter: EthcoreFilter = filter.clone().into(); - let mut logs = take_weak!(self.client).logs(filter.clone()) - .into_iter() - .map(From::from) - .collect::>(); - - if include_pending { - logs.extend(pending_logs(take_weak!(self.miner).deref(), &filter)); - } - - to_value(&logs) - }, - // just empty array - _ => Ok(Value::Array(vec![] as Vec)), - } - }) - } - - fn uninstall_filter(&self, params: Params) -> Result { - from_params::<(Index,)>(params) - .and_then(|(index,)| { - self.polls.lock().unwrap().remove_poll(&index.value()); - to_value(&true) - }) - } -} /// Implementation of functions that require signing when no trusted signer is used. diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs new file mode 100644 index 000000000..a17e64052 --- /dev/null +++ b/rpc/src/v1/impls/eth_filter.rs @@ -0,0 +1,214 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Eth Filter RPC implementation + +use std::ops::Deref; +use std::sync::{Arc, Weak, Mutex}; +use std::collections::HashSet; +use jsonrpc_core::*; +use util::numbers::*; +use ethminer::{MinerService}; +use ethcore::filter::Filter as EthcoreFilter; +use ethcore::client::{BlockChainClient, BlockID}; +use v1::traits::EthFilter; +use v1::types::{BlockNumber, Index, Filter, Log}; +use v1::helpers::{PollFilter, PollManager}; +use v1::impls::eth::pending_logs; + + +/// Eth filter rpc implementation. +pub struct EthFilterClient where + C: BlockChainClient, + M: MinerService { + + client: Weak, + miner: Weak, + polls: Mutex>, +} + +impl EthFilterClient where + C: BlockChainClient, + M: MinerService { + + /// Creates new Eth filter client. + pub fn new(client: &Arc, miner: &Arc) -> Self { + EthFilterClient { + client: Arc::downgrade(client), + miner: Arc::downgrade(miner), + polls: Mutex::new(PollManager::new()), + } + } +} + +impl EthFilter for EthFilterClient where + C: BlockChainClient + 'static, + M: MinerService + 'static { + + fn new_filter(&self, params: Params) -> Result { + from_params::<(Filter,)>(params) + .and_then(|(filter,)| { + let mut polls = self.polls.lock().unwrap(); + let block_number = take_weak!(self.client).chain_info().best_block_number; + let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); + to_value(&U256::from(id)) + }) + } + + fn new_block_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } + } + + fn new_pending_transaction_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); + let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); + + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } + } + + fn filter_changes(&self, params: Params) -> Result { + let client = take_weak!(self.client); + from_params::<(Index,)>(params) + .and_then(|(index,)| { + let mut polls = self.polls.lock().unwrap(); + match polls.poll_mut(&index.value()) { + None => Ok(Value::Array(vec![] as Vec)), + Some(filter) => match *filter { + PollFilter::Block(ref mut block_number) => { + // + 1, cause we want to return hashes including current block hash. + let current_number = client.chain_info().best_block_number + 1; + let hashes = (*block_number..current_number).into_iter() + .map(BlockID::Number) + .filter_map(|id| client.block_hash(id)) + .collect::>(); + + *block_number = current_number; + + to_value(&hashes) + }, + PollFilter::PendingTransaction(ref mut previous_hashes) => { + // get hashes of pending transactions + let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); + + let new_hashes = + { + let previous_hashes_set = previous_hashes.iter().collect::>(); + + // find all new hashes + current_hashes + .iter() + .filter(|hash| !previous_hashes_set.contains(hash)) + .cloned() + .collect::>() + }; + + // save all hashes of pending transactions + *previous_hashes = current_hashes; + + // return new hashes + to_value(&new_hashes) + }, + PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => { + // retrive the current block number + let current_number = client.chain_info().best_block_number; + + // check if we need to check pending hashes + let include_pending = filter.to_block == Some(BlockNumber::Pending); + + // build appropriate filter + let mut filter: EthcoreFilter = filter.clone().into(); + filter.from_block = BlockID::Number(*block_number); + filter.to_block = BlockID::Latest; + + // retrieve logs in range from_block..min(BlockID::Latest..to_block) + let mut logs = client.logs(filter.clone()) + .into_iter() + .map(From::from) + .collect::>(); + + // additionally retrieve pending logs + if include_pending { + let pending_logs = pending_logs(take_weak!(self.miner).deref(), &filter); + + // remove logs about which client was already notified about + let new_pending_logs: Vec<_> = pending_logs.iter() + .filter(|p| !previous_logs.contains(p)) + .cloned() + .collect(); + + // save all logs retrieved by client + *previous_logs = pending_logs.into_iter().collect(); + + // append logs array with new pending logs + logs.extend(new_pending_logs); + } + + // save current block number as next from block number + *block_number = current_number; + + to_value(&logs) + } + } + } + }) + } + + fn filter_logs(&self, params: Params) -> Result { + from_params::<(Index,)>(params) + .and_then(|(index,)| { + let mut polls = self.polls.lock().unwrap(); + match polls.poll(&index.value()) { + Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { + let include_pending = filter.to_block == Some(BlockNumber::Pending); + let filter: EthcoreFilter = filter.clone().into(); + let mut logs = take_weak!(self.client).logs(filter.clone()) + .into_iter() + .map(From::from) + .collect::>(); + + if include_pending { + logs.extend(pending_logs(take_weak!(self.miner).deref(), &filter)); + } + + to_value(&logs) + }, + // just empty array + _ => Ok(Value::Array(vec![] as Vec)), + } + }) + } + + fn uninstall_filter(&self, params: Params) -> Result { + from_params::<(Index,)>(params) + .and_then(|(index,)| { + self.polls.lock().unwrap().remove_poll(&index.value()); + to_value(&true) + }) + } +} diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index 13e4fd512..979463a5d 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -27,6 +27,7 @@ macro_rules! take_weak { mod web3; mod eth; +mod eth_filter; mod net; mod personal; mod ethcore; @@ -34,7 +35,8 @@ mod traces; mod rpc; pub use self::web3::Web3Client; -pub use self::eth::{EthClient, EthFilterClient, EthSigningUnsafeClient}; +pub use self::eth::{EthClient, EthSigningUnsafeClient}; +pub use self::eth_filter::EthFilterClient; pub use self::net::NetClient; pub use self::personal::PersonalClient; pub use self::ethcore::EthcoreClient;