From 2999695bd4792719f64ca0f7f1b97dad6550450b Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 30 Mar 2022 08:11:46 +0000 Subject: [PATCH] Implement sync runnable on shep chainsyncer --- eth_monitor/filters/base.py | 7 +- eth_monitor/filters/cache.py | 4 +- eth_monitor/filters/out.py | 3 +- eth_monitor/runnable/sync.py | 130 ++++++++++++----------------------- setup.cfg | 2 +- 5 files changed, 54 insertions(+), 92 deletions(-) diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py index 983f04f..08cf208 100644 --- a/eth_monitor/filters/base.py +++ b/eth_monitor/filters/base.py @@ -5,11 +5,12 @@ import json # external imports from hexathon import strip_0x +from chainsyncer.filter import SyncFilter logg = logging.getLogger(__name__) -class RuledFilter: +class RuledFilter(SyncFilter): def __init__(self, rules_filter=None): if self.store.chain_dir == None: @@ -34,5 +35,5 @@ class RuledFilter: if self.rules_filter != None: if not self.rules_filter.apply_rules(tx): logg.debug('rule match failed for tx {}'.format(tx.hash)) - return False - return True + return True + return False diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py index 7db893d..1c24896 100644 --- a/eth_monitor/filters/cache.py +++ b/eth_monitor/filters/cache.py @@ -15,8 +15,8 @@ class Filter(RuledFilter): def filter(self, conn, block, tx, db_session=None): r = super(Filter, self).filter(conn, block, tx, db_session=db_session) - if r == False: + if r == True: return True self.ruled_filter(conn, block, tx, db_session=db_session) - return True + return False diff --git a/eth_monitor/filters/out.py b/eth_monitor/filters/out.py index a3844d6..d927771 100644 --- a/eth_monitor/filters/out.py +++ b/eth_monitor/filters/out.py @@ -45,7 +45,7 @@ class OutFilter(RuledFilter): def filter(self, conn, block, tx, db_session=None): r = super(OutFilter, self).filter(conn, block, tx, db_session=db_session) - if r == False: + if r == True: return True for renderer in self.renderers: @@ -65,3 +65,4 @@ class OutFilter(RuledFilter): self.w.write(s + '\n') self.c += 1 + return False diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index e0ab870..19d734c 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -6,6 +6,7 @@ import confini import logging import os import importlib +import uuid # external imports from chainlib.chain import ChainSpec @@ -15,10 +16,10 @@ from hexathon import ( strip_0x, add_0x, ) -from chainsyncer.driver.head import HeadSyncer -from chainsyncer.driver.history import HistorySyncer -from chainsyncer.backend.file import FileBackend -from chainsyncer.filter import NoopFilter +from chainsyncer.store.fs import SyncFsStore +from chainsyncer.driver.chain_interface import ChainInterfaceDriver +from chainsyncer.error import SyncDone + from eth_cache.rpc import CacheRPC from eth_cache.store.file import FileStore @@ -71,8 +72,10 @@ argparser.add_argument('--address-file', type=str, dest='excludes_file', help='L argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output') argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') +argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state') argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available') argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states') +argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose') @@ -106,14 +109,21 @@ config.add(args.skip_history, '_NO_HISTORY', True) config.add(args.single, '_SINGLE', True) config.add(args.head, '_HEAD', True) config.add(args.keep_alive, '_KEEP_ALIVE', True) +config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True) +config.add(args.cache_dir, '_CACHE_DIR', True) +config.add(args.session_id, '_SESSION_ID', True) override(config, 'renderer', env=os.environ, args=args) override(config, 'filter', env=os.environ, args=args) + +if config.get('_SESSION_ID') == None: + if not config.get('_SINGLE'): + #config.add('_SESSION_ID', str(uuid.uuid4()), True) + #else: + config.add('default', '_SESSION_ID', True) logg.debug('loaded config:\n{}'.format(config)) chain_spec = ChainSpec.from_chain_str(args.i) -state_dir = os.path.join(exec_dir, 'state') - rpc_id_generator = None if args.seq: rpc_id_generator = IntSequenceGenerator() @@ -238,76 +248,41 @@ def setup_cache_filter(rules_filter=None): return CacheFilter(rules_filter=rules_filter) -def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False): - syncers = [] - syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir) - if len(syncer_backends) == 0: - initial_block_start = block_offset - 1 - if config.get('_SYNC_OFFSET') != None: - initial_block_start = config.get('_SYNC_OFFSET') - initial_block_offset = block_offset - if skip_history: - initial_block_start = block_offset - initial_block_offset += 1 - syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir)) - logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) - else: - for syncer_backend in syncer_backends: - logg.info('resuming sync session {}'.format(syncer_backend)) - - for syncer_backend in syncer_backends: - syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=callback)) #RuledFilter.block_callback)) - - syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir) - - if len(syncers) == 0 or keep_alive: - head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) - syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback)) - - return syncers +def pre_callback(): + logg.debug('starting sync loop iteration') -def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False): - logg.info('block limit {} offset {} syncoffset {}'.format(block_limit, block_offset, sync_offset)) - syncer_backend = FileBackend.initial(chain_spec, block_limit, start_block_height=sync_offset, base_dir=state_dir) - syncers = [] - syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=callback) - syncers.append(syncer) - if keep_alive: - logg.info('i have keep alive') - head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) - syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback)) - return syncers +def post_callback(): + logg.debug('ending sync loop iteration') -def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=True): - syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) - syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=callback) - return [syncer] +def block_callback(block, tx): + logg.debug('processing block {}'.format(block)) def main(): - keep_alive = False - session_block_offset = 0 - if args.head: - session_block_offset = -1 - keep_alive = True - else: - session_block_offset = args.offset - o = block_latest() r = rpc.do(o) block_offset = int(strip_0x(r), 16) + 1 logg.info('network block height is {}'.format(block_offset)) + keep_alive = False + session_block_offset = 0 block_limit = 0 + if args.head: + session_block_offset = block_offset + block_limit = -1 + keep_alive = True + else: + session_block_offset = args.offset + if args.until > 0: if not args.head and args.until <= block_offset: raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(block_offset, args.until)) block_limit = args.until elif config.true('_KEEP_ALIVE'): keep_alive=True - block_limit = block_offset + block_limit = -1 if session_block_offset == -1: session_block_offset = block_offset @@ -328,7 +303,7 @@ def main(): store = setup_filter( chain_spec, - args.cache_dir, + config.get('_CACHE_DIR'), bool(args.store_tx_data), bool(args.store_block_data), ) @@ -353,44 +328,29 @@ def main(): renderers_mods.append(m) logg.info('using renderer module {}'.format(renderer)) - syncer_setup_func = None - if config.true('_HEAD'): - syncer_setup_func = setup_backend_head - elif config.true('_SINGLE'): - syncer_setup_func = setup_backend_single - else: - syncer_setup_func = setup_backend_resume - chain_interface = EthChainInterface() - syncers = syncer_setup_func( - chain_spec, - block_offset, - block_limit, - state_dir, - cache_filter.block_callback, - chain_interface, - sync_offset=config.get('_SYNC_OFFSET'), - skip_history=config.true('_NO_HISTORY'), - keep_alive=keep_alive, - ) out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods) filters.append(out_filter) + + sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID')) + logg.info('session is {}'.format(sync_store.session_id)) + + for fltr in filters: + sync_store.register(fltr) + drv = ChainInterfaceDriver(sync_store, chain_interface, offset=session_block_offset, target=block_limit, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback) use_rpc = rpc if not args.fresh: use_rpc = CacheRPC(rpc, store) i = 0 - for syncer in syncers: - logg.info('running syncer index {} {}'.format(i, str(syncer))) - for f in filters: - syncer.add_filter(f) + try: + r = drv.run(use_rpc) + except SyncDone as e: + sys.stderr.write("sync {} done at block {}\n".format(drv, e)) - r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), use_rpc) - sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) - - i += 1 + i += 1 if __name__ == '__main__': diff --git a/setup.cfg b/setup.cfg index ebb3572..97cb56f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = eth-monitor -version = 0.2.1 +version = 0.3.0 description = Monitor and cache transactions using match filters author = Louis Holbrook author_email = dev@holbrook.no