From 8bfb91d7db5945427521ad53b109962f6eb41293 Mon Sep 17 00:00:00 2001 From: lash Date: Sun, 23 Jan 2022 17:49:04 +0000 Subject: [PATCH] Make backend properties static across filters --- eth_monitor/filters/__init__.py | 1 + eth_monitor/filters/base.py | 54 +++++++++++++++++++++++++++++++++ eth_monitor/filters/cache.py | 29 +++--------------- eth_monitor/rules.py | 42 +++++++++++++++++++++++++ eth_monitor/runnable/sync.py | 16 +++++++--- 5 files changed, 113 insertions(+), 29 deletions(-) create mode 100644 eth_monitor/filters/__init__.py create mode 100644 eth_monitor/filters/base.py create mode 100644 eth_monitor/rules.py diff --git a/eth_monitor/filters/__init__.py b/eth_monitor/filters/__init__.py new file mode 100644 index 0000000..9b5ed21 --- /dev/null +++ b/eth_monitor/filters/__init__.py @@ -0,0 +1 @@ +from .base import * diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py new file mode 100644 index 0000000..bd2e8d0 --- /dev/null +++ b/eth_monitor/filters/base.py @@ -0,0 +1,54 @@ +# standard imports +import os +import logging + +# external imports +from chainsyncer.backend.file import chain_dir_for +from leveldir.numeric import NumDir +from leveldir.hex import HexDir + +logg = logging.getLogger(__name__) + +base_dir = '/var/lib' + + +class RuledFilter: + + cache_root = os.path.join(base_dir, 'eth_monitor') + chain_dir = None + cache_dir = None + block_num_path = None + block_num_dir = None + block_hash_path = None + block_hash_dir = None + tx_path = None + tx_dir = None + + + def __init__(self, rules_filter=None): + if self.chain_dir == None: + raise RuntimeError('filter must be initialized. call RuledFilter.init() first') + self.rules_filter = rules_filter + + + @staticmethod + def init(chain_spec, cache_root=None, rules_filter=None): + if cache_root != None: + RuledFilter.cache_root = os.path.join(cache_root, 'eth_monitor') + RuledFilter.chain_dir = chain_dir_for(RuledFilter.cache_root) + RuledFilter.cache_dir = os.path.join(RuledFilter.chain_dir, 'cache') + RuledFilter.block_num_path = os.path.join(RuledFilter.cache_dir, 'block', 'num') + RuledFilter.block_num_dir = NumDir(RuledFilter.block_num_path, [100000, 1000]) + RuledFilter.block_hash_path = os.path.join(RuledFilter.cache_dir, 'block', 'hash') + RuledFilter.block_hash_dir = HexDir(RuledFilter.block_hash_path, 32, levels=2) + RuledFilter.tx_path = os.path.join(RuledFilter.cache_dir, 'tx') + RuledFilter.tx_dir = HexDir(RuledFilter.tx_path, 32, levels=2) + + + def filter(self, conn, block, tx, db_session=None): + if self.rules_filter != None: + if not self.rules_filter.apply_rules(tx): + logg.debug('rule match failed for tx {}'.format(tx.hash)) + return + logg.info('applying filter {}'.format(self)) + self.ruled_filter(conn, block, tx, db_session=db_session) diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py index 3974d1d..58729fe 100644 --- a/eth_monitor/filters/cache.py +++ b/eth_monitor/filters/cache.py @@ -3,30 +3,15 @@ import os import logging # external imports -from chainsyncer.backend.file import chain_dir_for -from leveldir.numeric import NumDir -from leveldir.hex import HexDir from hexathon import strip_0x -from chainlib.eth.address import is_same_address -base_dir = '/var/lib' +# local imports +from eth_monitor.filters import RuledFilter -logg = logging.getLogger() +logg = logging.getLogger(__name__) -class CacheFilter: - def __init__(self, chain_spec, cache_root=base_dir, rules_filter=None): - cache_root = os.path.join(cache_root, 'eth_monitor') - chain_dir = chain_dir_for(cache_root) - self.cache_dir = os.path.join(chain_dir, 'cache') - block_num_path = os.path.join(self.cache_dir, 'block', 'num') - self.block_num_dir = NumDir(block_num_path, [100000, 1000]) - block_hash_path = os.path.join(self.cache_dir, 'block', 'hash') - self.block_hash_dir = HexDir(block_hash_path, 32, levels=2) - tx_path = os.path.join(self.cache_dir, 'tx') - self.tx_dir = HexDir(tx_path, 32, levels=2) - self.rules_filter = rules_filter - +class CacheFilter(RuledFilter): def block_callback(self, block, extra=None): src = str(block.src()).encode('utf-8') @@ -35,10 +20,6 @@ class CacheFilter: self.block_num_dir.add(block.number, hash_bytes) - def filter(self, conn, block, tx, db_session=None): - if self.rules_filter != None: - if not self.rules_filter.apply_rules(tx): - logg.debug('rule match failed for tx {}'.format(tx.hash)) - return + def ruled_filter(self, conn, block, tx, db_session=None): src = str(tx.src()).encode('utf-8') self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) diff --git a/eth_monitor/rules.py b/eth_monitor/rules.py new file mode 100644 index 0000000..b53cf30 --- /dev/null +++ b/eth_monitor/rules.py @@ -0,0 +1,42 @@ +class AddressRules: + + def __init__(self, include_by_default=False): + self.excludes = [] + self.includes = [] + self.include_by_default = include_by_default + + + def exclude(self, sender=None, recipient=None, executable=None): + self.excludes.append((sender, recipient, executable,)) + logg.info('cache filter added EXCLUDE rule sender {} recipient {} executable {}'.format(sender, recipient, executable)) + + + def include(self, sender=None, recipient=None, executable=None): + self.includes.append((sender, recipient, executable,)) + logg.info('cache filter added INCLUDE rule sender {} recipient {} executable {}'.format(sender, recipient, executable)) + + + def apply_rules(self, tx): + v = False + + for rule in self.includes: + if rule[0] != None and is_same_address(tx.outputs[0], rule[0]): + logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0])) + v = True + elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]): + logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx.hash, tx.inputs[0])) + v = True + elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + v = True + for rule in self.excludes: + if rule[0] != None and is_same_address(tx.outputs[0], rule[0]): + logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0])) + v = False + elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + v = False + elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]): + logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0])) + v = False + return v diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 7d98b17..796b158 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -23,6 +23,7 @@ from chainsyncer.filter import NoopFilter from eth_monitor.chain import EthChainInterface from eth_monitor.filters.cache import CacheFilter from eth_monitor.rules import AddressRules +from eth_monitor.filters import RuledFilter logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -139,15 +140,17 @@ def setup_address_rules(includes_file=None, excludes_file=None, include_default= return rules -def setup_cache_filter(chain_spec, cache_dir, rules_filter=None): +def setup_filter(chain_spec, cache_dir): cache_dir = os.path.realpath(cache_dir) if cache_dir == None: import tempfile cache_dir = tempfile.mkdtemp() - logg.info('using dir {}'.format(cache_dir)) - cache_filter = CacheFilter(chain_spec, cache_dir, rules_filter=rules_filter) + logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) + RuledFilter.init(chain_spec, cache_dir) - return cache_filter + +def setup_cache_filter(rules_filter=None): + return CacheFilter(rules_filter=rules_filter) def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_offset=0, skip_history=False): @@ -194,9 +197,12 @@ if __name__ == '__main__': include_default=bool(args.include_default), ) - cache_filter = setup_cache_filter( + setup_filter( chain_spec, args.cache_dir, + ) + + cache_filter = setup_cache_filter( rules_filter=address_rules, )