From a7218e96013d85f4e3f21f17bd25fa4bf704fd2b Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 8 Mar 2022 17:11:15 +0000 Subject: [PATCH] Implement keep-alive flag --- CHANGELOG | 2 ++ eth_monitor/runnable/sync.py | 41 ++++++++++++++++++++++++------------ setup.cfg | 2 +- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 7b693ba..8dcdf1f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,5 @@ +- 0.2.1 + * Implement --keep-alive flag - 0.2.0 * Dependency upgrades - 0.1.0 diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index ed7c230..e0ab870 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -105,6 +105,7 @@ config.add(args.offset, '_SYNC_OFFSET', True) config.add(args.skip_history, '_NO_HISTORY', True) config.add(args.single, '_SINGLE', True) config.add(args.head, '_HEAD', True) +config.add(args.keep_alive, '_KEEP_ALIVE', True) override(config, 'renderer', env=os.environ, args=args) override(config, 'filter', env=os.environ, args=args) logg.debug('loaded config:\n{}'.format(config)) @@ -237,7 +238,7 @@ def setup_cache_filter(rules_filter=None): return CacheFilter(rules_filter=rules_filter) -def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): +def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False): syncers = [] syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir) if len(syncer_backends) == 0: @@ -258,42 +259,55 @@ def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callb syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=callback)) #RuledFilter.block_callback)) syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir) - syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=callback)) + + if len(syncers) == 0 or keep_alive: + head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) + syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback)) + return syncers -def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): - logg.debug('block limit {}'.format(block_limit)) +def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False): + logg.info('block limit {} offset {} syncoffset {}'.format(block_limit, block_offset, sync_offset)) syncer_backend = FileBackend.initial(chain_spec, block_limit, start_block_height=sync_offset, base_dir=state_dir) + syncers = [] syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=callback) - return [syncer] + syncers.append(syncer) + if keep_alive: + logg.info('i have keep alive') + head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) + syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback)) + return syncers -def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): +def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=True): syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=callback) return [syncer] def main(): + keep_alive = False session_block_offset = 0 if args.head: session_block_offset = -1 + keep_alive = True else: session_block_offset = args.offset + o = block_latest() + r = rpc.do(o) + block_offset = int(strip_0x(r), 16) + 1 + logg.info('network block height is {}'.format(block_offset)) + block_limit = 0 if args.until > 0: if not args.head and args.until <= block_offset: raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(block_offset, args.until)) block_limit = args.until - - - - o = block_latest() - r = rpc.do(o) - block_offset = int(strip_0x(r), 16) + 1 - logg.info('network block height is {}'.format(block_offset)) + elif config.true('_KEEP_ALIVE'): + keep_alive=True + block_limit = block_offset if session_block_offset == -1: session_block_offset = block_offset @@ -357,6 +371,7 @@ def main(): chain_interface, sync_offset=config.get('_SYNC_OFFSET'), skip_history=config.true('_NO_HISTORY'), + keep_alive=keep_alive, ) out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods) diff --git a/setup.cfg b/setup.cfg index 70f0368..ebb3572 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = eth-monitor -version = 0.2.0 +version = 0.2.1 description = Monitor and cache transactions using match filters author = Louis Holbrook author_email = dev@holbrook.no