From 2ced0e344274eb9d372eb31361c59fd26951a439 Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 24 Jan 2022 00:17:27 +0000 Subject: [PATCH] Limit cache of tx by address to addresses within rules --- eth_monitor/importers/etherscan.py | 8 +++--- eth_monitor/rules.py | 39 ++++++++++++++++++--------- eth_monitor/runnable/import.py | 42 +++++++++++++++++++++++++----- eth_monitor/runnable/sync.py | 14 +++++++--- eth_monitor/store/file.py | 13 +++++---- 5 files changed, 83 insertions(+), 33 deletions(-) diff --git a/eth_monitor/importers/etherscan.py b/eth_monitor/importers/etherscan.py index 8dd05c9..21edf92 100644 --- a/eth_monitor/importers/etherscan.py +++ b/eth_monitor/importers/etherscan.py @@ -24,10 +24,10 @@ class EtherscanImporter: def get(self, address): - f = open('sample_import.json', 'r') - o = json.load(f) - f.close() - #o = self.get_api(address) + #f = open('sample_import.json', 'r') + #o = json.load(f) + #f.close() + o = self.get_api(address) for v in o['result']: o = block_by_hash(v['blockHash']) diff --git a/eth_monitor/rules.py b/eth_monitor/rules.py index b53cf30..f91b13f 100644 --- a/eth_monitor/rules.py +++ b/eth_monitor/rules.py @@ -1,3 +1,12 @@ +# standard imports +import logging + +# external imports +from chainlib.eth.address import is_same_address + +logg = logging.getLogger() + + class AddressRules: def __init__(self, include_by_default=False): @@ -17,26 +26,30 @@ class AddressRules: def apply_rules(self, tx): - v = False + return self.apply_rules_addresses(tx.outputs[0], tx.inputs[0], tx.hash) + + + def apply_rules_addresses(self, sender, recipient, tx_hash): + v = self.include_by_default for rule in self.includes: - if rule[0] != None and is_same_address(tx.outputs[0], rule[0]): - logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0])) + if rule[0] != None and is_same_address(sender, rule[0]): + logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx_hash, sender)) v = True - elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]): - logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx.hash, tx.inputs[0])) + elif rule[1] != None and is_same_address(recipient, rule[1]): + logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx_hash, recipient)) v = True - elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]): - logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + elif rule[2] != None and is_same_address(recipient, rule[2]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient)) v = True for rule in self.excludes: - if rule[0] != None and is_same_address(tx.outputs[0], rule[0]): - logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0])) + if rule[0] != None and is_same_address(sender, rule[0]): + logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx_hash, sender)) v = False - elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]): - logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + elif rule[1] != None and is_same_address(recipient, rule[1]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient)) v = False - elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]): - logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + elif rule[2] != None and is_same_address(recipient, rule[2]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient)) v = False return v diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py index a4c48e5..6601e0c 100644 --- a/eth_monitor/runnable/import.py +++ b/eth_monitor/runnable/import.py @@ -3,6 +3,7 @@ import argparse import logging import sys import os +import time # external imports from chainlib.encode import TxHexNormalizer @@ -13,19 +14,23 @@ from chainlib.chain import ChainSpec from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.filters import RuledFilter from eth_monitor.store.file import FileStore +from eth_monitor.rules import AddressRules logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() normalize_address = TxHexNormalizer().wallet_address + argparser = argparse.ArgumentParser('master eth events monitor') argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') -argparser.add_argument('--include-data', dest='include_data', action='store_true', help='Include data objects') +argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default') +argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file') argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address') +argparser.add_argument('--delay', type=float, default=0.2, help='Seconds to wait between each retrieval from importer') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') argparser.add_argument('-p', type=str, help='RPC provider') @@ -81,15 +86,21 @@ def collect_addresses(addresses=[], address_files=[]): return address_collection -def setup_filter(chain_spec, cache_dir): - store = FileStore(chain_spec, cache_dir) +def setup_address_rules(addresses): + rules = AddressRules() + for address in addresses: + rules.include(sender=address, recipient=address) + return rules + + +def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data, address_rules): + store = FileStore(chain_spec, cache_dir, address_rules=address_rules) cache_dir = os.path.realpath(cache_dir) if cache_dir == None: import tempfile cache_dir = tempfile.mkdtemp() logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) - include_data = bool(args.include_data) - RuledFilter.init(store, include_tx_data=include_data, include_block_data=include_data) + RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data) def main(): @@ -98,11 +109,28 @@ def main(): from eth_monitor.importers.etherscan import EtherscanImporter - setup_filter(chain_spec, args.cache_dir) - filters = [CacheFilter()] + address_rules = setup_address_rules(args.address) + + setup_filter( + chain_spec, + args.cache_dir, + bool(args.store_tx_data), + bool(args.store_block_data), + address_rules, + ) + + cache_filter = CacheFilter( + rules_filter=address_rules, + ) + + filters = [ + cache_filter, + ] + importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback) for a in addresses: importer.get(a) + time.sleep(args.delay) if __name__ == '__main__': diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 8734c48..70aa762 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -24,6 +24,7 @@ from eth_monitor.chain import EthChainInterface from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.rules import AddressRules from eth_monitor.filters import RuledFilter +from eth_monitor.store.file import FileStore logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -46,7 +47,9 @@ argparser.add_argument('--offset', type=int, default=0, help='Start sync on this argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids') argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync') argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file') -argparser.add_argument('--include-default', action='store_true', help='Include all transactions by default') +argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default') +argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default') +argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default') argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file') argparser.add_argument('-f', '--filter', type=str, action='append', help='Add python module filter path') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') @@ -88,7 +91,7 @@ if os.environ.get('RPC_AUTHENTICATION') == 'basic': rpc = EthHTTPConnection(args.p) -def setup_address_rules(includes_file=None, excludes_file=None, include_default=False): +def setup_address_rules(includes_file=None, excludes_file=None, include_default=False, include_block_default=False): rules = AddressRules(include_by_default=include_default) @@ -141,13 +144,14 @@ def setup_address_rules(includes_file=None, excludes_file=None, include_default= return rules -def setup_filter(chain_spec, cache_dir): +def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data): + store = FileStore(chain_spec, cache_dir) cache_dir = os.path.realpath(cache_dir) if cache_dir == None: import tempfile cache_dir = tempfile.mkdtemp() logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) - RuledFilter.init(chain_spec, cache_dir) + RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data) def setup_cache_filter(rules_filter=None): @@ -201,6 +205,8 @@ if __name__ == '__main__': setup_filter( chain_spec, args.cache_dir, + bool(args.store_tx_data), + bool(args.store_block_data), ) cache_filter = setup_cache_filter( diff --git a/eth_monitor/store/file.py b/eth_monitor/store/file.py index 84c2f95..8eed8ed 100644 --- a/eth_monitor/store/file.py +++ b/eth_monitor/store/file.py @@ -25,10 +25,12 @@ class FileStore: tx_hash_dirnormal = strip_0x(tx.hash).upper() tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal) self.tx_raw_dir.add(tx_hash_bytes, raw) - address = bytes.fromhex(strip_0x(tx.inputs[0])) - self.address_dir.add_dir(tx_hash_dirnormal, address, b'') - address = bytes.fromhex(strip_0x(tx.outputs[0])) - self.address_dir.add_dir(tx_hash_dirnormal, address, b'') + if self.address_rules != None: + for a in tx.outputs + tx.inputs: + if self.address_rules.apply_rules_addresses(a, a, tx.hash): + a = bytes.fromhex(strip_0x(a)) + self.address_dir.add_dir(tx_hash_dirnormal, a, b'') + if include_data: src = json.dumps(tx.src()).encode('utf-8') self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) @@ -44,7 +46,7 @@ class FileStore: self.block_src_dir.add(hash_bytes, src) - def __init__(self, chain_spec, cache_root=base_dir): + def __init__(self, chain_spec, cache_root=base_dir, address_rules=None): self.cache_root = os.path.join( cache_root, 'eth_monitor', @@ -68,3 +70,4 @@ class FileStore: self.address_path = os.path.join(self.cache_dir, 'address') self.address_dir = HexDir(self.address_path, 20, levels=2) self.chain_spec = chain_spec + self.address_rules = address_rules