Add output filter

This commit is contained in:
lash 2022-01-30 14:43:53 +00:00
parent 83f96640ae
commit 56be976b85
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 64 additions and 5 deletions

View File

@ -26,6 +26,7 @@ class RuledFilter:
@classmethod
def block_callback(cls, block, extra=None):
logg.info('processing {}'.format(block))
cls.store.put_block(block, include_data=cls.include_block_data)
@ -34,5 +35,5 @@ class RuledFilter:
if not self.rules_filter.apply_rules(tx):
logg.debug('rule match failed for tx {}'.format(tx.hash))
return
logg.info('applying filter {}'.format(self))
logg.debug('applying filter {}'.format(self))
self.ruled_filter(conn, block, tx, db_session=db_session)

View File

@ -0,0 +1,24 @@
# standard imports
import sys
# local imports
from .base import RuledFilter
class OutFilter(RuledFilter):
def __init__(self, writer=sys.stdout, renderers=[], rules_filter=None):
super(OutFilter, self).__init__(rules_filter=rules_filter)
self.w = writer
self.renderers = renderers
self.c = 0
def filter(self, con, block, tx, db_session=None):
data = tx.payload
if len(data) > 8:
data = data[:8] + '...'
if len(data) > 0:
data = 'data {}'.format(data)
self.w.write('{} {} {} {}\n'.format(self.c, block, tx, data))
self.c += 1

View File

@ -24,6 +24,7 @@ from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import Filter as CacheFilter
from eth_monitor.rules import AddressRules
from eth_monitor.filters import RuledFilter
from eth_monitor.filters.out import OutFilter
from eth_monitor.store.file import FileStore
logging.basicConfig(level=logging.WARNING)
@ -44,6 +45,8 @@ argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provide
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
#argparser.add_argument('--until', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
@ -73,6 +76,20 @@ config.dict_override(args_override, 'cli')
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(args.skip_history, '_NO_HISTORY', True)
config.add(args.single, '_SINGLE', True)
config.add(args.head, '_HEAD', True)
logg.debug('loaded config:\{}'.format(config))
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
block_limit = 0
#if args.until > 0:
# if not args.head and args.until <= block_offset:
# raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(block_offset, args.until))
# block_limit = args.until
logg.debug('config loaded:\n{}'.format(config))
@ -164,7 +181,7 @@ def setup_cache_filter(rules_filter=None):
return CacheFilter(rules_filter=rules_filter)
def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_offset=0, skip_history=False):
def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, sync_offset=0, skip_history=False):
syncers = []
syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
if len(syncer_backends) == 0:
@ -189,18 +206,29 @@ def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_off
return syncers
def setup_backend_single(chain_spec, block_offset, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
syncer_backend = FileBackend.initial(chain_spec, block_offset, start_block_height=sync_offset, base_dir=state_dir)
syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback)
return [syncer]
def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
syncer = (HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
return [syncer]
if __name__ == '__main__':
o = block_latest()
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('current block height {}'.format(block_offset))
logg.info('network block height is {}'.format(block_offset))
if block_offset == -1:
block_offset = block_latest
elif not config.true('_KEEP_ALIVE'):
if block_limit == 0:
block_limit = block_latest
address_rules = setup_address_rules(
includes_file=args.includes_file,
@ -231,7 +259,9 @@ if __name__ == '__main__':
filters.append(fltr_object)
syncer_setup_func = None
if config.true('_SINGLE'):
if config.true('_HEAD'):
syncer_setup_func = setup_backend_head
elif config.true('_SINGLE'):
syncer_setup_func = setup_backend_single
else:
syncer_setup_func = setup_backend_resume
@ -240,6 +270,7 @@ if __name__ == '__main__':
syncers = syncer_setup_func(
chain_spec,
block_offset,
block_limit,
state_dir,
cache_filter.block_callback,
chain_interface,
@ -247,6 +278,9 @@ if __name__ == '__main__':
skip_history=config.true('_NO_HISTORY'),
)
out_filter = OutFilter(rules_filter=address_rules)
filters.append(out_filter)
i = 0
for syncer in syncers:
logg.info('running syncer index {} {}'.format(i, str(syncer)))