2020-09-22 14:53:52 +02:00
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of OpenEthereum.
2016-06-01 19:37:34 +02:00
2020-09-22 14:53:52 +02:00
// OpenEthereum is free software: you can redistribute it and/or modify
2016-06-01 19:37:34 +02:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2020-09-22 14:53:52 +02:00
// OpenEthereum is distributed in the hope that it will be useful,
2016-06-01 19:37:34 +02:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
2020-09-22 14:53:52 +02:00
// along with OpenEthereum. If not, see <http://www.gnu.org/licenses/>.
2016-06-01 19:37:34 +02:00
//! Eth Filter RPC implementation
2017-04-12 12:07:54 +02:00
use std ::{
2018-11-21 16:50:41 +01:00
collections ::{ BTreeSet , VecDeque } ,
sync ::Arc ,
} ;
2020-08-05 06:08:03 +02:00
2016-12-09 23:01:43 +01:00
use ethcore ::{
client ::{ BlockChainClient , BlockId } ,
2019-01-04 14:05:46 +01:00
miner ::{ self , MinerService } ,
} ;
2019-02-25 14:27:28 +01:00
use ethereum_types ::{ H256 , U256 } ;
2017-09-02 20:09:13 +02:00
use parking_lot ::Mutex ;
2019-01-04 14:05:46 +01:00
use types ::filter ::Filter as EthcoreFilter ;
2017-04-12 12:07:54 +02:00
2017-11-14 11:38:17 +01:00
use jsonrpc_core ::{
2017-10-05 12:35:01 +02:00
futures ::{ future , future ::Either , Future } ,
BoxFuture , Result ,
2019-02-25 14:27:28 +01:00
} ;
use v1 ::{
2018-07-04 17:37:55 +02:00
helpers ::{ errors , limit_logs , PollFilter , PollManager , SyncPollFilter } ,
2016-06-01 19:37:34 +02:00
impls ::eth ::pending_logs ,
traits ::EthFilter ,
2019-02-25 14:27:28 +01:00
types ::{ BlockNumber , Filter , FilterChanges , Index , Log } ,
2016-06-01 19:37:34 +02:00
} ;
2017-04-12 12:07:54 +02:00
/// Something which provides data that can be filtered over.
pub trait Filterable {
/// Current best block number.
fn best_block_number ( & self ) -> u64 ;
/// Get a block hash by block id.
2018-06-13 13:39:27 +02:00
fn block_hash ( & self , id : BlockId ) -> Option < H256 > ;
2017-04-12 12:07:54 +02:00
2018-07-04 17:37:55 +02:00
/// pending transaction hashes at the given block (unordered).
fn pending_transaction_hashes ( & self ) -> BTreeSet < H256 > ;
2017-04-12 12:07:54 +02:00
/// Get logs that match the given filter.
2017-11-14 11:38:17 +01:00
fn logs ( & self , filter : EthcoreFilter ) -> BoxFuture < Vec < Log > > ;
2017-04-12 12:07:54 +02:00
/// Get logs from the pending block.
fn pending_logs ( & self , block_number : u64 , filter : & EthcoreFilter ) -> Vec < Log > ;
/// Get a reference to the poll manager.
2018-07-04 17:37:55 +02:00
fn polls ( & self ) -> & Mutex < PollManager < SyncPollFilter > > ;
2018-06-13 13:39:27 +02:00
/// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed.
fn removed_logs ( & self , block_hash : H256 , filter : & EthcoreFilter ) -> ( Vec < Log > , u64 ) ;
2017-04-12 12:07:54 +02:00
}
/// Eth filter rpc implementation for a full node.
2018-04-13 17:34:27 +02:00
pub struct EthFilterClient < C , M > {
2017-04-12 12:07:54 +02:00
client : Arc < C > ,
miner : Arc < M > ,
2018-07-04 17:37:55 +02:00
polls : Mutex < PollManager < SyncPollFilter > > ,
2016-06-01 19:37:34 +02:00
}
2018-04-13 17:34:27 +02:00
impl < C , M > EthFilterClient < C , M > {
2016-06-01 19:37:34 +02:00
/// Creates new Eth filter client.
2018-06-18 13:42:54 +02:00
pub fn new ( client : Arc < C > , miner : Arc < M > , poll_lifetime : u32 ) -> Self {
2016-06-01 19:37:34 +02:00
EthFilterClient {
2019-03-22 12:01:11 +01:00
client ,
miner ,
2018-06-18 13:42:54 +02:00
polls : Mutex ::new ( PollManager ::new ( poll_lifetime ) ) ,
2016-06-01 19:37:34 +02:00
}
}
}
2018-04-13 17:34:27 +02:00
impl < C , M > Filterable for EthFilterClient < C , M >
where
C : miner ::BlockChainClient + BlockChainClient ,
M : MinerService ,
{
2017-04-12 12:07:54 +02:00
fn best_block_number ( & self ) -> u64 {
self . client . chain_info ( ) . best_block_number
}
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
fn block_hash ( & self , id : BlockId ) -> Option < H256 > {
self . client . block_hash ( id )
2017-04-12 12:07:54 +02:00
}
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
fn pending_transaction_hashes ( & self ) -> BTreeSet < H256 > {
self . miner . pending_transaction_hashes ( & * self . client )
2017-04-12 12:07:54 +02:00
}
2020-08-05 06:08:03 +02:00
2017-11-14 11:38:17 +01:00
fn logs ( & self , filter : EthcoreFilter ) -> BoxFuture < Vec < Log > > {
2018-08-13 09:47:10 +02:00
Box ::new ( future ::ok (
self . client
. logs ( filter )
. unwrap_or_default ( )
. into_iter ( )
. map ( Into ::into )
. collect ( ) ,
) )
2017-04-12 12:07:54 +02:00
}
2020-08-05 06:08:03 +02:00
2017-04-12 12:07:54 +02:00
fn pending_logs ( & self , block_number : u64 , filter : & EthcoreFilter ) -> Vec < Log > {
pending_logs ( & * self . miner , block_number , filter )
}
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
fn polls ( & self ) -> & Mutex < PollManager < SyncPollFilter > > {
& self . polls
}
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
fn removed_logs ( & self , block_hash : H256 , filter : & EthcoreFilter ) -> ( Vec < Log > , u64 ) {
let inner = | | -> Option < Vec < H256 > > {
let mut route = Vec ::new ( ) ;
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
let mut current_block_hash = block_hash ;
let mut current_block_header = self
. client
. block_header ( BlockId ::Hash ( current_block_hash ) ) ? ;
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
while current_block_hash
! = self
. client
. block_hash ( BlockId ::Number ( current_block_header . number ( ) ) ) ?
{
route . push ( current_block_hash ) ;
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
current_block_hash = current_block_header . parent_hash ( ) ;
current_block_header = self
. client
. block_header ( BlockId ::Hash ( current_block_hash ) ) ? ;
}
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
Some ( route )
} ;
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
let route = inner ( ) . unwrap_or_default ( ) ;
let route_len = route . len ( ) as u64 ;
(
route
. into_iter ( )
. flat_map ( | block_hash | {
let mut filter = filter . clone ( ) ;
filter . from_block = BlockId ::Hash ( block_hash ) ;
filter . to_block = filter . from_block ;
2020-08-05 06:08:03 +02:00
2018-08-13 09:47:10 +02:00
self . client
. logs ( filter )
. unwrap_or_default ( )
. into_iter ( )
. map ( | log | {
2018-06-13 13:39:27 +02:00
let mut log : Log = log . into ( ) ;
log . log_type = " removed " . into ( ) ;
log . removed = true ;
2020-08-05 06:08:03 +02:00
2018-06-13 13:39:27 +02:00
log
} )
} )
. collect ( ) ,
route_len ,
2020-08-05 06:08:03 +02:00
)
2018-06-13 13:39:27 +02:00
}
2017-04-12 12:07:54 +02:00
}
impl < T : Filterable + Send + Sync + 'static > EthFilter for T {
2019-02-25 14:27:28 +01:00
fn new_filter ( & self , filter : Filter ) -> Result < U256 > {
2017-04-12 12:07:54 +02:00
let mut polls = self . polls ( ) . lock ( ) ;
let block_number = self . best_block_number ( ) ;
2018-08-13 09:47:10 +02:00
let include_pending = filter . to_block = = Some ( BlockNumber ::Pending ) ;
let filter = filter . try_into ( ) ? ;
let id = polls . create_poll ( SyncPollFilter ::new ( PollFilter ::Logs {
block_number ,
filter ,
include_pending ,
last_block_hash : None ,
previous_logs : Default ::default ( ) ,
} ) ) ;
2016-09-23 19:42:33 +02:00
Ok ( id . into ( ) )
2016-06-01 19:37:34 +02:00
}
2020-08-05 06:08:03 +02:00
2019-02-25 14:27:28 +01:00
fn new_block_filter ( & self ) -> Result < U256 > {
2017-04-12 12:07:54 +02:00
let mut polls = self . polls ( ) . lock ( ) ;
2018-02-23 10:41:29 +01:00
// +1, since we don't want to include the current block
2018-11-21 16:50:41 +01:00
let id = polls . create_poll ( SyncPollFilter ::new ( PollFilter ::Block {
last_block_number : self . best_block_number ( ) ,
recent_reported_hashes : VecDeque ::with_capacity ( PollFilter ::MAX_BLOCK_HISTORY_SIZE ) ,
} ) ) ;
2016-09-23 19:42:33 +02:00
Ok ( id . into ( ) )
2016-06-01 19:37:34 +02:00
}
2020-08-05 06:08:03 +02:00
2019-02-25 14:27:28 +01:00
fn new_pending_transaction_filter ( & self ) -> Result < U256 > {
2017-04-12 12:07:54 +02:00
let mut polls = self . polls ( ) . lock ( ) ;
2018-07-04 17:37:55 +02:00
let pending_transactions = self . pending_transaction_hashes ( ) ;
let id = polls . create_poll ( SyncPollFilter ::new ( PollFilter ::PendingTransaction (
pending_transactions ,
) ) ) ;
2016-09-23 19:42:33 +02:00
Ok ( id . into ( ) )
2016-06-01 19:37:34 +02:00
}
2020-08-05 06:08:03 +02:00
2017-11-14 11:38:17 +01:00
fn filter_changes ( & self , index : Index ) -> BoxFuture < FilterChanges > {
2018-07-04 17:37:55 +02:00
let filter = match self . polls ( ) . lock ( ) . poll_mut ( & index . value ( ) ) {
Some ( filter ) = > filter . clone ( ) ,
None = > return Box ::new ( future ::err ( errors ::filter_not_found ( ) ) ) ,
} ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
Box ::new ( filter . modify ( | filter | match * filter {
2018-11-21 16:50:41 +01:00
PollFilter ::Block {
ref mut last_block_number ,
ref mut recent_reported_hashes ,
} = > {
// Check validity of recently reported blocks -- in case of re-org, rewind block to last valid
while let Some ( ( num , hash ) ) = recent_reported_hashes . front ( ) . cloned ( ) {
if self . block_hash ( BlockId ::Number ( num ) ) = = Some ( hash ) {
break ;
}
* last_block_number = num - 1 ;
recent_reported_hashes . pop_front ( ) ;
}
let current_number = self . best_block_number ( ) ;
let mut hashes = Vec ::new ( ) ;
for n in ( * last_block_number + 1 ) ..= current_number {
let block_number = BlockId ::Number ( n ) ;
2019-03-22 12:01:11 +01:00
if let Some ( hash ) = self . block_hash ( block_number ) {
* last_block_number = n ;
hashes . push ( hash ) ;
// Only keep the most recent history
if recent_reported_hashes . len ( ) > = PollFilter ::MAX_BLOCK_HISTORY_SIZE {
recent_reported_hashes . pop_back ( ) ;
}
recent_reported_hashes . push_front ( ( n , hash ) ) ;
2020-08-05 06:08:03 +02:00
}
2018-11-21 16:50:41 +01:00
}
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
Either ::A ( future ::ok ( FilterChanges ::Hashes ( hashes ) ) )
}
PollFilter ::PendingTransaction ( ref mut previous_hashes ) = > {
// get hashes of pending transactions
let current_hashes = self . pending_transaction_hashes ( ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
let new_hashes = {
// find all new hashes
current_hashes
. difference ( previous_hashes )
. cloned ( )
. map ( Into ::into )
. collect ( )
} ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// save all hashes of pending transactions
* previous_hashes = current_hashes ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// return new hashes
Either ::A ( future ::ok ( FilterChanges ::Hashes ( new_hashes ) ) )
}
2018-08-13 09:47:10 +02:00
PollFilter ::Logs {
ref mut block_number ,
ref mut last_block_hash ,
ref mut previous_logs ,
ref filter ,
include_pending ,
} = > {
2018-07-04 17:37:55 +02:00
// retrive the current block number
let current_number = self . best_block_number ( ) ;
2020-08-05 06:08:03 +02:00
2018-08-13 09:47:10 +02:00
let mut filter = filter . clone ( ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// retrieve reorg logs
let ( mut reorg , reorg_len ) = last_block_hash
. map_or_else ( | | ( Vec ::new ( ) , 0 ) , | h | self . removed_logs ( h , & filter ) ) ;
* block_number - = reorg_len as u64 ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
filter . from_block = BlockId ::Number ( * block_number ) ;
filter . to_block = BlockId ::Latest ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// retrieve pending logs
let pending = if include_pending {
let pending_logs = self . pending_logs ( current_number , & filter ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// remove logs about which client was already notified about
let new_pending_logs : Vec < _ > = pending_logs
. iter ( )
. filter ( | p | ! previous_logs . contains ( p ) )
. cloned ( )
. collect ( ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// save all logs retrieved by client
* previous_logs = pending_logs . into_iter ( ) . collect ( ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
new_pending_logs
} else {
Vec ::new ( )
} ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// save the number of the next block as a first block from which
// we want to get logs
* block_number = current_number + 1 ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// save the current block hash, which we used to get back to the
// canon chain in case of reorg.
* last_block_hash = self . block_hash ( BlockId ::Number ( current_number ) ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter . limit ;
Either ::B (
self . logs ( filter )
. map ( move | logs | {
reorg . extend ( logs ) ;
reorg
} ) // append reorg logs in the front
. map ( move | mut logs | {
logs . extend ( pending ) ;
logs
} ) // append fetched pending logs
. map ( move | logs | limit_logs ( logs , limit ) ) // limit the logs
. map ( FilterChanges ::Logs ) ,
)
2016-09-23 19:42:33 +02:00
}
2018-07-04 17:37:55 +02:00
} ) )
2016-06-01 19:37:34 +02:00
}
2020-08-05 06:08:03 +02:00
2017-11-14 11:38:17 +01:00
fn filter_logs ( & self , index : Index ) -> BoxFuture < Vec < Log > > {
2018-08-13 09:47:10 +02:00
let ( filter , include_pending ) = {
2017-09-10 18:03:35 +02:00
let mut polls = self . polls ( ) . lock ( ) ;
2020-08-05 06:08:03 +02:00
2018-07-04 17:37:55 +02:00
match polls . poll ( & index . value ( ) ) . and_then ( | f | {
f . modify ( | filter | match * filter {
2018-08-13 09:47:10 +02:00
PollFilter ::Logs {
ref filter ,
include_pending ,
..
} = > Some ( ( filter . clone ( ) , include_pending ) ) ,
2018-07-04 17:37:55 +02:00
_ = > None ,
2020-08-05 06:08:03 +02:00
} )
2018-07-04 17:37:55 +02:00
} ) {
2018-08-13 09:47:10 +02:00
Some ( ( filter , include_pending ) ) = > ( filter , include_pending ) ,
2018-04-10 10:36:14 +02:00
None = > return Box ::new ( future ::err ( errors ::filter_not_found ( ) ) ) ,
2017-09-10 18:03:35 +02:00
}
} ;
2020-08-05 06:08:03 +02:00
2017-09-10 18:03:35 +02:00
// fetch pending logs.
let pending = if include_pending {
let best_block = self . best_block_number ( ) ;
self . pending_logs ( best_block , & filter )
} else {
Vec ::new ( )
} ;
2020-08-05 06:08:03 +02:00
2017-09-10 18:03:35 +02:00
// retrieve logs asynchronously, appending pending logs.
let limit = filter . limit ;
let logs = self . logs ( filter ) ;
2017-10-05 12:35:01 +02:00
Box ::new (
2017-09-10 18:03:35 +02:00
logs . map ( move | mut logs | {
logs . extend ( pending ) ;
logs
} )
. map ( move | logs | limit_logs ( logs , limit ) ) ,
2017-10-05 12:35:01 +02:00
)
2016-06-01 19:37:34 +02:00
}
2020-08-05 06:08:03 +02:00
2017-11-14 11:38:17 +01:00
fn uninstall_filter ( & self , index : Index ) -> Result < bool > {
2018-03-31 11:06:16 +02:00
Ok ( self . polls ( ) . lock ( ) . remove_poll ( & index . value ( ) ) )
2016-06-01 19:37:34 +02:00
}
}