Compare commits

...

4 Commits

Author SHA1 Message Date
lash 2a54256821
Add state rundir with block height output 2023-08-19 11:04:52 +01:00
lash a29ae35597
Skip cache rules filter when deactivated 2023-08-17 13:27:12 +01:00
lash c99259b2ed
Add match-all criteria and flag 2023-08-17 12:48:33 +01:00
lash fa694c957b
Start filter test writing 2023-08-17 10:28:27 +01:00
15 changed files with 436 additions and 18 deletions

View File

@ -1,3 +1,8 @@
- 0.8.8
* Skip rules filter processing for cache when deactivated
* Add match-all flag to rule processing
* Add match-all flag to CLI to toggle setting match_all flag to rule processing for include criteria
* Implement state dir (rundir), and last synced block output
- 0.8.7
* Upgrade chainsyncer (and shep) to avoid state deletion on partial filter list interrupts
- 0.8.6

View File

@ -30,6 +30,8 @@ def process_args(argparser, args, flags):
argparser.add_argument('--store-tx-data', action='store_true', dest='store_tx_data', help='Store tx data in cache store')
argparser.add_argument('--store-block-data', action='store_true', dest='store_block_data', help='Store block data in cache store')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--match-all', action='store_true', dest='match_all', help='Match all include filter criteria')
# misc flags
argparser.add_argument('-k', '--context-key', dest='context_key', action='append', type=str, help='Add a key-value pair to be added to the context')
argparser.add_argument('--run-dir', type=str, dest='run_dir', help='Output key sync and processing state properties to given diretory')

View File

@ -40,6 +40,8 @@ def process_config(config, arg, args, flags):
arg_override['ETHMONITOR_CONTEXT_KEY'] = getattr(args, 'context_key')
arg_override['ETHMONITOR_MATCH_ALL'] = getattr(args, 'match_all')
arg_override['ETHCACHE_STORE_BLOCK'] = getattr(args, 'store_block_data')
arg_override['ETHCACHE_STORE_TX'] = getattr(args, 'store_tx_data')
@ -54,6 +56,7 @@ def process_config(config, arg, args, flags):
config.add(getattr(args, 'session_id'), '_SESSION_ID', False)
config.add(getattr(args, 'cache_dir'), '_CACHE_DIR', False)
config.add(getattr(args, 'run_dir'), '_RUN_DIR', False)
config.add(getattr(args, 'fresh'), '_FRESH', False)
return config

View File

@ -19,3 +19,4 @@ block_filter =
include_default = 0
state_dir = ./.eth-monitor
context_key =
match_all = 0

2
eth_monitor/error.py Normal file
View File

@ -0,0 +1,2 @@
class RuleFail(Exception):
pass

View File

@ -1,3 +1,7 @@
# standard imports
import os
class Filter:
def __init__(self, store, include_block_data=False):
@ -5,5 +9,5 @@ class Filter:
self.include_block_data = include_block_data
def filter(self, conn, block):
def filter(self, conn, block, **kwargs):
self.store.put_block(block, include_data=self.include_block_data)

View File

@ -0,0 +1,16 @@
# standard imports
import os
class Filter:
def __init__(self, run_dir):
self.run_dir = run_dir
self.fp = os.path.join(run_dir, 'block')
def filter(self, conn, block):
f = open(self.fp, 'w')
f.write(str(block.number))
f.close()
return False

View File

