4
0
mirror of git://holbrook.no/eth-monitor.git synced 2024-11-21 20:06:46 +01:00

Add renderer interface to out filter

This commit is contained in:
lash 2022-01-30 19:44:03 +00:00
parent 56be976b85
commit e6bb7a4771
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
6 changed files with 60 additions and 24 deletions

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include *requirements.txt eth_monitor/data/config/*

View File

@ -0,0 +1,2 @@
[chain]
spec = evm:berlin:1:ethereum

View File

@ -1,24 +1,45 @@
# standard imports # standard imports
import sys import sys
import logging
# local imports # local imports
from .base import RuledFilter from .base import RuledFilter
logg = logging.getLogger(__name__)
# Interface defining the signature for renderer in OutFilter
# return string after local transformation
def apply_interface(c, s, chain_str, conn, block, tx, db_session=None):
pass
class OutFilter(RuledFilter): class OutFilter(RuledFilter):
def __init__(self, writer=sys.stdout, renderers=[], rules_filter=None): def __init__(self, chain_spec, writer=sys.stdout, renderers=[], rules_filter=None):
super(OutFilter, self).__init__(rules_filter=rules_filter) super(OutFilter, self).__init__(rules_filter=rules_filter)
self.w = writer self.w = writer
self.renderers = renderers self.renderers = renderers
self.c = 0 self.c = 0
self.chain_spec = chain_spec
self.chain_spec_str = str(chain_spec)
def filter(self, con, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
data = tx.payload s = None
if len(data) > 8:
data = data[:8] + '...' for renderer in self.renderers:
if len(data) > 0: s = renderer.apply(self.c, s, self.chain_spec_str, conn, block, tx)
data = 'data {}'.format(data) if s != None:
self.w.write('{} {} {} {}\n'.format(self.c, block, tx, data)) break
if s == None:
data = tx.payload
if len(data) > 8:
data = data[:8] + '...'
if len(data) > 0:
data = 'data {}'.format(data)
s = '{} {} {} {}'.format(self.c, block, tx, data)
self.w.write(s + '\n')
self.c += 1 self.c += 1

View File

@ -5,6 +5,7 @@ import argparse
import confini import confini
import logging import logging
import os import os
import importlib
# external imports # external imports
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -38,12 +39,13 @@ if default_eth_provider == None:
script_dir = os.path.realpath(os.path.dirname(__file__)) script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd()) exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config')) #default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config'))
base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
argparser = argparse.ArgumentParser('master eth events monitor') argparser = argparse.ArgumentParser('master eth events monitor')
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)') argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file') argparser.add_argument('-c', type=str, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block') argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
#argparser.add_argument('--until', type=int, default=0, help='Start sync on this block') #argparser.add_argument('--until', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)') argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)')
@ -54,6 +56,7 @@ argparser.add_argument('--include-default', dest='include_default', action='stor
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default') argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default') argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file') argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file')
argparser.add_argument('--renderer', type=str, action='append', help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('-f', '--filter', type=str, action='append', help='Add python module filter path') argparser.add_argument('-f', '--filter', type=str, action='append', help='Add python module filter path')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states') argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
@ -67,7 +70,7 @@ elif args.v:
logg.setLevel(logging.INFO) logg.setLevel(logging.INFO)
config_dir = args.c config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) config = confini.Config(base_config_dir, os.environ.get('CONFINI_ENV_PREFIX'), override_dirs=args.c)
config.process() config.process()
args_override = { args_override = {
'CHAIN_SPEC': getattr(args, 'i'), 'CHAIN_SPEC': getattr(args, 'i'),
@ -181,7 +184,7 @@ def setup_cache_filter(rules_filter=None):
return CacheFilter(rules_filter=rules_filter) return CacheFilter(rules_filter=rules_filter)
def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, sync_offset=0, skip_history=False): def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
syncers = [] syncers = []
syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir) syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
if len(syncer_backends) == 0: if len(syncer_backends) == 0:
@ -199,26 +202,26 @@ def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callb
logg.info('resuming sync session {}'.format(syncer_backend)) logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends: for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=RuledFilter.block_callback)) syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=callback)) #RuledFilter.block_callback))
syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir) syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback)) syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=callback))
return syncers return syncers
def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
syncer_backend = FileBackend.initial(chain_spec, block_offset, start_block_height=sync_offset, base_dir=state_dir) syncer_backend = FileBackend.initial(chain_spec, block_offset, start_block_height=sync_offset, base_dir=state_dir)
syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback) syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=callback)
return [syncer] return [syncer]
def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
syncer = (HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback)) syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=callback)
return [syncer] return [syncer]
if __name__ == '__main__': def main():
o = block_latest() o = block_latest()
r = rpc.do(o) r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1 block_offset = int(strip_0x(r), 16) + 1
@ -226,10 +229,10 @@ if __name__ == '__main__':
if block_offset == -1: if block_offset == -1:
block_offset = block_latest block_offset = block_latest
elif not config.true('_KEEP_ALIVE'): # elif not config.true('_KEEP_ALIVE'):
if block_limit == 0: # if block_limit == 0:
block_limit = block_latest # block_limit = block_latest
#
address_rules = setup_address_rules( address_rules = setup_address_rules(
includes_file=args.includes_file, includes_file=args.includes_file,
excludes_file=args.excludes_file, excludes_file=args.excludes_file,
@ -252,7 +255,6 @@ if __name__ == '__main__':
] ]
if args.filter != None: if args.filter != None:
import importlib
for fltr in args.filter: for fltr in args.filter:
m = importlib.import_module(fltr) m = importlib.import_module(fltr)
fltr_object = m.Filter(rules_filter=address_rules) fltr_object = m.Filter(rules_filter=address_rules)
@ -278,7 +280,13 @@ if __name__ == '__main__':
skip_history=config.true('_NO_HISTORY'), skip_history=config.true('_NO_HISTORY'),
) )
out_filter = OutFilter(rules_filter=address_rules) renderers = ['local.ge']
renderers_mods = []
for renderer in renderers:
m = importlib.import_module(renderer)
renderers_mods.append(m)
out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods)
filters.append(out_filter) filters.append(out_filter)
i = 0 i = 0
@ -291,3 +299,7 @@ if __name__ == '__main__':
sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1 i += 1
if __name__ == '__main__':
main()

0
test_requirements.txt Normal file
View File