From 2a542568212b5f04f6e9111a32acc8456942e0f0 Mon Sep 17 00:00:00 2001 From: lash Date: Sat, 19 Aug 2023 11:04:52 +0100 Subject: [PATCH] Add state rundir with block height output --- CHANGELOG | 1 + eth_monitor/cli/arg.py | 1 + eth_monitor/cli/config.py | 1 + eth_monitor/filters/block.py | 6 +++++- eth_monitor/filters/run.py | 16 +++++++++++++++ eth_monitor/run.py | 17 ++++++++++++++++ eth_monitor/runnable/sync.py | 3 +++ eth_monitor/settings.py | 38 ++++++++++++++++++++++++++++++++++++ setup.cfg | 2 +- 9 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 eth_monitor/filters/run.py create mode 100644 eth_monitor/run.py diff --git a/CHANGELOG b/CHANGELOG index cb03b58..9a6b4f1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,7 @@ * Skip rules filter processing for cache when deactivated * Add match-all flag to rule processing * Add match-all flag to CLI to toggle setting match_all flag to rule processing for include criteria + * Implement state dir (rundir), and last synced block output - 0.8.7 * Upgrade chainsyncer (and shep) to avoid state deletion on partial filter list interrupts - 0.8.6 diff --git a/eth_monitor/cli/arg.py b/eth_monitor/cli/arg.py index bc4359f..c5316d4 100644 --- a/eth_monitor/cli/arg.py +++ b/eth_monitor/cli/arg.py @@ -34,3 +34,4 @@ def process_args(argparser, args, flags): # misc flags argparser.add_argument('-k', '--context-key', dest='context_key', action='append', type=str, help='Add a key-value pair to be added to the context') + argparser.add_argument('--run-dir', type=str, dest='run_dir', help='Output key sync and processing state properties to given diretory') diff --git a/eth_monitor/cli/config.py b/eth_monitor/cli/config.py index 59a069b..0a96308 100644 --- a/eth_monitor/cli/config.py +++ b/eth_monitor/cli/config.py @@ -56,6 +56,7 @@ def process_config(config, arg, args, flags): config.add(getattr(args, 'session_id'), '_SESSION_ID', False) config.add(getattr(args, 'cache_dir'), '_CACHE_DIR', False) + config.add(getattr(args, 'run_dir'), '_RUN_DIR', False) config.add(getattr(args, 'fresh'), '_FRESH', False) return config diff --git a/eth_monitor/filters/block.py b/eth_monitor/filters/block.py index 0b25698..ff9e1a6 100644 --- a/eth_monitor/filters/block.py +++ b/eth_monitor/filters/block.py @@ -1,3 +1,7 @@ +# standard imports +import os + + class Filter: def __init__(self, store, include_block_data=False): @@ -5,5 +9,5 @@ class Filter: self.include_block_data = include_block_data - def filter(self, conn, block): + def filter(self, conn, block, **kwargs): self.store.put_block(block, include_data=self.include_block_data) diff --git a/eth_monitor/filters/run.py b/eth_monitor/filters/run.py new file mode 100644 index 0000000..d49458a --- /dev/null +++ b/eth_monitor/filters/run.py @@ -0,0 +1,16 @@ +# standard imports +import os + + +class Filter: + + def __init__(self, run_dir): + self.run_dir = run_dir + self.fp = os.path.join(run_dir, 'block') + + + def filter(self, conn, block): + f = open(self.fp, 'w') + f.write(str(block.number)) + f.close() + return False diff --git a/eth_monitor/run.py b/eth_monitor/run.py new file mode 100644 index 0000000..c937ff4 --- /dev/null +++ b/eth_monitor/run.py @@ -0,0 +1,17 @@ +# standard imports +import os +import logging + +logg = logging.getLogger(__name__) + + +def cleanup_run(settings): + if not settings.get('RUN_OUT'): + return + lockfile = os.path.join(settings.get('RUN_DIR'), '.lock') + os.unlink(lockfile) + logg.debug('freed rundir {}'.format(settings.get('RUN_DIR'))) + + +def cleanup(settings): + cleanup_run(settings) diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 6f3c4f5..333d119 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -47,6 +47,7 @@ from eth_monitor.callback import ( import eth_monitor.cli from eth_monitor.cli.log import process_log from eth_monitor.settings import process_settings as process_settings_local +from eth_monitor.run import cleanup logg = logging.getLogger() @@ -111,6 +112,8 @@ def main(): except SyncDone as e: sys.stderr.write("sync {} done at block {}\n".format(drv, e)) + cleanup(settings) + if __name__ == '__main__': main() diff --git a/eth_monitor/settings.py b/eth_monitor/settings.py index 92a3423..f90b18b 100644 --- a/eth_monitor/settings.py +++ b/eth_monitor/settings.py @@ -34,6 +34,7 @@ from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.config import override, list_from_prefix from eth_monitor.filters.out import OutFilter from eth_monitor.filters.block import Filter as BlockFilter +from eth_monitor.filters.run import Filter as RunFilter logg = logging.getLogger(__name__) @@ -47,6 +48,32 @@ def process_monitor_session(settings, config): session_id = 'default' settings.set('SESSION_ID', session_id) + settings.set('SESSION_OK', True) + return settings + + +def process_monitor_rundir(settings, config): + settings.set('RUN_OUT', False) + if config.get('_RUN_DIR') == None: + return settings + + run_dir = config.get('_RUN_DIR') + try: + os.makedirs(run_dir, exist_ok=True) + except Exception as e: + logg.error('could not create run dir, deactivating run output: ' + str(e)) + return settings + + lockfile = os.path.join(run_dir, '.lock') + try: + f = open(lockfile, 'x') + f.close() + except FileExistsError: + logg.error('run dir {} is already in use, deactivating run output'.format(run_dir)) + return settings + + settings.set('RUN_OUT', True) + settings.set('RUN_DIR', run_dir) return settings @@ -288,6 +315,14 @@ def process_cache_filter(settings, config): return settings +def process_run_filter(settings, config): + if not settings.get('RUN_OUT'): + return settings + fltr = RunFilter(settings.get('RUN_DIR')) + hndlr = settings.get('BLOCK_HANDLER') + hndlr.register(fltr) + return settings + def process_tx_filter(settings, config): for fltr in list_from_prefix(config, 'filter'): m = importlib.import_module(fltr) @@ -335,6 +370,7 @@ def process_filter(settings, config): settings = process_renderer(settings, config) settings = process_block_filter(settings, config) settings = process_cache_filter(settings, config) + settings = process_run_filter(settings, config) settings = process_tx_filter(settings, config) settings = process_out_filter(settings, config) settings = process_arg_filter(settings, config) @@ -383,6 +419,7 @@ def process_user_context(settings, config): ctx_usr[k] = v ctx = { 'driver': 'eth-monitor', + 'rundir': settings.get('RUN_DIR'), 'usr': ctx_usr, } settings.set('SYNCER_CONTEXT', ctx) @@ -392,6 +429,7 @@ def process_user_context(settings, config): def process_settings(settings, config): settings = process_monitor_session(settings, config) settings = process_monitor_session_dir(settings, config) + settings = process_monitor_rundir(settings, config) settings = process_arg_rules(settings, config) settings = process_sync(settings, config) settings = process_cache(settings, config) diff --git a/setup.cfg b/setup.cfg index 1a8be0b..fb87836 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = eth-monitor -version = 0.8.7 +version = 0.8.8 description = Monitor and cache transactions using match filters author = Louis Holbrook author_email = dev@holbrook.no