@ -4,6 +4,7 @@ import uuid
# external imports
from chainlib.eth.address import is_same_address
from .error import RuleFail
logg = logging.getLogger()
@ -11,14 +12,17 @@ logg = logging.getLogger()
class RuleData:
def __init__(self, fragments, description=None):
def __init__(self, fragments, description=None, match_all=False):
self.fragments = fragments
self.description = description
if self.description == None:
self.description = str(uuid.uuid4())
self.match_all = match_all
def check(self, sender, recipient, data, tx_hash):
have_fail = False
have_match = False
if len(self.fragments) == 0:
return False
@ -28,9 +32,16 @@ class RuleData:
continue
if fragment in data:
logg.debug('tx {} rule {} match in DATA FRAGMENT {}'.format(tx_hash, self.description, fragment))
return True
if not self.match_all:
return True
have_match = True
else:
logg.debug('data match all {}'.format(self.match_all))
if self.match_all:
return False
have_fail = True
return False
return have_match
def __str__(self):
@ -41,11 +52,13 @@ class RuleData:
class RuleMethod:
def __init__(self, methods, description=None):
def __init__(self, methods, description=None, match_all=False):
self.methods = methods
self.description = description
if self.description == None:
self.description = str(uuid.uuid4())
if match_all:
logg.warning('match_all ignord for RuleMethod rule')
def check(self, sender, recipient, data, tx_hash):
@ -71,30 +84,62 @@ class RuleMethod:
class RuleSimple:
def __init__(self, outputs, inputs, executables, description=None):
def __init__(self, outputs, inputs, executables, description=None, match_all=False):
self.description = description
if self.description == None:
self.description = str(uuid.uuid4())
self.outputs = outputs
self.inputs = inputs
self.executables = executables
self.match_all = match_all
def check(self, sender, recipient, data, tx_hash):
r = None
try:
r = self.__check(sender, recipient, data, tx_hash)
except RuleFail:
return False
return r
def __check(self, sender, recipient, data, tx_hash):
have_fail = False
have_match = False
for rule in self.outputs:
if rule != None and is_same_address(sender, rule):
logg.debug('tx {} rule {} match in SENDER {}'.format(tx_hash, self.description, sender))
return True
if not self.match_all:
return True
have_match = True
else:
if self.match_all:
raise RuleFail(rule)
have_fail = True
if recipient == None:
return False
for rule in self.inputs:
if rule != None and is_same_address(recipient, rule):
logg.debug('tx {} rule {} match in RECIPIENT {}'.format(tx_hash, self.description, recipient))
return True
if not self.match_all:
return True
have_match = True
else:
if self.match_all:
raise RuleFail(rule)
have_fail = True
for rule in self.executables:
if rule != None and is_same_address(recipient, rule):
logg.debug('tx {} rule {} match in EXECUTABLE {}'.format(tx_hash, self.description, recipient))
return True
if not self.match_all:
return True
have_match = True
else:
if self.match_all:
raise RuleFail(rule)
have_fail = True
return have_match
def __str__(self):
@ -107,10 +152,11 @@ class RuleSimple:
class AddressRules:
def __init__(self, include_by_default=False):
def __init__(self, include_by_default=False, match_all=False):
self.excludes = []
self.includes = []
self.include_by_default = include_by_default
self.match_all = match_all
def exclude(self, rule):
@ -127,20 +173,29 @@ class AddressRules:
return self.apply_rules_addresses(tx.outputs[0], tx.inputs[0], tx.payload, tx.hash)
# TODO: rename
def apply_rules_addresses(self, sender, recipient, data, tx_hash):
v = self.include_by_default
have_fail = False
have_match = False
for rule in self.includes:
if rule.check(sender, recipient, data, tx_hash):
v = True
logg.info('match in includes rule: {}'.format(rule))
if not self.match_all:
break
elif self.match_all:
v = False
break
if not v:
return v
for rule in self.excludes:
if rule.check(sender, recipient, data, tx_hash):
v = False
logg.info('match in excludes rule: {}'.format(rule))
break
if not self.match_all:
break
return v

17
eth_monitor/run.py Normal file
View File

@ -0,0 +1,17 @@
# standard imports
import os
import logging
logg = logging.getLogger(__name__)
def cleanup_run(settings):
if not settings.get('RUN_OUT'):
return
lockfile = os.path.join(settings.get('RUN_DIR'), '.lock')
os.unlink(lockfile)
logg.debug('freed rundir {}'.format(settings.get('RUN_DIR')))
def cleanup(settings):
cleanup_run(settings)

View File

@ -47,6 +47,7 @@ from eth_monitor.callback import (
import eth_monitor.cli
from eth_monitor.cli.log import process_log
from eth_monitor.settings import process_settings as process_settings_local
from eth_monitor.run import cleanup
logg = logging.getLogger()
@ -111,6 +112,8 @@ def main():
except SyncDone as e:
sys.stderr.write("sync {} done at block {}\n".format(drv, e))
cleanup(settings)
if __name__ == '__main__':
main()

View File

@ -34,6 +34,7 @@ from eth_monitor.filters.cache import Filter as CacheFilter
from eth_monitor.config import override, list_from_prefix
from eth_monitor.filters.out import OutFilter
from eth_monitor.filters.block import Filter as BlockFilter
from eth_monitor.filters.run import Filter as RunFilter
logg = logging.getLogger(__name__)
@ -47,6 +48,32 @@ def process_monitor_session(settings, config):
session_id = 'default'
settings.set('SESSION_ID', session_id)
settings.set('SESSION_OK', True)
return settings
def process_monitor_rundir(settings, config):
settings.set('RUN_OUT', False)
if config.get('_RUN_DIR') == None:
return settings
run_dir = config.get('_RUN_DIR')
try:
os.makedirs(run_dir, exist_ok=True)
except Exception as e:
logg.error('could not create run dir, deactivating run output: ' + str(e))
return settings
lockfile = os.path.join(run_dir, '.lock')
try:
f = open(lockfile, 'x')
f.close()
except FileExistsError:
logg.error('run dir {} is already in use, deactivating run output'.format(run_dir))
return settings
settings.set('RUN_OUT', True)
settings.set('RUN_DIR', run_dir)
return settings
@ -130,6 +157,7 @@ def process_address_arg_rules(settings, config):
category['input']['i'],
category['exec']['i'],
description='INCLUDE',
match_all=settings.get('MATCH_ALL'),
)
rules.include(includes)
@ -167,7 +195,7 @@ def process_data_arg_rules(settings, config):
for v in config.get('ETHMONITOR_X_DATA_IN'):
exclude_data.append(v.lower())
includes = RuleData(include_data, description='INCLUDE')
includes = RuleData(include_data, description='INCLUDE', match_all=settings.get('MATCH_ALL'))
rules.include(includes)
excludes = RuleData(exclude_data, description='EXCLUDE')
@ -211,7 +239,7 @@ def process_address_file_rules(settings, config): #rules, includes_file=None, ex
except IndexError:
pass
rule = RuleSimple(sender, recipient, executable)
rule = RuleSimple(sender, recipient, executable, match_all=settings.get('MATCH_ALL'))
rules.include(rule)
excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
@ -243,6 +271,7 @@ def process_address_file_rules(settings, config): #rules, includes_file=None, ex
def process_arg_rules(settings, config):
address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
settings.set('MATCH_ALL', config.true('ETHMONITOR_MATCH_ALL'))
settings.set('RULES', address_rules)
settings = process_address_arg_rules(settings, config)
settings = process_data_arg_rules(settings, config)
@ -272,7 +301,10 @@ def process_cache_store(settings, config):
def process_cache_filter(settings, config):
cache_store = settings.get('CACHE_STORE')
fltr = CacheFilter(cache_store, rules_filter=settings.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX'))
cache_rules = AddressRules(include_by_default=True)
if str(cache_store) != 'Nullstore':
cache_rules = settings.o['RULES']
fltr = CacheFilter(cache_store, rules_filter=cache_rules, include_tx_data=config.true('ETHCACHE_STORE_TX'))
sync_store = settings.get('SYNC_STORE')
sync_store.register(fltr)
@ -283,6 +315,14 @@ def process_cache_filter(settings, config):
return settings
def process_run_filter(settings, config):
if not settings.get('RUN_OUT'):
return settings
fltr = RunFilter(settings.get('RUN_DIR'))
hndlr = settings.get('BLOCK_HANDLER')
hndlr.register(fltr)
return settings
def process_tx_filter(settings, config):
for fltr in list_from_prefix(config, 'filter'):
m = importlib.import_module(fltr)
@ -330,6 +370,7 @@ def process_filter(settings, config):
settings = process_renderer(settings, config)
settings = process_block_filter(settings, config)
settings = process_cache_filter(settings, config)
settings = process_run_filter(settings, config)
settings = process_tx_filter(settings, config)
settings = process_out_filter(settings, config)
settings = process_arg_filter(settings, config)
@ -378,6 +419,7 @@ def process_user_context(settings, config):
ctx_usr[k] = v
ctx = {
'driver': 'eth-monitor',
'rundir': settings.get('RUN_DIR'),
'usr': ctx_usr,
}
settings.set('SYNCER_CONTEXT', ctx)
@ -387,6 +429,7 @@ def process_user_context(settings, config):
def process_settings(settings, config):
settings = process_monitor_session(settings, config)
settings = process_monitor_session_dir(settings, config)
settings = process_monitor_rundir(settings, config)
settings = process_arg_rules(settings, config)
settings = process_sync(settings, config)
settings = process_cache(settings, config)

17
run_tests.sh Normal file
View File

@ -0,0 +1,17 @@
#!/bin/bash
set -a
set -e
set -x
default_pythonpath=$PYTHONPATH:.
export PYTHONPATH=${default_pythonpath:-.}
>&2 echo using pythonpath $PYTHONPATH
for f in `ls tests/*.py`; do
python $f
done
for f in `ls tests/rules/*.py`; do
python $f
done
set +x
set +e
set +a

View File

@ -1,6 +1,6 @@
[metadata]
name = eth-monitor
version = 0.8.7
version = 0.8.8
description = Monitor and cache transactions using match filters
author = Louis Holbrook
author_email = dev@holbrook.no

160
tests/rules/test_base.py Normal file
View File

@ -0,0 +1,160 @@
# standard imports
import logging
import unittest
import os
# local imports
from eth_monitor.rules import *
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestRule(unittest.TestCase):
def setUp(self):
self.alice = os.urandom(20).hex()
self.bob = os.urandom(20).hex()
self.carol = os.urandom(20).hex()
self.dave = os.urandom(20).hex()
self.x = os.urandom(20).hex()
self.y = os.urandom(20).hex()
self.hsh = os.urandom(32).hex()
def test_address_include(self):
data = b''
outs = [self.alice]
ins = []
execs = []
rule = RuleSimple(outs, ins, execs)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
outs = []
ins = [self.alice]
execs = []
rule = RuleSimple(outs, ins, execs)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertTrue(r)
outs = []
ins = []
execs = [self.x]
rule = RuleSimple(outs, ins, execs)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
data = b'deadbeef0123456789'
data_match = [data[:8]]
rule = RuleMethod(data_match)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
self.assertFalse(r)
rule = RuleData(data_match)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
self.assertTrue(r)
def test_address_exclude(self):
data = b''
outs = [self.alice]
ins = []
execs = []
rule = RuleSimple(outs, ins, execs)
c = AddressRules()
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
c = AddressRules(include_by_default=True)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertTrue(r)
outs = []
ins = [self.alice]
execs = []
rule = RuleSimple(outs, ins, execs)
c = AddressRules(include_by_default=True)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
outs = []
ins = []
execs = [self.x]
rule = RuleSimple(outs, ins, execs)
c = AddressRules(include_by_default=True)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertTrue(r)
data = b'deadbeef0123456789'
data_match = [data[:8]]
rule = RuleMethod(data_match)
c = AddressRules(include_by_default=True)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
self.assertTrue(r)
rule = RuleData(data_match)
c = AddressRules(include_by_default=True)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd', self.hsh)
self.assertTrue(r)
def test_address_include_exclude(self):
data = b''
outs = [self.alice]
ins = []
execs = []
rule = RuleSimple(outs, ins, execs)
c = AddressRules()
c.include(rule)
c.exclude(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,90 @@
import logging
import unittest
import os
# local imports
from eth_monitor.rules import *
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestRule(unittest.TestCase):
def setUp(self):
self.alice = os.urandom(20).hex()
self.bob = os.urandom(20).hex()
self.carol = os.urandom(20).hex()
self.dave = os.urandom(20).hex()
self.x = os.urandom(20).hex()
self.y = os.urandom(20).hex()
self.hsh = os.urandom(32).hex()
def test_greedy_includes(self):
data = b''
outs = [self.alice]
ins = [self.carol]
execs = []
rule = RuleSimple(outs, ins, execs, match_all=True)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.carol, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.alice, self.carol, data, self.hsh)
self.assertTrue(r)
rule = RuleSimple(outs, ins, execs)
c = AddressRules(match_all=True)
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
self.assertFalse(r)
r = c.apply_rules_addresses(self.bob, self.carol, data, self.hsh)
self.assertTrue(r)
r = c.apply_rules_addresses(self.alice, self.carol, data, self.hsh)
self.assertTrue(r)
def test_greedy_data(self):
data = os.urandom(128).hex()
data_match_one = data[4:8]
data_match_two = data[32:42]
data_match_fail = os.urandom(64).hex()
data_match = [data_match_one]
rule = RuleData(data_match, match_all=True)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
data_match = [data_match_two]
rule = RuleData(data_match, match_all=True)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
data_match = [data_match_two, data_match_one]
rule = RuleData(data_match, match_all=True)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertTrue(r)
data_match = [data_match_two, data_match_fail, data_match_one]
rule = RuleData(data_match, match_all=True)
c = AddressRules()
c.include(rule)
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
self.assertFalse(r)
if __name__ == '__main__':
unittest.main()